fix pool: on shutdown, finish reading from all queues

This commit is contained in:
Simon Cruanes 2023-10-24 09:51:40 -04:00
parent b8a31b088f
commit faeb95b49d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -108,15 +108,7 @@ let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t)
let num_qs = Array.length qs in
let (AT_pair (before_task, after_task)) = around_task in
let main_loop () =
while A.get active do
(* last resort: block on my queue *)
let pop_blocking () =
let my_q = qs.(offset mod num_qs) in
Bb_queue.pop my_q
in
let task =
let get_task_without_blocking () : _ option =
try
for i = 0 to num_qs - 1 do
let q = qs.((offset + i) mod num_qs) in
@ -124,10 +116,17 @@ let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t)
| Some f -> raise_notrace (Got_task f)
| None -> ()
done;
pop_blocking ()
with Got_task f -> f
None
with Got_task f -> Some f
in
(* last resort: block on my queue *)
let[@inline] pop_blocking () =
let my_q = qs.(offset mod num_qs) in
Bb_queue.pop my_q
in
let run_task task : unit =
let _ctx = before_task runner in
(* run the task now, catching errors *)
(try task ()
@ -135,9 +134,31 @@ let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t)
let bt = Printexc.get_raw_backtrace () in
on_exn e bt);
after_task runner _ctx
in
let run_tasks_already_present () =
(* drain the queues from existing tasks *)
let continue = ref true in
while !continue do
match get_task_without_blocking () with
| None -> continue := false
| Some task -> run_task task
done
in
let main_loop () =
while A.get active do
run_tasks_already_present ();
(* no task available, block until one comes *)
let task = pop_blocking () in
run_task task
done;
(* cleanup *)
run_tasks_already_present ()
in
try
(* handle domain-local await *)
Dla_.using ~prepare_for_await ~while_running:main_loop