mirror of
https://github.com/c-cube/ocaml-containers.git
synced 2025-12-06 19:25:28 -05:00
add CCThread.Barrier for simple synchronization
This commit is contained in:
parent
c1837dbb9c
commit
470ab8e49c
2 changed files with 83 additions and 0 deletions
|
|
@ -24,6 +24,63 @@ end
|
||||||
assert_equal ~printer:CCInt.to_string n (CCLock.get l)
|
assert_equal ~printer:CCInt.to_string n (CCLock.get l)
|
||||||
*)
|
*)
|
||||||
|
|
||||||
|
module Barrier = struct
|
||||||
|
type t = {
|
||||||
|
lock: Mutex.t;
|
||||||
|
cond: Condition.t;
|
||||||
|
mutable activated: bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
let create () = {
|
||||||
|
lock=Mutex.create();
|
||||||
|
cond=Condition.create();
|
||||||
|
activated=false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let with_lock_ b f =
|
||||||
|
Mutex.lock b.lock;
|
||||||
|
try
|
||||||
|
let x = f () in
|
||||||
|
Mutex.unlock b.lock;
|
||||||
|
x
|
||||||
|
with e ->
|
||||||
|
Mutex.unlock b.lock;
|
||||||
|
raise e
|
||||||
|
|
||||||
|
let reset b = with_lock_ b (fun () -> b.activated <- false)
|
||||||
|
|
||||||
|
let wait b =
|
||||||
|
with_lock_ b
|
||||||
|
(fun () ->
|
||||||
|
while not b.activated do
|
||||||
|
Condition.wait b.cond b.lock
|
||||||
|
done
|
||||||
|
)
|
||||||
|
|
||||||
|
let activate b =
|
||||||
|
with_lock_ b
|
||||||
|
(fun () ->
|
||||||
|
if not b.activated then (
|
||||||
|
b.activated <- true;
|
||||||
|
Condition.broadcast b.cond
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
let activated b = with_lock_ b (fun () -> b.activated)
|
||||||
|
end
|
||||||
|
|
||||||
|
(*$R
|
||||||
|
let b = Barrier.create () in
|
||||||
|
let res = CCLock.create 0 in
|
||||||
|
let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res)
|
||||||
|
and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in
|
||||||
|
Thread.delay 0.2;
|
||||||
|
assert_equal 0 (CCLock.get res);
|
||||||
|
Barrier.activate b;
|
||||||
|
Thread.join t1; Thread.join t2;
|
||||||
|
assert_equal 2 (CCLock.get res)
|
||||||
|
*)
|
||||||
|
|
||||||
module Queue = struct
|
module Queue = struct
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
q : 'a Queue.t;
|
q : 'a Queue.t;
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,32 @@ module Arr : sig
|
||||||
(** [A.join a] joins every thread in [a] *)
|
(** [A.join a] joins every thread in [a] *)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
(** {2 Single-Use Barrier} *)
|
||||||
|
|
||||||
|
module Barrier : sig
|
||||||
|
type t
|
||||||
|
(** Barrier, used to synchronize threads *)
|
||||||
|
|
||||||
|
val create : unit -> t
|
||||||
|
(** Create a barrier *)
|
||||||
|
|
||||||
|
val reset : t -> unit
|
||||||
|
(** Reset to initial (non-triggered) state *)
|
||||||
|
|
||||||
|
val wait : t -> unit
|
||||||
|
(** [wait b] waits for barrier [b] to be activated by [activate b].
|
||||||
|
All threads calling this wait until [activate b] is called.
|
||||||
|
If [b] is already activated, [wait b] does nothing *)
|
||||||
|
|
||||||
|
val activate : t -> unit
|
||||||
|
(** [activate b] unblocks all threads that were waiting on [b] *)
|
||||||
|
|
||||||
|
val activated : t -> bool
|
||||||
|
(** [activated b] returns [true] iff [activate b] was called, and [reset b]
|
||||||
|
was not called since. In other words, [activated b = true] means
|
||||||
|
[wait b] will not block. *)
|
||||||
|
end
|
||||||
|
|
||||||
(** {2 Blocking Queue}
|
(** {2 Blocking Queue}
|
||||||
|
|
||||||
This queue has a limited size. Pushing a value on the queue when it
|
This queue has a limited size. Pushing a value on the queue when it
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue