mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
fix too early exit in Ws_pool
when shutting down the pool, workers should check `self.active` only when they have no local task, failed to steal tasks, and found the main queue to be empty. Basically we check `self.active` only just before we wait on the condition.
This commit is contained in:
parent
9513b82bd0
commit
a540c091e6
1 changed files with 23 additions and 16 deletions
|
|
@ -170,19 +170,15 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
|
|||
TLS.get k_worker_state := Some w;
|
||||
|
||||
let rec main () : unit =
|
||||
if A.get self.active then (
|
||||
worker_run_self_tasks_ self ~runner w;
|
||||
try_steal ()
|
||||
)
|
||||
and run_task task : unit =
|
||||
run_task_now_ self ~runner task;
|
||||
main ()
|
||||
and try_steal () =
|
||||
if A.get self.active then (
|
||||
match try_to_steal_work_once_ self w with
|
||||
| Some task -> run_task task
|
||||
| None -> wait ()
|
||||
)
|
||||
and wait () =
|
||||
Mutex.lock self.mutex;
|
||||
match Queue.pop self.main_q with
|
||||
|
|
@ -191,15 +187,26 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
|
|||
run_task task
|
||||
| exception Queue.Empty ->
|
||||
(* wait here *)
|
||||
if A.get self.active then wait_ self;
|
||||
if A.get self.active then (
|
||||
wait_ self;
|
||||
|
||||
(* see if a task became available *)
|
||||
let task = try Some (Queue.pop self.main_q) with Queue.Empty -> None in
|
||||
let task =
|
||||
try Some (Queue.pop self.main_q) with Queue.Empty -> None
|
||||
in
|
||||
Mutex.unlock self.mutex;
|
||||
|
||||
(match task with
|
||||
match task with
|
||||
| Some t -> run_task t
|
||||
| None -> try_steal ())
|
||||
| None -> try_steal ()
|
||||
) else
|
||||
(* do nothing more: no task in main queue, and we are shutting
|
||||
down so no new task should arrive.
|
||||
The exception is if another task is creating subtasks
|
||||
that overflow into the main queue, but we can ignore that at
|
||||
the price of slightly decreased performance for the last few
|
||||
tasks *)
|
||||
Mutex.unlock self.mutex
|
||||
in
|
||||
|
||||
(* handle domain-local await *)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue