mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
fix too early exit in Ws_pool
when shutting down the pool, workers should check `self.active` only when they have no local task, failed to steal tasks, and found the main queue to be empty. Basically we check `self.active` only just before we wait on the condition.
This commit is contained in:
parent
16663651d6
commit
2d306c91b2
1 changed files with 23 additions and 16 deletions
|
|
@ -170,19 +170,15 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
|
||||||
TLS.get k_worker_state := Some w;
|
TLS.get k_worker_state := Some w;
|
||||||
|
|
||||||
let rec main () : unit =
|
let rec main () : unit =
|
||||||
if A.get self.active then (
|
worker_run_self_tasks_ self ~runner w;
|
||||||
worker_run_self_tasks_ self ~runner w;
|
try_steal ()
|
||||||
try_steal ()
|
|
||||||
)
|
|
||||||
and run_task task : unit =
|
and run_task task : unit =
|
||||||
run_task_now_ self ~runner task;
|
run_task_now_ self ~runner task;
|
||||||
main ()
|
main ()
|
||||||
and try_steal () =
|
and try_steal () =
|
||||||
if A.get self.active then (
|
match try_to_steal_work_once_ self w with
|
||||||
match try_to_steal_work_once_ self w with
|
| Some task -> run_task task
|
||||||
| Some task -> run_task task
|
| None -> wait ()
|
||||||
| None -> wait ()
|
|
||||||
)
|
|
||||||
and wait () =
|
and wait () =
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
match Queue.pop self.main_q with
|
match Queue.pop self.main_q with
|
||||||
|
|
@ -191,15 +187,26 @@ let worker_thread_ (self : state) ~(runner : t) (w : worker_state) : unit =
|
||||||
run_task task
|
run_task task
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
(* wait here *)
|
(* wait here *)
|
||||||
if A.get self.active then wait_ self;
|
if A.get self.active then (
|
||||||
|
wait_ self;
|
||||||
|
|
||||||
(* see if a task became available *)
|
(* see if a task became available *)
|
||||||
let task = try Some (Queue.pop self.main_q) with Queue.Empty -> None in
|
let task =
|
||||||
Mutex.unlock self.mutex;
|
try Some (Queue.pop self.main_q) with Queue.Empty -> None
|
||||||
|
in
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
|
||||||
(match task with
|
match task with
|
||||||
| Some t -> run_task t
|
| Some t -> run_task t
|
||||||
| None -> try_steal ())
|
| None -> try_steal ()
|
||||||
|
) else
|
||||||
|
(* do nothing more: no task in main queue, and we are shutting
|
||||||
|
down so no new task should arrive.
|
||||||
|
The exception is if another task is creating subtasks
|
||||||
|
that overflow into the main queue, but we can ignore that at
|
||||||
|
the price of slightly decreased performance for the last few
|
||||||
|
tasks *)
|
||||||
|
Mutex.unlock self.mutex
|
||||||
in
|
in
|
||||||
|
|
||||||
(* handle domain-local await *)
|
(* handle domain-local await *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue