mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-11 21:48:46 -05:00
fix pool: only return No_more_tasks when local and global q empty
This commit is contained in:
parent
b0e4fa4563
commit
a36342dce2
1 changed files with 15 additions and 11 deletions
|
|
@ -77,7 +77,9 @@ let[@inline] try_wake_someone_ (self : state) : unit =
|
||||||
Mutex.unlock self.mutex
|
Mutex.unlock self.mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
let schedule_on_w (self : worker_state) task : unit =
|
(** Push into worker's local queue, open to work stealing.
|
||||||
|
precondition: this runs on the worker thread whose state is [self] *)
|
||||||
|
let schedule_on_current_worker (self : worker_state) task : unit =
|
||||||
(* we're on this same pool, schedule in the worker's state. Otherwise
|
(* we're on this same pool, schedule in the worker's state. Otherwise
|
||||||
we might also be on pool A but asking to schedule on pool B,
|
we might also be on pool A but asking to schedule on pool B,
|
||||||
so we have to check that identifiers match. *)
|
so we have to check that identifiers match. *)
|
||||||
|
|
@ -92,7 +94,8 @@ let schedule_on_w (self : worker_state) task : unit =
|
||||||
Mutex.unlock self.st.mutex
|
Mutex.unlock self.st.mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
let schedule_on_main (self : state) task : unit =
|
(** Push into the shared queue of this pool *)
|
||||||
|
let schedule_in_main_queue (self : state) task : unit =
|
||||||
if A.get self.active then (
|
if A.get self.active then (
|
||||||
(* push into the main queue *)
|
(* push into the main queue *)
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
|
|
@ -108,8 +111,8 @@ let schedule_from_w (self : worker_state) (task : WL.task_full) : unit =
|
||||||
match get_current_worker_ () with
|
match get_current_worker_ () with
|
||||||
| Some w when Id.equal self.st.id_ w.st.id_ ->
|
| Some w when Id.equal self.st.id_ w.st.id_ ->
|
||||||
(* use worker from the same pool *)
|
(* use worker from the same pool *)
|
||||||
schedule_on_w w task
|
schedule_on_current_worker w task
|
||||||
| _ -> schedule_on_main self.st task
|
| _ -> schedule_in_main_queue self.st task
|
||||||
|
|
||||||
exception Got_task of WL.task_full
|
exception Got_task of WL.task_full
|
||||||
|
|
||||||
|
|
@ -139,19 +142,19 @@ let[@inline] wait_for_condition_ (self : state) : unit =
|
||||||
if self.n_waiting = 0 then self.n_waiting_nonzero <- false
|
if self.n_waiting = 0 then self.n_waiting_nonzero <- false
|
||||||
|
|
||||||
let rec get_next_task (self : worker_state) : WL.task_full =
|
let rec get_next_task (self : worker_state) : WL.task_full =
|
||||||
if not (A.get self.st.active) then raise WL.No_more_tasks;
|
(* see if we can empty the local queue *)
|
||||||
match WSQ.pop_exn self.q with
|
match WSQ.pop_exn self.q with
|
||||||
| task ->
|
| task ->
|
||||||
try_wake_someone_ self.st;
|
try_wake_someone_ self.st;
|
||||||
task
|
task
|
||||||
| exception WSQ.Empty -> try_steal_from_other_workers_ self
|
| exception WSQ.Empty -> try_to_steal_from_other_workers_ self
|
||||||
|
|
||||||
and try_steal_from_other_workers_ (self : worker_state) =
|
and try_to_steal_from_other_workers_ (self : worker_state) =
|
||||||
match try_to_steal_work_once_ self with
|
match try_to_steal_work_once_ self with
|
||||||
| exception Got_task task -> task
|
| exception Got_task task -> task
|
||||||
| () -> wait_on_worker self
|
| () -> wait_on_main_queue self
|
||||||
|
|
||||||
and wait_on_worker (self : worker_state) : WL.task_full =
|
and wait_on_main_queue (self : worker_state) : WL.task_full =
|
||||||
Mutex.lock self.st.mutex;
|
Mutex.lock self.st.mutex;
|
||||||
match Queue.pop self.st.main_q with
|
match Queue.pop self.st.main_q with
|
||||||
| task ->
|
| task ->
|
||||||
|
|
@ -169,7 +172,7 @@ and wait_on_worker (self : worker_state) : WL.task_full =
|
||||||
task
|
task
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
Mutex.unlock self.st.mutex;
|
Mutex.unlock self.st.mutex;
|
||||||
try_steal_from_other_workers_ self
|
try_to_steal_from_other_workers_ self
|
||||||
) else (
|
) else (
|
||||||
(* do nothing more: no task in main queue, and we are shutting
|
(* do nothing more: no task in main queue, and we are shutting
|
||||||
down so no new task should arrive.
|
down so no new task should arrive.
|
||||||
|
|
@ -230,7 +233,8 @@ let shutdown_ ~wait (self : state) : unit =
|
||||||
let as_runner_ (self : state) : t =
|
let as_runner_ (self : state) : t =
|
||||||
Runner.For_runner_implementors.create
|
Runner.For_runner_implementors.create
|
||||||
~shutdown:(fun ~wait () -> shutdown_ self ~wait)
|
~shutdown:(fun ~wait () -> shutdown_ self ~wait)
|
||||||
~run_async:(fun ~fiber f -> schedule_on_main self @@ T_start { fiber; f })
|
~run_async:(fun ~fiber f ->
|
||||||
|
schedule_in_main_queue self @@ T_start { fiber; f })
|
||||||
~size:(fun () -> size_ self)
|
~size:(fun () -> size_ self)
|
||||||
~num_tasks:(fun () -> num_tasks_ self)
|
~num_tasks:(fun () -> num_tasks_ self)
|
||||||
()
|
()
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue