diff --git a/src/pool.ml b/src/pool.ml index 43cda564..e8e4c863 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -108,34 +108,55 @@ let worker_thread_ (runner : t) ~on_exn ~around_task (active : bool A.t) let num_qs = Array.length qs in let (AT_pair (before_task, after_task)) = around_task in + let get_task_without_blocking () : _ option = + try + for i = 0 to num_qs - 1 do + let q = qs.((offset + i) mod num_qs) in + match Bb_queue.try_pop ~force_lock:false q with + | Some f -> raise_notrace (Got_task f) + | None -> () + done; + None + with Got_task f -> Some f + in + + (* last resort: block on my queue *) + let[@inline] pop_blocking () = + let my_q = qs.(offset mod num_qs) in + Bb_queue.pop my_q + in + + let run_task task : unit = + let _ctx = before_task runner in + (* run the task now, catching errors *) + (try task () + with e -> + let bt = Printexc.get_raw_backtrace () in + on_exn e bt); + after_task runner _ctx + in + + let run_tasks_already_present () = + (* drain the queues from existing tasks *) + let continue = ref true in + while !continue do + match get_task_without_blocking () with + | None -> continue := false + | Some task -> run_task task + done + in + let main_loop () = while A.get active do - (* last resort: block on my queue *) - let pop_blocking () = - let my_q = qs.(offset mod num_qs) in - Bb_queue.pop my_q - in + run_tasks_already_present (); - let task = - try - for i = 0 to num_qs - 1 do - let q = qs.((offset + i) mod num_qs) in - match Bb_queue.try_pop ~force_lock:false q with - | Some f -> raise_notrace (Got_task f) - | None -> () - done; - pop_blocking () - with Got_task f -> f - in + (* no task available, block until one comes *) + let task = pop_blocking () in + run_task task + done; - let _ctx = before_task runner in - (* run the task now, catching errors *) - (try task () - with e -> - let bt = Printexc.get_raw_backtrace () in - on_exn e bt); - after_task runner _ctx - done + (* cleanup *) + run_tasks_already_present () in try