wip: feat(pool): keep one idle thread

see #360; in combination with max_size=1 it means the pool contains
exactly one thread.
This commit is contained in:
Simon Cruanes 2021-03-24 18:44:41 -04:00
parent 8982f87ca7
commit 40c05cc7e3

View file

@ -79,17 +79,24 @@ module Make(P : PARAM) = struct
(* thread: seek what to do next (including dying). (* thread: seek what to do next (including dying).
Assumes the pool is locked. *) Assumes the pool is locked. *)
let get_next_ pool = let get_next_ pool =
(*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*) Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;
if pool.stop || (Queue.is_empty pool.jobs && pool.cur_size > 0) then ( if pool.stop then (
(* die: the thread would be idle otherwise *)
(*Printf.printf "time… to die (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*)
decr_size_ pool; decr_size_ pool;
Die Die
) else if Queue.is_empty pool.jobs then (
Wait
) else ( ) else (
let job = Queue.pop pool.jobs in match Queue.take pool.jobs with
Process job | exception Queue.Empty ->
if pool.cur_idle > 0 then (
(* die: there's already at least one idle thread *)
Printf.printf "DIE (idle>0)\n%!";
decr_size_ pool;
Die
) else (
incr_idle_ pool;
Wait
)
| job ->
Process job
) )
(* Thread: entry point. They seek jobs in the queue *) (* Thread: entry point. They seek jobs in the queue *)
@ -103,9 +110,9 @@ module Make(P : PARAM) = struct
and run_cmd = function and run_cmd = function
| Die -> () | Die -> ()
| Wait -> | Wait ->
Printf.printf "WAIT\n%!";
with_lock_ pool with_lock_ pool
(fun p -> (fun p ->
incr_idle_ pool;
Condition.wait p.cond p.mutex; Condition.wait p.cond p.mutex;
decr_idle_ pool); decr_idle_ pool);
serve pool serve pool
@ -167,10 +174,13 @@ module Make(P : PARAM) = struct
(* kill threads in the pool *) (* kill threads in the pool *)
let stop () = let stop () =
Printf.printf "STOP\n%!";
with_lock_ pool with_lock_ pool
(fun p -> (fun p ->
p.stop <- true; p.stop <- true;
Queue.clear p.jobs) Queue.clear p.jobs;
Condition.broadcast p.cond; (* wait up idlers *)
)
(* stop threads if pool is GC'd *) (* stop threads if pool is GC'd *)
let () = Gc.finalise (fun _ -> stop ()) pool let () = Gc.finalise (fun _ -> stop ()) pool