From 470ab8e49c26455d60a9d87a0f74fc1d6ef158ed Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 1 Sep 2015 14:00:20 +0200 Subject: [PATCH] add `CCThread.Barrier` for simple synchronization --- src/threads/CCThread.ml | 57 ++++++++++++++++++++++++++++++++++++++++ src/threads/CCThread.mli | 26 ++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml index a6085269..a482b030 100644 --- a/src/threads/CCThread.ml +++ b/src/threads/CCThread.ml @@ -24,6 +24,63 @@ end assert_equal ~printer:CCInt.to_string n (CCLock.get l) *) +module Barrier = struct + type t = { + lock: Mutex.t; + cond: Condition.t; + mutable activated: bool; + } + + let create () = { + lock=Mutex.create(); + cond=Condition.create(); + activated=false; + } + + let with_lock_ b f = + Mutex.lock b.lock; + try + let x = f () in + Mutex.unlock b.lock; + x + with e -> + Mutex.unlock b.lock; + raise e + + let reset b = with_lock_ b (fun () -> b.activated <- false) + + let wait b = + with_lock_ b + (fun () -> + while not b.activated do + Condition.wait b.cond b.lock + done + ) + + let activate b = + with_lock_ b + (fun () -> + if not b.activated then ( + b.activated <- true; + Condition.broadcast b.cond + ) + ) + + let activated b = with_lock_ b (fun () -> b.activated) +end + +(*$R + let b = Barrier.create () in + let res = CCLock.create 0 in + let t1 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) + and t2 = spawn (fun _ -> Barrier.wait b; CCLock.incr res) in + Thread.delay 0.2; + assert_equal 0 (CCLock.get res); + Barrier.activate b; + Thread.join t1; Thread.join t2; + assert_equal 2 (CCLock.get res) +*) + module Queue = struct type 'a t = { q : 'a Queue.t; diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli index fd85e6d7..a88c1113 100644 --- a/src/threads/CCThread.mli +++ b/src/threads/CCThread.mli @@ -24,6 +24,32 @@ module Arr : sig (** [A.join a] joins every thread in [a] *) end +(** {2 Single-Use Barrier} *) + +module Barrier : sig + type t + (** Barrier, used to synchronize threads *) + + val create : unit -> t + (** Create a barrier *) + + val reset : t -> unit + (** Reset to initial (non-triggered) state *) + + val wait : t -> unit + (** [wait b] waits for barrier [b] to be activated by [activate b]. + All threads calling this wait until [activate b] is called. + If [b] is already activated, [wait b] does nothing *) + + val activate : t -> unit + (** [activate b] unblocks all threads that were waiting on [b] *) + + val activated : t -> bool + (** [activated b] returns [true] iff [activate b] was called, and [reset b] + was not called since. In other words, [activated b = true] means + [wait b] will not block. *) +end + (** {2 Blocking Queue} This queue has a limited size. Pushing a value on the queue when it