diff --git a/README.md b/README.md index c6a53326..285b34bf 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,15 @@ In the module `Containers_advanced`: - `CCCat`, a few categorical structures - `CCBatch`, to combine operations on collections into one traversal +### Thread + +In the library `containers.thread`, for preemptive system threads: + +- `CCFuture`, a set of tools for preemptive threading, including a thread pool, + monadic futures, and MVars (concurrent boxes) +- `CCLock`, values protected by locks +- `CCSemaphore`, a simple implementation of semaphores + ### Misc See [doc](http://cedeela.fr/~simon/software/containers/misc). This list @@ -191,9 +200,6 @@ is not necessarily up-to-date. ### Others -- `Future`, a set of tools for preemptive threading, including a thread pool, -monadic futures, and MVars (concurrent boxes) - - `containers.lwt` contains [Lwt](http://ocsigen.org/lwt/)-related modules (experimental) There is a QuickCheck-like library called `QCheck` (now in its own repo). diff --git a/_oasis b/_oasis index f9c58f97..08da485a 100644 --- a/_oasis +++ b/_oasis @@ -130,7 +130,7 @@ Library "containers_misc" Library "containers_thread" Path: src/threads/ - Modules: CCFuture, CCLock + Modules: CCFuture, CCLock, CCSemaphore FindlibName: thread FindlibParent: containers Build$: flag(thread) diff --git a/doc/intro.txt b/doc/intro.txt index df66e5fd..922d9d36 100644 --- a/doc/intro.txt +++ b/doc/intro.txt @@ -156,6 +156,7 @@ Lwt_pipe {!modules: CCFuture CCLock +CCSemaphore } diff --git a/src/threads/CCSemaphore.ml b/src/threads/CCSemaphore.ml new file mode 100644 index 00000000..582e04a6 --- /dev/null +++ b/src/threads/CCSemaphore.ml @@ -0,0 +1,119 @@ + +(** {1 Semaphores} *) + +type t = { + mutable n : int; + mutex : Mutex.t; + cond : Condition.t; +} + +let create n = { + n; + mutex=Mutex.create(); + cond=Condition.create(); +} + +let get t = t.n + +(* assume [t.mutex] locked, try to acquire [t] *) +let acquire_once_locked_ m t = + while t.n < m do + Condition.wait t.cond t.mutex; + done; + assert (t.n >= m); + t.n <- t.n - m; + Condition.broadcast t.cond; + Mutex.unlock t.mutex + +let acquire m t = + Mutex.lock t.mutex; + acquire_once_locked_ m t + +(* assume [t.mutex] locked, try to release [t] *) +let release_once_locked_ m t = + t.n <- t.n + m; + Condition.broadcast t.cond; + Mutex.unlock t.mutex + +let release m t = + Mutex.lock t.mutex; + release_once_locked_ m t; + () + +(*$R + let s = create 1 in + let r = CCLock.create false in + let _ = Thread.create (fun s -> acquire 5 s; CCLock.set r true) s in + Thread.yield (); + assert_equal false (CCLock.get r); + release 4 s; + Thread.delay 0.2; + assert_equal true (CCLock.get r); + assert_equal 0 (get s) +*) + +let with_acquire ~n t ~f = + Mutex.lock t.mutex; + acquire_once_locked_ n t; + try + let x = f() in + release_once_locked_ n t; + x + with e -> + release_once_locked_ n t; + raise e + +(*$R + let s = create 5 in + let n = CCLock.create 0 in + let a = Array.init 100 (fun i -> + Thread.create (fun _ -> + with_acquire ~n:(1 + (i mod 5)) s + ~f:(fun () -> CCLock.incr n) + ) ()) + in + Array.iter Thread.join a; + assert_equal ~printer:CCInt.to_string 5 (get s); + assert_equal ~printer:CCInt.to_string 100 (CCLock.get n) +*) + +let wait_until_at_least ~n t ~f = + Mutex.lock t.mutex; + while t.n < n do + Condition.wait t.cond t.mutex; + done; + assert (t.n >= n); + Mutex.unlock t.mutex; + f () + +(*$R + let output s = () in + let s = create 2 in + let res = CCLock.create false in + let id = Thread.create + (fun () -> + output "start"; + wait_until_at_least ~n:5 s + ~f:(fun () -> + assert (get s >= 5); + output "modify now"; + CCLock.set res true) + ) () + in + output "launched thread"; + Thread.yield(); + assert_bool "start" (not (CCLock.get res)); + output "release 2"; + release 2 s; + Thread.yield(); + assert_bool "after release 2" (not (CCLock.get res)); + output "release 1"; + release 1 s; + (* should work now *) + Thread.delay 0.2; + Thread.join id; + output "check"; + assert_bool "after release 1" (CCLock.get res) +*) + + diff --git a/src/threads/CCSemaphore.mli b/src/threads/CCSemaphore.mli new file mode 100644 index 00000000..11831cc9 --- /dev/null +++ b/src/threads/CCSemaphore.mli @@ -0,0 +1,33 @@ + +(* This file is free software, part of containers. See file "license" for more details. *) + +(** {1 Semaphores} + + @since NEXT_RELEASE *) + +type t +(** A semaphore *) + +val create : int -> t +(** [create n] creates a semaphore with initial value [n] + @raise Invalid_argument if [n < 0] *) + +val get : t -> int +(** Current value *) + +val acquire : int -> t -> unit +(** [acquire n s] blocks until [get s > n], then atomically + sets [s := !s - n] *) + +val release : int -> t -> unit +(** [release n s] atomically sets [s := !s + n] *) + +val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a +(** [with_acquire ~n s ~f] first acquires [s] with [n] units, + calls [f ()], and then release [s] with [n] units. + Safely release the semaphore even if [f ()] fails *) + +val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a +(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()] + and returns its result. Doesn't modify the semaphore. *) +