diff --git a/src/pool.ml b/src/pool.ml index e4685b9c..590a9586 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -84,26 +84,29 @@ let run_direct_ (self : state) (task : task) : unit = let n_qs = Array.length self.qs in let offset = A.fetch_and_add self.cur_q 1 in - (* blocking push, last resort *) + (* push that forces lock acquisition, last resort *) let[@inline] push_wait f = let q_idx = offset mod Array.length self.qs in let q = self.qs.(q_idx) in TS_queue.push q f in - let old_num_tasks = A.fetch_and_add self.num_tasks 1 in + (try + (* try each queue with a round-robin initial offset *) + for _retry = 1 to 10 do + for i = 0 to n_qs - 1 do + let q_idx = (i + offset) mod Array.length self.qs in + let q = self.qs.(q_idx) in - try - (* try each queue with a round-robin initial offset *) - for _retry = 1 to 10 do - for i = 0 to n_qs - 1 do - let q_idx = (i + offset) mod Array.length self.qs in - let q = self.qs.(q_idx) in - if TS_queue.try_push q task then raise_notrace Exit - done - done; - push_wait task - with Exit -> if old_num_tasks < size_ self then awake_workers_ self + if TS_queue.try_push q task then raise_notrace Exit + done + done; + push_wait task + with Exit -> ()); + + (* successfully pushed, now see if we need to wakeup workers *) + let old_num_tasks = A.fetch_and_add self.num_tasks 1 in + if old_num_tasks < size_ self then awake_workers_ self let rec run_async_ (self : state) (task : task) : unit = let task' () = @@ -157,13 +160,18 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task let num_qs = Array.length self.qs in let (AT_pair (before_task, after_task)) = around_task in - let get_task_without_blocking () : _ option = + (* try to get a task that is already in one of the queues. + @param force_lock if true, we force acquisition of the queue's mutex, + which is slower but always succeeds to get a task if there's one. *) + let get_task_already_in_queues ~force_lock () : _ option = try - for i = 0 to num_qs - 1 do - let q = self.qs.((offset + i) mod num_qs) in - match TS_queue.try_pop ~force_lock:false q with - | Some f -> raise_notrace (Got_task f) - | None -> () + for _retry = 1 to 3 do + for i = 0 to num_qs - 1 do + let q = self.qs.((offset + i) mod num_qs) in + match TS_queue.try_pop ~force_lock q with + | Some f -> raise_notrace (Got_task f) + | None -> () + done done; None with Got_task f -> @@ -171,17 +179,21 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task Some f in - (* last resort: block on condition or raise Closed *) + (* slow path: force locking when trying to get tasks, + and wait on [self.cond] if no task is currently available. *) let pop_blocking () : task = - Mutex.lock self.mutex; - try while A.get self.active do - match get_task_without_blocking () with - | Some t -> - Mutex.unlock self.mutex; - raise_notrace (Got_task t) - | None -> Condition.wait self.cond self.mutex + match get_task_already_in_queues ~force_lock:true () with + | Some t -> raise_notrace (Got_task t) + | None -> + Mutex.lock self.mutex; + (* NOTE: be careful about race conditions: we must only + block if the [shutdown] that sets [active] to [false] + has not broadcast over this condition first. Otherwise + we might miss the signal and wait here forever. *) + if A.get self.active then Condition.wait self.cond self.mutex; + Mutex.unlock self.mutex done; raise Closed with Got_task t -> t @@ -197,11 +209,12 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task after_task runner _ctx in - let run_tasks_already_present () = - (* drain the queues from existing tasks *) + (* drain the queues from existing tasks. If [force_lock=false] + then it is best effort. *) + let run_tasks_already_present ~force_lock () = let continue = ref true in while !continue do - match get_task_without_blocking () with + match get_task_already_in_queues ~force_lock () with | None -> continue := false | Some task -> run_task task done @@ -209,7 +222,7 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task let main_loop () = while A.get self.active do - run_tasks_already_present (); + run_tasks_already_present ~force_lock:false (); (* no task available, block until one comes *) match pop_blocking () with @@ -218,7 +231,7 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task done; (* cleanup *) - run_tasks_already_present () + run_tasks_already_present ~force_lock:true () in try