From 2d306c91b22eeff39a1be1afeb93ea30c8336093 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 24 Nov 2023 23:12:22 -0500 Subject: [PATCH] fix too early exit in Ws_pool when shutting down the pool, workers should check `self.active` only when they have no local task, failed to steal tasks, and found the main queue to be empty. Basically we check `self.active` only just before we wait on the condition. --- src/ws_pool.ml | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/ws_pool.ml b/src/ws_pool.ml index d32c71f8..8683040d 100644 --- a/src/ws_pool.ml +++ b/src/ws_pool.ml @@ -170,19 +170,15 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = TLS.get k_worker_state := Some w; let rec main () : unit = - if A.get self.active then ( - worker_run_self_tasks_ self ~runner w; - try_steal () - ) + worker_run_self_tasks_ self ~runner w; + try_steal () and run_task task : unit = run_task_now_ self ~runner task; main () and try_steal () = - if A.get self.active then ( - match try_to_steal_work_once_ self w with - | Some task -> run_task task - | None -> wait () - ) + match try_to_steal_work_once_ self w with + | Some task -> run_task task + | None -> wait () and wait () = Mutex.lock self.mutex; match Queue.pop self.main_q with @@ -191,15 +187,26 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit = run_task task | exception Queue.Empty -> (* wait here *) - if A.get self.active then wait_ self; + if A.get self.active then ( + wait_ self; - (* see if a task became available *) - let task = try Some (Queue.pop self.main_q) with Queue.Empty -> None in - Mutex.unlock self.mutex; + (* see if a task became available *) + let task = + try Some (Queue.pop self.main_q) with Queue.Empty -> None + in + Mutex.unlock self.mutex; - (match task with - | Some t -> run_task t - | None -> try_steal ()) + match task with + | Some t -> run_task t + | None -> try_steal () + ) else + (* do nothing more: no task in main queue, and we are shutting + down so no new task should arrive. + The exception is if another task is creating subtasks + that overflow into the main queue, but we can ignore that at + the price of slightly decreased performance for the last few + tasks *) + Mutex.unlock self.mutex in (* handle domain-local await *)