From f41887c36739dbda4c508eca4bc3539fb59cf254 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 25 Mar 2021 23:09:59 -0400 Subject: [PATCH] refactor(pool): less locking, fix deadlock, more parallelism --- src/threads/CCPool.ml | 103 +++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 42 deletions(-) diff --git a/src/threads/CCPool.ml b/src/threads/CCPool.ml index 2f2ca6f3..eee1dd73 100644 --- a/src/threads/CCPool.ml +++ b/src/threads/CCPool.ml @@ -48,7 +48,7 @@ module Make(P : PARAM) = struct exn_handler = nop_; cond = Condition.create(); cur_size = 0; - cur_idle = 0; + cur_idle = 0; (* invariant: cur_idle <= cur_size *) jobs = Queue.create (); mutex = Mutex.create (); } @@ -79,61 +79,76 @@ module Make(P : PARAM) = struct (* thread: seek what to do next (including dying). Assumes the pool is locked. *) let get_next_ pool = - Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop; + (*Printf.printf "get_next (cur=%d, idle=%d, stop=%B)\n%!" pool.cur_size pool.cur_idle pool.stop;*) if pool.stop then ( - decr_size_ pool; Die ) else ( match Queue.take pool.jobs with | exception Queue.Empty -> if pool.cur_idle > 0 then ( (* die: there's already at least one idle thread *) - Printf.printf "DIE (idle>0)\n%!"; - decr_size_ pool; + (*Printf.printf "DIE (idle>0)\n%!";*) Die ) else ( - incr_idle_ pool; + (*Printf.printf "WAIT\n%!";*) Wait ) | job -> Process job ) + (* heuristic criterion for starting a new thread. *) + let[@inline] can_start_thread_ p = p.cur_size < P.max_size + (* Thread: entry point. They seek jobs in the queue *) let rec serve pool = assert (pool.cur_size <= P.max_size); assert (pool.cur_size > 0); - let cmd = with_lock_ pool get_next_ in - run_cmd cmd + Mutex.lock pool.mutex; + let cmd = get_next_ pool in + maybe_start_runner_ pool; + run_cmd pool cmd (* run a command *) - and run_cmd = function - | Die -> () + and run_cmd pool = function + | Die -> + decr_size_ pool; + Mutex.unlock pool.mutex; + () | Wait -> - Printf.printf "WAIT\n%!"; - with_lock_ pool - (fun p -> - Condition.wait p.cond p.mutex; - decr_idle_ pool); + (*Printf.printf "WAIT\n%!";*) + incr_idle_ pool; + Condition.wait pool.cond pool.mutex; + decr_idle_ pool; + Mutex.unlock pool.mutex; serve pool - | Process (Job1 (f, x)) -> + | Process job -> + Mutex.unlock pool.mutex; + run_job pool job + + (* execute the job *) + and run_job pool job = + match job with + | Job1 (f, x) -> begin try ignore (f x) with e -> pool.exn_handler e end; serve pool - | Process (Job2 (f, x, y)) -> + | Job2 (f, x, y) -> begin try ignore (f x y) with e -> pool.exn_handler e end; serve pool - | Process (Job3 (f, x, y, z)) -> + | Job3 (f, x, y, z) -> begin try ignore (f x y z) with e -> pool.exn_handler e end; serve pool - | Process (Job4 (f, x, y, z, w)) -> + | Job4 (f, x, y, z, w) -> begin try ignore (f x y z w) with e -> pool.exn_handler e end; serve pool - (* create a new worker thread *) - let launch_worker_ pool = - with_lock_ pool - (fun pool -> - incr_size_ pool; - ignore (Thread.create serve pool)) + and maybe_start_runner_ pool : unit = + if not (Queue.is_empty pool.jobs) && can_start_thread_ pool then ( + (* there's room for another thread to start processing jobs, + starting with [Queue.pop pool.jobs] *) + let job' = Queue.pop pool.jobs in + launch_worker_on_ pool job'; + ) - (* heuristic criterion for starting a new thread. *) - let can_start_thread_ p = p.cur_size < P.max_size + and[@inline] launch_worker_on_ pool job = + incr_size_ pool; + ignore (Thread.create (run_job pool) job) let run_job job = (* acquire lock and push job in queue, or start thread directly @@ -142,21 +157,26 @@ module Make(P : PARAM) = struct (fun pool -> if pool.stop then raise Stopped; if Queue.is_empty pool.jobs && can_start_thread_ pool && pool.cur_idle = 0 then ( - (* create the thread now, on [job], as it will not break order of - jobs. We do not want to wait for the busy threads to do our task - if we are allowed to spawn a new thread. *) - incr_size_ pool; - ignore (Thread.create run_cmd (Process job)) - ) else ( - (* cannot start thread, push and wait for some worker to pick it up *) + (* create the thread now, on [job], since no other job in + the queue takes precedence. + We do not want to wait for the busy threads to do our task + if we are allowed to spawn a new thread, and no thread is + just idle waiting for new jobs. *) + launch_worker_on_ pool job; + ) else if pool.cur_idle > 0 then ( + (* at least one idle thread, wake it up *) Queue.push job pool.jobs; - Condition.broadcast pool.cond; (* wake up some worker, if any *) - (* might want to process in the background, if all threads are busy *) - if not (Queue.is_empty pool.jobs) - && pool.cur_idle = 0 - && can_start_thread_ pool then ( - launch_worker_ pool; - ) + Condition.broadcast pool.cond; (* wake up some worker *) + ) else ( + Queue.push job pool.jobs; + + (* we might still be able to start another thread to help the + active ones (none is idle). This thread is not necessarily + going to process [job], but rather the first job in the queue *) + if can_start_thread_ pool then ( + let job' = Queue.pop pool.jobs in + launch_worker_on_ pool job'; + ); )) (* run the function on the argument in the given pool *) @@ -174,7 +194,6 @@ module Make(P : PARAM) = struct (* kill threads in the pool *) let stop () = - Printf.printf "STOP\n%!"; with_lock_ pool (fun p -> p.stop <- true;