introduce a min_size in CCPool, to keep threads alive

- in benchs, keep `min_size=0` for resource management (need to spawn
  lots of threads)
This commit is contained in:
Simon Cruanes 2016-01-26 02:14:35 +01:00
parent f95825a2e5
commit 663a3cd693
3 changed files with 47 additions and 18 deletions

View file

@ -1010,7 +1010,7 @@ module Thread = struct
] ]
let fib_pool_ ~size n = let fib_pool_ ~size n =
let module P = CCPool.Make(struct let max_size = size end) in let module P = CCPool.Make(struct let min_size = 0 let max_size = size end) in
let open P.Fut.Infix in let open P.Fut.Infix in
let rec fib n = let rec fib n =
if n<=1 then P.Fut.return 1 if n<=1 then P.Fut.return 1
@ -1037,7 +1037,7 @@ module Thread = struct
] ]
let bench_sequence ~size n = let bench_sequence ~size n =
let module P = CCPool.Make(struct let max_size = size end) in let module P = CCPool.Make(struct let min_size = 0 let max_size = size end) in
let id_ x = Thread.delay 0.0001; x in let id_ x = Thread.delay 0.0001; x in
let mk_list() = CCList.init n (P.Fut.make1 id_) in let mk_list() = CCList.init n (P.Fut.make1 id_) in
let mk_sequence () = let mk_sequence () =

View file

