From 40c05cc7e3ffe60e8c717f465c12c05a20f8a5b1 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 24 Mar 2021 18:44:41 -0400 Subject: [PATCH] wip: feat(pool): keep one idle thread see #360; in combination with max_size=1 it means the pool contains exactly one thread. --- src/threads/CCPool.ml | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index 59dffa92..2f2ca6f3 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -79,17 +79,24 @@ module Make(P : PARAM) = struct (* thread: seek what to do next (including dying). Assumes the pool is locked. *) let get_next_ pool = - (*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*) - if pool.stop || (Queue.is_empty pool.jobs && pool.cur_size > 0) then ( - (* die: the thread would be idle otherwise *) - (*Printf.printf "time… to die (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*) + Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop; + if pool.stop then ( decr_size_ pool; Die - ) else if Queue.is_empty pool.jobs then ( - Wait ) else ( - let job = Queue.pop pool.jobs in - Process job + match Queue.take pool.jobs with + | exception Queue.Empty -> + if pool.cur_idle > 0 then ( + (* die: there's already at least one idle thread *) + Printf.printf "DIE (idle>0)\n%!"; + decr_size_ pool; + Die + ) else ( + incr_idle_ pool; + Wait + ) + | job -> + Process job ) (* Thread: entry point. They seek jobs in the queue *) @@ -103,9 +110,9 @@ module Make(P : PARAM) = struct and run_cmd = function | Die -> () | Wait -> + Printf.printf "WAIT\n%!"; with_lock_ pool (fun p -> - incr_idle_ pool; Condition.wait p.cond p.mutex; decr_idle_ pool); serve pool @@ -167,10 +174,13 @@ module Make(P : PARAM) = struct (* kill threads in the pool *) let stop () = + Printf.printf "STOP\n%!"; with_lock_ pool (fun p -> p.stop <- true; - Queue.clear p.jobs) + Queue.clear p.jobs; + Condition.broadcast p.cond; (* wait up idlers *) + ) (* stop threads if pool is GC'd *) let () = Gc.finalise (fun _ -> stop ()) pool