mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-07 11:45:39 -05:00
fix: in fork-join, start sub-tasks within a handler
This commit is contained in:
parent
3b9f56a138
commit
45838d9607
6 changed files with 17 additions and 8 deletions
|
|
@ -60,7 +60,7 @@ let both f g : _ * _ =
|
||||||
let st = A.make { suspension = None; left = St_none; right = St_none } in
|
let st = A.make { suspension = None; left = St_none; right = St_none } in
|
||||||
|
|
||||||
let start_tasks ~run () : unit =
|
let start_tasks ~run () : unit =
|
||||||
run (fun () ->
|
run ~with_handler:true (fun () ->
|
||||||
try
|
try
|
||||||
let res = f () in
|
let res = f () in
|
||||||
set_left_ st (St_some res)
|
set_left_ st (St_some res)
|
||||||
|
|
@ -68,7 +68,7 @@ let both f g : _ * _ =
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
set_left_ st (St_fail (e, bt)));
|
set_left_ st (St_fail (e, bt)));
|
||||||
|
|
||||||
run (fun () ->
|
run ~with_handler:true (fun () ->
|
||||||
try
|
try
|
||||||
let res = g () in
|
let res = g () in
|
||||||
set_right_ st (St_some res)
|
set_right_ st (St_some res)
|
||||||
|
|
|
||||||
|
|
@ -368,7 +368,9 @@ let await (fut : 'a t) : 'a =
|
||||||
Suspend_types_.handle =
|
Suspend_types_.handle =
|
||||||
(fun ~run k ->
|
(fun ~run k ->
|
||||||
on_result fut (function
|
on_result fut (function
|
||||||
| Ok _ -> run (fun () -> k (Ok ()))
|
| Ok _ ->
|
||||||
|
(* run without handler, we're already in a deep effect *)
|
||||||
|
run ~with_handler:false (fun () -> k (Ok ()))
|
||||||
| Error (exn, bt) ->
|
| Error (exn, bt) ->
|
||||||
(* fail continuation immediately *)
|
(* fail continuation immediately *)
|
||||||
k (Error (exn, bt))));
|
k (Error (exn, bt))));
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ type _ Effect.t +=
|
||||||
|
|
||||||
let[@inline] suspend h = Effect.perform (Suspend h)
|
let[@inline] suspend h = Effect.perform (Suspend h)
|
||||||
|
|
||||||
let with_suspend ~(run:task -> unit) (f: unit -> unit) : unit =
|
let with_suspend ~(run:with_handler:bool -> task -> unit) (f: unit -> unit) : unit =
|
||||||
let module E = Effect.Deep in
|
let module E = Effect.Deep in
|
||||||
|
|
||||||
(* effect handler *)
|
(* effect handler *)
|
||||||
|
|
|
||||||
|
|
@ -54,10 +54,14 @@ let run_direct_ (self : t) (task : task) : unit =
|
||||||
|
|
||||||
(** Run [task]. It will be wrapped with an effect handler to
|
(** Run [task]. It will be wrapped with an effect handler to
|
||||||
support {!Fut.await}. *)
|
support {!Fut.await}. *)
|
||||||
let run_async (self : t) (task : task) : unit =
|
let rec run_async (self : t) (task : task) : unit =
|
||||||
let task' () =
|
let task' () =
|
||||||
(* run [f()] and handle [suspend] in it *)
|
(* run [f()] and handle [suspend] in it *)
|
||||||
Suspend_.with_suspend task ~run:(run_direct_ self)
|
Suspend_.with_suspend task ~run:(fun ~with_handler task ->
|
||||||
|
if with_handler then
|
||||||
|
run_async self task
|
||||||
|
else
|
||||||
|
run_direct_ self task)
|
||||||
in
|
in
|
||||||
run_direct_ self task'
|
run_direct_ self task'
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,8 @@ val suspend : suspension_handler -> unit
|
||||||
and a task runner function.
|
and a task runner function.
|
||||||
*)
|
*)
|
||||||
|
|
||||||
val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit
|
val with_suspend :
|
||||||
|
run:(with_handler:bool -> task -> unit) -> (unit -> unit) -> unit
|
||||||
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
(** [with_suspend ~run f] runs [f()] in an environment where [suspend]
|
||||||
will work. If [f()] suspends with suspension handler [h],
|
will work. If [f()] suspends with suspension handler [h],
|
||||||
this calls [h ~run k] where [k] is the suspension.
|
this calls [h ~run k] where [k] is the suspension.
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit
|
||||||
|
|
||||||
type task = unit -> unit
|
type task = unit -> unit
|
||||||
|
|
||||||
type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit }
|
type suspension_handler = {
|
||||||
|
handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit;
|
||||||
|
}
|
||||||
[@@unboxed]
|
[@@unboxed]
|
||||||
(** The handler that knows what to do with the suspended computation *)
|
(** The handler that knows what to do with the suspended computation *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue