From b4b01bc2f7a35c93cde067a756cb32f847f16f4d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 31 Aug 2015 22:46:27 +0200 Subject: [PATCH] modify `CCThread`, add tests --- src/threads/CCThread.ml | 34 +++++++++++++++++++++++++++++----- src/threads/CCThread.mli | 2 +- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml index b509d24a..f34d597c 100644 --- a/src/threads/CCThread.ml +++ b/src/threads/CCThread.ml @@ -9,7 +9,7 @@ let spawn f = Thread.create f () let detach f = ignore (Thread.create f ()) -module Array = struct +module Arr = struct let spawn n f = Array.init n (fun i -> Thread.create f i) @@ -18,8 +18,8 @@ end (*$R let l = CCLock.create 0 in - let a = Array.spawn 101 (fun i -> CCLock.update l ((+) i)) in - Array.join a; + let a = Arr.spawn 101 (fun i -> CCLock.update l ((+) i)) in + Arr.join a; let n = Sequence.(1 -- 100 |> fold (+) 0) in assert_equal ~printer:CCInt.to_string n (CCLock.get l) *) @@ -45,7 +45,7 @@ module Queue = struct q let incr_size_ q = assert(q.size < q.capacity); q.size <- q.size + 1 - let decr_size_ q = q.size <- q.size - 1 + let decr_size_ q = assert(q.size > 0); q.size <- q.size - 1 let with_lock_ q f = Mutex.lock q.lock; @@ -65,6 +65,7 @@ module Queue = struct done; assert (q.size < q.capacity); Queue.push x q.q; + (* if there are blocked receivers, awake them *) if q.size = 0 then Condition.signal q.cond; incr_size_ q; ) @@ -76,7 +77,8 @@ module Queue = struct Condition.wait q.cond q.lock done; let x = Queue.take q.q in - if q.size = q.capacity then Condition.signal q.cond; + (* if there are blocked senders, awake them *) + if q.size = q.capacity then Condition.broadcast q.cond; decr_size_ q; x ) @@ -105,6 +107,28 @@ module Queue = struct let x = take q in x :: take_list q (n-1) + (*$R + let lists = [| CCList.(1 -- 100) ; CCList.(101 -- 200); CCList.(201 -- 300) |] in + let q = Queue.create 2 in + let senders = Arr.spawn 3 + (fun i -> + List.iter (Queue.push q) lists.(i) + ) + in + let l = CCLock.create [] in + let receivers = Arr.spawn 3 + (fun _ -> + for i = 1 to 100 do + let x = Queue.take q in + CCLock.update l (fun acc -> x::acc) + done + ) + in + Arr.join senders; Arr.join receivers; + let l = CCLock.get l |> List.sort Pervasives.compare in + assert_equal CCList.(1 -- 300) l + *) + let try_take q = with_lock_ q (fun () -> diff --git a/src/threads/CCThread.mli b/src/threads/CCThread.mli index c7e02743..1ef7b0bc 100644 --- a/src/threads/CCThread.mli +++ b/src/threads/CCThread.mli @@ -15,7 +15,7 @@ val detach : (unit -> 'a) -> unit (** [detach f] is the same as [ignore (spawn f)] *) (** {2 Array of threads} *) -module Array : sig +module Arr : sig val spawn : int -> (int -> 'a) -> t array (** [A.spawn n f] creates an array [res] of length [n], such that [res.(i) = spawn (fun () -> f i)] *)