mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-11 05:28:33 -05:00
fix pool: suspension handler might run from a different thread!
this means we can't reuse the same worker state, it's neither thread safe nor deadlock-safe (the worker whose state it is might be waiting on the main queue)
This commit is contained in:
parent
1e3629bc67
commit
30035fa67d
2 changed files with 32 additions and 28 deletions
36
src/pool.ml
36
src/pool.ml
|
|
@ -75,21 +75,24 @@ let run_direct_ (self : state) (w : worker_state option) (task : task) : unit =
|
||||||
raise Shutdown
|
raise Shutdown
|
||||||
|
|
||||||
let run_async_ (self : state) (task : task) : unit =
|
let run_async_ (self : state) (task : task) : unit =
|
||||||
(* stay on current worker if possible *)
|
(* run [task] inside a suspension handler *)
|
||||||
let w = find_current_worker_ self in
|
let rec run_async_in_suspend_rec_ (task : task) =
|
||||||
|
|
||||||
let rec run_async_rec_ (task : task) =
|
|
||||||
let task_with_suspend_ () =
|
let task_with_suspend_ () =
|
||||||
(* run [f()] and handle [suspend] in it *)
|
(* run [f()] and handle [suspend] in it *)
|
||||||
Suspend_.with_suspend task ~run:(fun ~with_handler task' ->
|
Suspend_.with_suspend task ~run:(fun ~with_handler task' ->
|
||||||
if with_handler then
|
if with_handler then
|
||||||
run_async_rec_ task'
|
run_async_in_suspend_rec_ task'
|
||||||
else
|
else (
|
||||||
run_direct_ self w task')
|
let w = find_current_worker_ self in
|
||||||
|
run_direct_ self w task'
|
||||||
|
))
|
||||||
in
|
in
|
||||||
|
|
||||||
|
(* schedule on current worker, if run from a worker *)
|
||||||
|
let w = find_current_worker_ self in
|
||||||
run_direct_ self w task_with_suspend_
|
run_direct_ self w task_with_suspend_
|
||||||
in
|
in
|
||||||
run_async_rec_ task
|
run_async_in_suspend_rec_ task
|
||||||
|
|
||||||
let run = run_async
|
let run = run_async
|
||||||
|
|
||||||
|
|
@ -100,6 +103,7 @@ type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task
|
||||||
(** How many times in a row do we try to do work-stealing? *)
|
(** How many times in a row do we try to do work-stealing? *)
|
||||||
let steal_attempt_max_retry = 3
|
let steal_attempt_max_retry = 3
|
||||||
|
|
||||||
|
(** Main loop for a worker thread. *)
|
||||||
let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn
|
let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn
|
||||||
~around_task : unit =
|
~around_task : unit =
|
||||||
let (AT_pair (before_task, after_task)) = around_task in
|
let (AT_pair (before_task, after_task)) = around_task in
|
||||||
|
|
@ -143,22 +147,6 @@ let worker_thread_ (self : state) (runner : t) (w : worker_state) ~on_exn
|
||||||
WSQ.steal w'.q
|
WSQ.steal w'.q
|
||||||
in
|
in
|
||||||
|
|
||||||
(*
|
|
||||||
try
|
|
||||||
for _retry = 1 to 1 do
|
|
||||||
for i = 0 to Array.length self.workers - 1 do
|
|
||||||
let w' = self.workers.(i) in
|
|
||||||
if w != w' then (
|
|
||||||
match WSQ.steal w'.q with
|
|
||||||
| None -> ()
|
|
||||||
| Some task -> raise_notrace (Got_task task)
|
|
||||||
)
|
|
||||||
done
|
|
||||||
done;
|
|
||||||
None
|
|
||||||
with Got_task task -> Some task
|
|
||||||
*)
|
|
||||||
|
|
||||||
(* try to steal work multiple times *)
|
(* try to steal work multiple times *)
|
||||||
let try_to_steal_work_loop () : bool =
|
let try_to_steal_work_loop () : bool =
|
||||||
if size_ self = 1 then
|
if size_ self = 1 then
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,7 @@ open Moonpool
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let run () =
|
let run ~pool () =
|
||||||
let@ pool = Pool.with_ ~min:4 () in
|
|
||||||
let t1 = Unix.gettimeofday () in
|
let t1 = Unix.gettimeofday () in
|
||||||
|
|
||||||
let n = 200_000 in
|
let n = 200_000 in
|
||||||
|
|
@ -15,7 +14,7 @@ let run () =
|
||||||
Fut.spawn ~on:pool (fun () ->
|
Fut.spawn ~on:pool (fun () ->
|
||||||
List.fold_left
|
List.fold_left
|
||||||
(fun n x ->
|
(fun n x ->
|
||||||
let _res = Fut.await x in
|
let _res = Sys.opaque_identity (Fut.await x) in
|
||||||
n + 1)
|
n + 1)
|
||||||
0 l)
|
0 l)
|
||||||
in
|
in
|
||||||
|
|
@ -29,6 +28,23 @@ let run () =
|
||||||
Printf.printf "in %.4fs\n%!" (Unix.gettimeofday () -. t1);
|
Printf.printf "in %.4fs\n%!" (Unix.gettimeofday () -. t1);
|
||||||
assert (List.for_all (fun s -> s = n) lens)
|
assert (List.for_all (fun s -> s = n) lens)
|
||||||
|
|
||||||
let () = run ()
|
let () =
|
||||||
|
(print_endline "with fifo";
|
||||||
|
let@ pool = Fifo_pool.with_ ~min:4 () in
|
||||||
|
run ~pool ());
|
||||||
|
|
||||||
|
(print_endline "with WS(1)";
|
||||||
|
let@ pool = Pool.with_ ~min:1 () in
|
||||||
|
run ~pool ());
|
||||||
|
|
||||||
|
(print_endline "with WS(2)";
|
||||||
|
let@ pool = Pool.with_ ~min:2 () in
|
||||||
|
run ~pool ());
|
||||||
|
|
||||||
|
(print_endline "with WS(4)";
|
||||||
|
let@ pool = Pool.with_ ~min:4 () in
|
||||||
|
run ~pool ());
|
||||||
|
|
||||||
|
()
|
||||||
|
|
||||||
[@@@endif]
|
[@@@endif]
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue