diff --git a/src/threads/CCThread.ml b/src/threads/CCThread.ml index 753ae97d..a6085269 100644 --- a/src/threads/CCThread.ml +++ b/src/threads/CCThread.ml @@ -66,8 +66,8 @@ module Queue = struct assert (q.size < q.capacity); Queue.push x q.q; (* if there are blocked receivers, awake one of them *) - Condition.signal q.cond; incr_size_ q; + Condition.broadcast q.cond; ) let take q = @@ -78,8 +78,8 @@ module Queue = struct done; let x = Queue.take q.q in (* if there are blocked senders, awake one of them *) - Condition.signal q.cond; decr_size_ q; + Condition.broadcast q.cond; x ) @@ -97,18 +97,65 @@ module Queue = struct assert_equal [1;2;3;4] (List.sort Pervasives.compare (CCLock.get l)) *) - (* TODO: more efficient versions (push or pop several items at once when possible) *) + let push_list q l = + let is_empty_ = function [] -> true | _::_ -> false in + (* push elements until it's not possible *) + let rec push_ q l = match l with + | [] -> l + | _::_ when q.size = q.capacity -> l (* no room remaining *) + | x :: tl -> + Queue.push x q.q; + incr_size_ q; + push_ q tl + in + (* push chunks of [l] in [q] until [l] is empty *) + let rec aux q l = + if not (is_empty_ l) + then + let l = with_lock_ q + (fun () -> + while q.size = q.capacity do + Condition.wait q.cond q.lock + done; + let l = push_ q l in + Condition.broadcast q.cond; + l + ) + in + aux q l + in aux q l - let push_list q l = List.iter (push q) l - - let rec take_list q n = - if n=0 then [] - else - let x = take q in - x :: take_list q (n-1) + let take_list q n = + (* take at most [n] elements of [q] and prepend them to [acc] *) + let rec pop_ acc q n = + if n=0 || Queue.is_empty q.q then acc, n + else ( (* take next element *) + let x = Queue.take q.q in + decr_size_ q; + pop_ (x::acc) q (n-1) + ) + in + (* call [pop_] until [n] elements have been gathered *) + let rec aux acc q n = + if n=0 then List.rev acc + else + let acc, n = with_lock_ q + (fun () -> + while q.size = 0 do + Condition.wait q.cond q.lock + done; + let acc, n = pop_ acc q n in + Condition.broadcast q.cond; + acc, n + ) + in + aux acc q n + in + aux [] q n (*$R - let lists = [| CCList.(1 -- 100) ; CCList.(101 -- 200); CCList.(201 -- 300) |] in + let n = 1000 in + let lists = [| CCList.(1 -- n) ; CCList.(n+1 -- 2*n); CCList.(2*n+1 -- 3*n) |] in let q = Queue.create 2 in let senders = Arr.spawn 3 (fun i -> @@ -117,18 +164,22 @@ module Queue = struct else List.iter (Queue.push q) lists.(i) ) in - let l = CCLock.create [] in + let res = CCLock.create [] in let receivers = Arr.spawn 3 - (fun _ -> - for i = 1 to 100 do - let x = Queue.take q in - CCLock.update l (fun acc -> x::acc) - done + (fun i -> + if i=1 then + let l = Queue.take_list q n in + CCLock.update res (fun acc -> l @ acc) + else + for _j = 1 to n do + let x = Queue.take q in + CCLock.update res (fun acc -> x::acc) + done ) in Arr.join senders; Arr.join receivers; - let l = CCLock.get l |> List.sort Pervasives.compare in - assert_equal CCList.(1 -- 300) l + let l = CCLock.get res |> List.sort Pervasives.compare in + assert_equal CCList.(1 -- 3*n) l *) let try_take q =