mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-07 11:45:39 -05:00
refactor suspend
do not wrap each step in a new handler; using Effects.Deep we only wrap the entrypoint of the task, and subsequent `continue` get scheduled as-is.
This commit is contained in:
parent
f98bcf2f08
commit
4fd6154b56
5 changed files with 32 additions and 27 deletions
|
|
@ -366,10 +366,12 @@ let await_exn (fut : 'a t) : 'a =
|
||||||
Suspend_.suspend
|
Suspend_.suspend
|
||||||
{
|
{
|
||||||
Suspend_types_.handle =
|
Suspend_types_.handle =
|
||||||
(fun runner k ->
|
(fun ~run k ->
|
||||||
on_result fut (function
|
on_result fut (function
|
||||||
| Ok _ -> runner.run (fun () -> k (Ok ()))
|
| Ok _ -> run (fun () -> k (Ok ()))
|
||||||
| Error (exn, bt) -> k (Error (exn, bt))));
|
| Error (exn, bt) ->
|
||||||
|
(* fail continuation immediately *)
|
||||||
|
k (Error (exn, bt))));
|
||||||
};
|
};
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
get_or_fail_exn fut
|
get_or_fail_exn fut
|
||||||
|
|
|
||||||
|
|
@ -88,21 +88,21 @@ type _ Effect.t +=
|
||||||
|
|
||||||
let[@inline] suspend h = Effect.perform (Suspend h)
|
let[@inline] suspend h = Effect.perform (Suspend h)
|
||||||
|
|
||||||
let with_suspend ~run (f: unit -> unit) : unit =
|
let with_suspend ~(run:task -> unit) (f: unit -> unit) : unit =
|
||||||
let module E = Effect.Deep in
|
let module E = Effect.Deep in
|
||||||
|
|
||||||
(* effect handler *)
|
(* effect handler *)
|
||||||
let effc
|
let effc
|
||||||
: type e. e Effect.t -> ((e, unit) E.continuation -> unit) option
|
: type e. e Effect.t -> ((e, _) E.continuation -> _) option
|
||||||
= function
|
= function
|
||||||
| Suspend h ->
|
| Suspend h ->
|
||||||
Some (fun k ->
|
Some (fun k ->
|
||||||
let k' = function
|
let k': suspension = function
|
||||||
| Ok () -> E.continue k ()
|
| Ok () -> E.continue k ()
|
||||||
| Error (exn, bt) ->
|
| Error (exn, bt) ->
|
||||||
E.discontinue_with_backtrace k exn bt
|
E.discontinue_with_backtrace k exn bt
|
||||||
in
|
in
|
||||||
h.handle run k'
|
h.handle ~run k'
|
||||||
)
|
)
|
||||||
| _ -> None
|
| _ -> None
|
||||||
in
|
in
|
||||||
|
|
|
||||||
24
src/pool.ml
24
src/pool.ml
|
|
@ -26,12 +26,12 @@ let add_global_thread_loop_wrapper f : unit =
|
||||||
|
|
||||||
exception Shutdown
|
exception Shutdown
|
||||||
|
|
||||||
let run (self : t) (f : task) : unit =
|
let run_direct_ (self : t) (task : task) : unit =
|
||||||
let n_qs = Array.length self.qs in
|
let n_qs = Array.length self.qs in
|
||||||
let offset = A.fetch_and_add self.cur_q 1 in
|
let offset = A.fetch_and_add self.cur_q 1 in
|
||||||
|
|
||||||
(* blocking push, last resort *)
|
(* blocking push, last resort *)
|
||||||
let push_wait () =
|
let[@inline] push_wait f =
|
||||||
let q_idx = offset mod Array.length self.qs in
|
let q_idx = offset mod Array.length self.qs in
|
||||||
let q = self.qs.(q_idx) in
|
let q = self.qs.(q_idx) in
|
||||||
Bb_queue.push q f
|
Bb_queue.push q f
|
||||||
|
|
@ -43,14 +43,21 @@ let run (self : t) (f : task) : unit =
|
||||||
for i = 0 to n_qs - 1 do
|
for i = 0 to n_qs - 1 do
|
||||||
let q_idx = (i + offset) mod Array.length self.qs in
|
let q_idx = (i + offset) mod Array.length self.qs in
|
||||||
let q = self.qs.(q_idx) in
|
let q = self.qs.(q_idx) in
|
||||||
if Bb_queue.try_push q f then raise_notrace Exit
|
if Bb_queue.try_push q task then raise_notrace Exit
|
||||||
done
|
done
|
||||||
done;
|
done;
|
||||||
push_wait ()
|
push_wait task
|
||||||
with
|
with
|
||||||
| Exit -> ()
|
| Exit -> ()
|
||||||
| Bb_queue.Closed -> raise Shutdown
|
| Bb_queue.Closed -> raise Shutdown
|
||||||
|
|
||||||
|
let run (self : t) (task : task) : unit =
|
||||||
|
let task' () =
|
||||||
|
(* run [f()] and handle [suspend] in it *)
|
||||||
|
Suspend_.with_suspend task ~run:(run_direct_ self)
|
||||||
|
in
|
||||||
|
run_direct_ self task'
|
||||||
|
|
||||||
let[@inline] size self = Array.length self.threads
|
let[@inline] size self = Array.length self.threads
|
||||||
|
|
||||||
let num_tasks (self : t) : int =
|
let num_tasks (self : t) : int =
|
||||||
|
|
@ -67,11 +74,6 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
|
||||||
let num_qs = Array.length qs in
|
let num_qs = Array.length qs in
|
||||||
let (AT_pair (before_task, after_task)) = around_task in
|
let (AT_pair (before_task, after_task)) = around_task in
|
||||||
|
|
||||||
(* helper to re-schedule suspended tasks on this same pool *)
|
|
||||||
let suspend_run_ : Suspend_types_.runner =
|
|
||||||
{ run = (fun f -> run pool (fun () -> ignore (f ()))) }
|
|
||||||
in
|
|
||||||
|
|
||||||
try
|
try
|
||||||
while A.get active do
|
while A.get active do
|
||||||
(* last resort: block on my queue *)
|
(* last resort: block on my queue *)
|
||||||
|
|
@ -93,9 +95,7 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
|
||||||
in
|
in
|
||||||
|
|
||||||
let _ctx = before_task pool in
|
let _ctx = before_task pool in
|
||||||
(try
|
(try task ()
|
||||||
(* run [task()] and handle [suspend] in it *)
|
|
||||||
Suspend_.with_suspend ~run:suspend_run_ task
|
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
on_exn e bt);
|
on_exn e bt);
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,13 @@
|
||||||
open Suspend_types_
|
open Suspend_types_
|
||||||
|
|
||||||
val suspend : suspension_handler -> unit
|
val suspend : suspension_handler -> unit
|
||||||
(** [suspend h] calls [h] with the current continuation [k].
|
(** [suspend h] jumps back to the nearest {!with_suspend}
|
||||||
The suspension handler, [h], can decide to register [k] somewhere,
|
and calls [h.handle] with the current continuation [k]
|
||||||
so it's called later. *)
|
and a task runner function.
|
||||||
|
*)
|
||||||
|
|
||||||
val with_suspend : run:runner -> (unit -> unit) -> unit
|
val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit
|
||||||
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
||||||
will work. It passes [run] to suspension handlers. *)
|
will work. If [f()] suspends with suspension handler [h],
|
||||||
|
this calls [h ~run k] where [k] is the suspension.
|
||||||
|
*)
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,8 @@
|
||||||
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
||||||
(** A suspended computation *)
|
(** A suspended computation *)
|
||||||
|
|
||||||
type runner = { run: (unit -> unit) -> unit } [@@unboxed]
|
type task = unit -> unit
|
||||||
(** A task runner (typically, {!Pool.t}) *)
|
|
||||||
|
|
||||||
type suspension_handler = { handle: runner -> suspension -> unit } [@@unboxed]
|
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit }
|
||||||
|
[@@unboxed]
|
||||||
(** The handler that knows what to do with the suspended computation *)
|
(** The handler that knows what to do with the suspended computation *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue