From c792d70ac7cccfcc0faf466ff23943b3808f4d6c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 13 Sep 2017 18:39:14 +0200 Subject: [PATCH] assertions and cleanup in `CCPool` --- src/threads/CCPool.ml | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index 1863e2a8..914461cc 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -69,6 +69,8 @@ module Make(P : PARAM) = struct let incr_size_ p = p.cur_size <- p.cur_size + 1 let decr_size_ p = p.cur_size <- p.cur_size - 1 + let incr_idle_ p = p.cur_idle <- p.cur_idle + 1 + let decr_idle_ p = p.cur_idle <- p.cur_idle - 1 (* next thing a thread should do *) type command = @@ -85,8 +87,7 @@ module Make(P : PARAM) = struct assert (pool.cur_size > 0); decr_size_ pool; Die - ) - else if Queue.is_empty pool.jobs then Wait + ) else if Queue.is_empty pool.jobs then Wait else ( let job = Queue.pop pool.jobs in Process job @@ -94,6 +95,8 @@ module Make(P : PARAM) = struct (* Thread: entry point. They seek jobs in the queue *) let rec serve pool = + assert (pool.cur_size <= P.max_size); + assert (pool.cur_size > 0); let cmd = with_lock_ pool get_next_ in run_cmd cmd @@ -101,7 +104,12 @@ module Make(P : PARAM) = struct and run_cmd = function | Die -> () | Wait -> - with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) + with_lock_ pool + (fun p -> + incr_idle_ pool; + Condition.wait p.cond p.mutex; + decr_idle_ pool); + serve pool | Process (Job1 (f, x)) -> begin try ignore (f x) with e -> pool.exn_handler e end; serve pool | Process (Job2 (f, x, y)) -> @@ -116,6 +124,8 @@ module Make(P : PARAM) = struct (* launch the minimum required number of threads *) let () = + if P.min_size < 0 then invalid_arg "CCPool: min_size must be >= 0"; + if P.min_size > P.max_size then invalid_arg "CCPool: min_size must be <= max_size"; for _i = 1 to P.min_size do launch_worker_ pool done (* heuristic criterion for starting a new thread. *) @@ -137,7 +147,7 @@ module Make(P : PARAM) = struct ) else ( (* cannot start thread, push and wait for some worker to pick it up *) Queue.push job pool.jobs; - Condition.signal pool.cond; (* wake up *) + Condition.signal pool.cond; (* wake up some worker, if any *) (* might want to process in the background, if all threads are busy *) if pool.cur_idle = 0 && can_start_thread_ pool then ( incr_size_ pool; @@ -264,7 +274,7 @@ module Make(P : PARAM) = struct let l = List.rev_map (fun i -> Fut.make (fun () -> - Thread.delay 0.05; + Thread.delay 0.01; 1 )) l in let l' = List.map Fut.get l in