From 45838d9607827621eadf34d23aec8ddac8cc7762 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 23 Jun 2023 21:36:17 -0400 Subject: [PATCH] fix: in fork-join, start sub-tasks within a handler --- src/fork_join.ml | 4 ++-- src/fut.ml | 4 +++- src/gen/gen.ml | 2 +- src/pool.ml | 8 ++++++-- src/suspend_.mli | 3 ++- src/suspend_types_.ml | 4 +++- 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/fork_join.ml b/src/fork_join.ml index 8f859928..ad7b9ad0 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -60,7 +60,7 @@ let both f g : _ * _ = let st = A.make { suspension = None; left = St_none; right = St_none } in let start_tasks ~run () : unit = - run (fun () -> + run ~with_handler:true (fun () -> try let res = f () in set_left_ st (St_some res) @@ -68,7 +68,7 @@ let both f g : _ * _ = let bt = Printexc.get_raw_backtrace () in set_left_ st (St_fail (e, bt))); - run (fun () -> + run ~with_handler:true (fun () -> try let res = g () in set_right_ st (St_some res) diff --git a/src/fut.ml b/src/fut.ml index 668db7ea..e9401cb2 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -368,7 +368,9 @@ let await (fut : 'a t) : 'a = Suspend_types_.handle = (fun ~run k -> on_result fut (function - | Ok _ -> run (fun () -> k (Ok ())) + | Ok _ -> + (* run without handler, we're already in a deep effect *) + run ~with_handler:false (fun () -> k (Ok ())) | Error (exn, bt) -> (* fail continuation immediately *) k (Error (exn, bt)))); diff --git a/src/gen/gen.ml b/src/gen/gen.ml index 3128577f..790b95cd 100644 --- a/src/gen/gen.ml +++ b/src/gen/gen.ml @@ -88,7 +88,7 @@ type _ Effect.t += let[@inline] suspend h = Effect.perform (Suspend h) -let with_suspend ~(run:task -> unit) (f: unit -> unit) : unit = +let with_suspend ~(run:with_handler:bool -> task -> unit) (f: unit -> unit) : unit = let module E = Effect.Deep in (* effect handler *) diff --git a/src/pool.ml b/src/pool.ml index b9f511b1..a6b74dc6 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -54,10 +54,14 @@ let run_direct_ (self : t) (task : task) : unit = (** Run [task]. It will be wrapped with an effect handler to support {!Fut.await}. *) -let run_async (self : t) (task : task) : unit = +let rec run_async (self : t) (task : task) : unit = let task' () = (* run [f()] and handle [suspend] in it *) - Suspend_.with_suspend task ~run:(run_direct_ self) + Suspend_.with_suspend task ~run:(fun ~with_handler task -> + if with_handler then + run_async self task + else + run_direct_ self task) in run_direct_ self task' diff --git a/src/suspend_.mli b/src/suspend_.mli index 5247f597..9dbd3c6a 100644 --- a/src/suspend_.mli +++ b/src/suspend_.mli @@ -11,7 +11,8 @@ val suspend : suspension_handler -> unit and a task runner function. *) -val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit +val with_suspend : + run:(with_handler:bool -> task -> unit) -> (unit -> unit) -> unit (** [with_suspend ~run f] runs [f()] in an environment where [suspend] will work. If [f()] suspends with suspension handler [h], this calls [h ~run k] where [k] is the suspension. diff --git a/src/suspend_types_.ml b/src/suspend_types_.ml index 22fb9eff..df0f47b6 100644 --- a/src/suspend_types_.ml +++ b/src/suspend_types_.ml @@ -8,6 +8,8 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type task = unit -> unit -type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } +type suspension_handler = { + handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit; +} [@@unboxed] (** The handler that knows what to do with the suspended computation *)