@ -9,6 +9,9 @@ type +'a state =
| Failed of exn | Failed of exn
module type PARAM = sig module type PARAM = sig
val min_size : int
(** Minimum number of threads in the pool *)
val max_size : int val max_size : int
(** Maximum number of threads in the pool *) (** Maximum number of threads in the pool *)
end end
@ -16,7 +19,7 @@ end
exception Stopped exception Stopped
(*$inject (*$inject
module P = Make(struct let max_size = 30 end) module P = Make(struct let min_size = 0 let max_size = 30 end)
module Fut = P.Fut module Fut = P.Fut
open Fut.Infix open Fut.Infix
*) *)
@ -33,8 +36,10 @@ module Make(P : PARAM) = struct
mutable stop : bool; (* indicate that threads should stop *) mutable stop : bool; (* indicate that threads should stop *)
mutable exn_handler: (exn -> unit); mutable exn_handler: (exn -> unit);
mutex : Mutex.t; mutex : Mutex.t;
cond : Condition.t;
jobs : job Queue.t; (* waiting jobs *) jobs : job Queue.t; (* waiting jobs *)
mutable cur_size : int; (* total number of threads *) mutable cur_size : int; (* total number of threads *)
mutable cur_idle : int; (* number of idle threads *)
} (** Dynamic, growable thread pool *) } (** Dynamic, growable thread pool *)
let nop_ _ = () let nop_ _ = ()
@ -43,7 +48,9 @@ module Make(P : PARAM) = struct
let pool = { let pool = {
stop = false; stop = false;
exn_handler = nop_; exn_handler = nop_;
cond = Condition.create();
cur_size = 0; cur_size = 0;
cur_idle = 0;
jobs = Queue.create (); jobs = Queue.create ();
mutex = Mutex.create (); mutex = Mutex.create ();
} }
@ -60,20 +67,27 @@ module Make(P : PARAM) = struct
Mutex.unlock t.mutex; Mutex.unlock t.mutex;
raise e raise e
let incr_size_ p = p.cur_size <- p.cur_size + 1
let decr_size_ p = p.cur_size <- p.cur_size - 1
(* next thing a thread should do *) (* next thing a thread should do *)
type command = type command =
| Process of job | Process of job
| Wait (* wait on condition *)
| Die (* thread has no work to do *) | Die (* thread has no work to do *)
(* thread: seek what to do next (including dying). (* thread: seek what to do next (including dying).
Assumes the pool is locked. *) Assumes the pool is locked. *)
let get_next_ pool = let get_next_ pool =
if pool.stop || Queue.is_empty pool.jobs then ( if pool.stop
|| (Queue.is_empty pool.jobs && pool.cur_size > P.min_size) then (
(* die: the thread would be idle otherwise *) (* die: the thread would be idle otherwise *)
assert (pool.cur_size > 0); assert (pool.cur_size > 0);
pool.cur_size <- pool.cur_size - 1; decr_size_ pool;
Die Die
) else ( )
else if Queue.is_empty pool.jobs then Wait
else (
let job = Queue.pop pool.jobs in let job = Queue.pop pool.jobs in
Process job Process job
) )
@ -86,6 +100,8 @@ module Make(P : PARAM) = struct
(* run a command *) (* run a command *)
and run_cmd = function and run_cmd = function
| Die -> () | Die -> ()
| Wait ->
with_lock_ pool (fun p -> Condition.wait p.cond p.mutex)
| Process (Job1 (f, x)) -> | Process (Job1 (f, x)) ->
begin try ignore (f x) with e -> pool.exn_handler e end; serve pool begin try ignore (f x) with e -> pool.exn_handler e end; serve pool
| Process (Job2 (f, x, y)) -> | Process (Job2 (f, x, y)) ->
@ -95,10 +111,15 @@ module Make(P : PARAM) = struct
| Process (Job4 (f, x, y, z, w)) -> | Process (Job4 (f, x, y, z, w)) ->
begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool
(* heuristic criterion for starting a new thread. *) (* create a new worker thread *)
let should_start_thread p = p.cur_size < P.max_size let launch_worker_ pool = ignore (Thread.create serve pool)
let incr_size_ p = p.cur_size <- p.cur_size +1 (* launch the minimum required number of threads *)
let () =
for _i = 1 to P.min_size do launch_worker_ pool done
(* heuristic criterion for starting a new thread. *)
let can_start_thread_ p = p.cur_size < P.max_size
let run_job job = let run_job job =
(* acquire lock and push job in queue, or start thread directly (* acquire lock and push job in queue, or start thread directly
@ -106,19 +127,21 @@ module Make(P : PARAM) = struct
with_lock_ pool with_lock_ pool
(fun pool -> (fun pool ->
if pool.stop then raise Stopped; if pool.stop then raise Stopped;
if Queue.is_empty pool.jobs && should_start_thread pool if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0
then ( then (
pool.cur_size <- pool.cur_size + 1; (* create the thread now, on [job], as it will not break order of
(* create the thread now, on [job], as it will not jobs. We do not want to wait for the busy threads to do our task
break order *) if we are allowed to spawn a new thread. *)
incr_size_ pool;
ignore (Thread.create run_cmd (Process job)) ignore (Thread.create run_cmd (Process job))
) else ( ) else (
assert (pool.cur_size > 0); (* cannot start thread, push and wait for some worker to pick it up *)
Queue.push job pool.jobs; Queue.push job pool.jobs;
(* might want to process in the background *) Condition.signal pool.cond; (* wake up *)
if should_start_thread pool then ( (* might want to process in the background, if all threads are busy *)
if pool.cur_idle = 0 && can_start_thread_ pool then (
incr_size_ pool; incr_size_ pool;
ignore (Thread.create serve pool); launch_worker_ pool;
) )
)) ))
@ -142,6 +165,9 @@ module Make(P : PARAM) = struct
p.stop <- true; p.stop <- true;
Queue.clear p.jobs) Queue.clear p.jobs)
(* stop threads if pool is GC'd *)
let () = Gc.finalise (fun _ -> stop ()) pool
(** {6 Futures} *) (** {6 Futures} *)
module Fut = struct module Fut = struct
type 'a handler = 'a state -> unit type 'a handler = 'a state -> unit

View file

@ -12,6 +12,9 @@ type +'a state =
| Failed of exn | Failed of exn
module type PARAM = sig module type PARAM = sig
val min_size : int
(** Minimum number of threads in the pool *)
val max_size : int val max_size : int
(** Maximum number of threads in the pool *) (** Maximum number of threads in the pool *)
end end