diff --git a/benchs/run_benchs.ml b/benchs/run_benchs.ml index a4ab4dce..a7c5c1d1 100644 --- a/benchs/run_benchs.ml +++ b/benchs/run_benchs.ml @@ -1010,7 +1010,7 @@ module Thread = struct ] let fib_pool_ ~size n = - let module P = CCPool.Make(struct let max_size = size end) in + let module P = CCPool.Make(struct let min_size = 0 let max_size = size end) in let open P.Fut.Infix in let rec fib n = if n<=1 then P.Fut.return 1 @@ -1037,7 +1037,7 @@ module Thread = struct ] let bench_sequence ~size n = - let module P = CCPool.Make(struct let max_size = size end) in + let module P = CCPool.Make(struct let min_size = 0 let max_size = size end) in let id_ x = Thread.delay 0.0001; x in let mk_list() = CCList.init n (P.Fut.make1 id_) in let mk_sequence () = diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index f4aca08e..86ea2bf5 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -9,6 +9,9 @@ type +'a state = | Failed of exn module type PARAM = sig + val min_size : int + (** Minimum number of threads in the pool *) + val max_size : int (** Maximum number of threads in the pool *) end @@ -16,7 +19,7 @@ end exception Stopped (*$inject - module P = Make(struct let max_size = 30 end) + module P = Make(struct let min_size = 0 let max_size = 30 end) module Fut = P.Fut open Fut.Infix *) @@ -33,8 +36,10 @@ module Make(P : PARAM) = struct mutable stop : bool; (* indicate that threads should stop *) mutable exn_handler: (exn -> unit); mutex : Mutex.t; + cond : Condition.t; jobs : job Queue.t; (* waiting jobs *) - mutable cur_size : int; (* total number of threads *) + mutable cur_size : int; (* total number of threads *) + mutable cur_idle : int; (* number of idle threads *) } (** Dynamic, growable thread pool *) let nop_ _ = () @@ -43,7 +48,9 @@ module Make(P : PARAM) = struct let pool = { stop = false; exn_handler = nop_; + cond = Condition.create(); cur_size = 0; + cur_idle = 0; jobs = Queue.create (); mutex = Mutex.create (); } @@ -60,20 +67,27 @@ module Make(P : PARAM) = struct Mutex.unlock t.mutex; raise e + let incr_size_ p = p.cur_size <- p.cur_size + 1 + let decr_size_ p = p.cur_size <- p.cur_size - 1 + (* next thing a thread should do *) type command = | Process of job + | Wait (* wait on condition *) | Die (* thread has no work to do *) (* thread: seek what to do next (including dying). Assumes the pool is locked. *) let get_next_ pool = - if pool.stop || Queue.is_empty pool.jobs then ( + if pool.stop + || (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then ( (* die: the thread would be idle otherwise *) assert (pool.cur_size > 0); - pool.cur_size <- pool.cur_size - 1; + decr_size_ pool; Die - ) else ( + ) + else if Queue.is_empty pool.jobs then Wait + else ( let job = Queue.pop pool.jobs in Process job ) @@ -86,6 +100,8 @@ module Make(P : PARAM) = struct (* run a command *) and run_cmd = function | Die -> () + | Wait -> + with_lock_ pool (fun p -> Condition.wait p.cond p.mutex) | Process (Job1 (f, x)) -> begin try ignore (f x) with e -> pool.exn_handler e end; serve pool | Process (Job2 (f, x, y)) -> @@ -95,10 +111,15 @@ module Make(P : PARAM) = struct | Process (Job4 (f, x, y, z, w)) -> begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool - (* heuristic criterion for starting a new thread. *) - let should_start_thread p = p.cur_size < P.max_size + (* create a new worker thread *) + let launch_worker_ pool = ignore (Thread.create serve pool) - let incr_size_ p = p.cur_size <- p.cur_size +1 + (* launch the minimum required number of threads *) + let () = + for _i = 1 to P.min_size do launch_worker_ pool done + + (* heuristic criterion for starting a new thread. *) + let can_start_thread_ p = p.cur_size < P.max_size let run_job job = (* acquire lock and push job in queue, or start thread directly @@ -106,19 +127,21 @@ module Make(P : PARAM) = struct with_lock_ pool (fun pool -> if pool.stop then raise Stopped; - if Queue.is_empty pool.jobs && should_start_thread pool + if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 then ( - pool.cur_size <- pool.cur_size + 1; - (* create the thread now, on [job], as it will not - break order *) + (* create the thread now, on [job], as it will not break order of + jobs. We do not want to wait for the busy threads to do our task + if we are allowed to spawn a new thread. *) + incr_size_ pool; ignore (Thread.create run_cmd (Process job)) ) else ( - assert (pool.cur_size > 0); + (* cannot start thread, push and wait for some worker to pick it up *) Queue.push job pool.jobs; - (* might want to process in the background *) - if should_start_thread pool then ( + Condition.signal pool.cond; (* wake up *) + (* might want to process in the background, if all threads are busy *) + if pool.cur_idle = 0 && can_start_thread_ pool then ( incr_size_ pool; - ignore (Thread.create serve pool); + launch_worker_ pool; ) )) @@ -142,6 +165,9 @@ module Make(P : PARAM) = struct p.stop <- true; Queue.clear p.jobs) + (* stop threads if pool is GC'd *) + let () = Gc.finalise (fun _ -> stop ()) pool + (** {6 Futures} *) module Fut = struct type 'a handler = 'a state -> unit diff --git a/src/threads/CCPool.mli b/src/threads/CCPool.mli index 7f4ff9a8..e00acc5e 100644 --- a/src/threads/CCPool.mli +++ b/src/threads/CCPool.mli @@ -12,6 +12,9 @@ type +'a state = | Failed of exn module type PARAM = sig + val min_size : int + (** Minimum number of threads in the pool *) + val max_size : int (** Maximum number of threads in the pool *) end