new module CCSemaphore in containers.thread, with simple semaphore

This commit is contained in:
Simon Cruanes 2015-08-31 21:18:17 +02:00
parent 7fec8ca8c2
commit deab575bb3
5 changed files with 163 additions and 4 deletions

View file

@ -172,6 +172,15 @@ In the module `Containers_advanced`:
- `CCCat`, a few categorical structures - `CCCat`, a few categorical structures
- `CCBatch`, to combine operations on collections into one traversal - `CCBatch`, to combine operations on collections into one traversal
### Thread
In the library `containers.thread`, for preemptive system threads:
- `CCFuture`, a set of tools for preemptive threading, including a thread pool,
monadic futures, and MVars (concurrent boxes)
- `CCLock`, values protected by locks
- `CCSemaphore`, a simple implementation of semaphores
### Misc ### Misc
See [doc](http://cedeela.fr/~simon/software/containers/misc). This list See [doc](http://cedeela.fr/~simon/software/containers/misc). This list
@ -191,9 +200,6 @@ is not necessarily up-to-date.
### Others ### Others
- `Future`, a set of tools for preemptive threading, including a thread pool,
monadic futures, and MVars (concurrent boxes)
- `containers.lwt` contains [Lwt](http://ocsigen.org/lwt/)-related modules (experimental) - `containers.lwt` contains [Lwt](http://ocsigen.org/lwt/)-related modules (experimental)
There is a QuickCheck-like library called `QCheck` (now in its own repo). There is a QuickCheck-like library called `QCheck` (now in its own repo).

2
_oasis
View file

@ -130,7 +130,7 @@ Library "containers_misc"
Library "containers_thread" Library "containers_thread"
Path: src/threads/ Path: src/threads/
Modules: CCFuture, CCLock Modules: CCFuture, CCLock, CCSemaphore
FindlibName: thread FindlibName: thread
FindlibParent: containers FindlibParent: containers
Build$: flag(thread) Build$: flag(thread)

View file

@ -156,6 +156,7 @@ Lwt_pipe
{!modules: {!modules:
CCFuture CCFuture
CCLock CCLock
CCSemaphore
} }

119
src/threads/CCSemaphore.ml Normal file
View file

@ -0,0 +1,119 @@
(** {1 Semaphores} *)
type t = {
mutable n : int;
mutex : Mutex.t;
cond : Condition.t;
}
let create n = {
n;
mutex=Mutex.create();
cond=Condition.create();
}
let get t = t.n
(* assume [t.mutex] locked, try to acquire [t] *)
let acquire_once_locked_ m t =
while t.n < m do
Condition.wait t.cond t.mutex;
done;
assert (t.n >= m);
t.n <- t.n - m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let acquire m t =
Mutex.lock t.mutex;
acquire_once_locked_ m t
(* assume [t.mutex] locked, try to release [t] *)
let release_once_locked_ m t =
t.n <- t.n + m;
Condition.broadcast t.cond;
Mutex.unlock t.mutex
let release m t =
Mutex.lock t.mutex;
release_once_locked_ m t;
()
(*$R
let s = create 1 in
let r = CCLock.create false in
let _ = Thread.create (fun s -> acquire 5 s; CCLock.set r true) s in
Thread.yield ();
assert_equal false (CCLock.get r);
release 4 s;
Thread.delay 0.2;
assert_equal true (CCLock.get r);
assert_equal 0 (get s)
*)
let with_acquire ~n t ~f =
Mutex.lock t.mutex;
acquire_once_locked_ n t;
try
let x = f() in
release_once_locked_ n t;
x
with e ->
release_once_locked_ n t;
raise e
(*$R
let s = create 5 in
let n = CCLock.create 0 in
let a = Array.init 100 (fun i ->
Thread.create (fun _ ->
with_acquire ~n:(1 + (i mod 5)) s
~f:(fun () -> CCLock.incr n)
) ())
in
Array.iter Thread.join a;
assert_equal ~printer:CCInt.to_string 5 (get s);
assert_equal ~printer:CCInt.to_string 100 (CCLock.get n)
*)
let wait_until_at_least ~n t ~f =
Mutex.lock t.mutex;
while t.n < n do
Condition.wait t.cond t.mutex;
done;
assert (t.n >= n);
Mutex.unlock t.mutex;
f ()
(*$R
let output s = () in
let s = create 2 in
let res = CCLock.create false in
let id = Thread.create
(fun () ->
output "start";
wait_until_at_least ~n:5 s
~f:(fun () ->
assert (get s >= 5);
output "modify now";
CCLock.set res true)
) ()
in
output "launched thread";
Thread.yield();
assert_bool "start" (not (CCLock.get res));
output "release 2";
release 2 s;
Thread.yield();
assert_bool "after release 2" (not (CCLock.get res));
output "release 1";
release 1 s;
(* should work now *)
Thread.delay 0.2;
Thread.join id;
output "check";
assert_bool "after release 1" (CCLock.get res)
*)

View file

@ -0,0 +1,33 @@
(* This file is free software, part of containers. See file "license" for more details. *)
(** {1 Semaphores}
@since NEXT_RELEASE *)
type t
(** A semaphore *)
val create : int -> t
(** [create n] creates a semaphore with initial value [n]
@raise Invalid_argument if [n < 0] *)
val get : t -> int
(** Current value *)
val acquire : int -> t -> unit
(** [acquire n s] blocks until [get s > n], then atomically
sets [s := !s - n] *)
val release : int -> t -> unit
(** [release n s] atomically sets [s := !s + n] *)
val with_acquire : n:int -> t -> f:(unit -> 'a) -> 'a
(** [with_acquire ~n s ~f] first acquires [s] with [n] units,
calls [f ()], and then release [s] with [n] units.
Safely release the semaphore even if [f ()] fails *)
val wait_until_at_least : n:int -> t -> f:(unit -> 'a) -> 'a
(** [wait_until_at_least ~n s ~f] waits until [get s >= n], then calls [f ()]
and returns its result. Doesn't modify the semaphore. *)