From 68fe7221b871f7d4486babcd67c7bc5c7d055944 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 27 Oct 2023 14:47:30 -0400 Subject: [PATCH] suspend: remove additional parameter, always run tasks in handler --- src/fifo_pool.ml | 17 +++-------------- src/fork_join.ml | 6 +++--- src/fut.ml | 4 +--- src/suspend_.ml | 17 ++++++----------- src/suspend_.mli | 7 ++----- 5 files changed, 15 insertions(+), 36 deletions(-) diff --git a/src/fifo_pool.ml b/src/fifo_pool.ml index 920b36ed..044e0013 100644 --- a/src/fifo_pool.ml +++ b/src/fifo_pool.ml @@ -12,20 +12,9 @@ let[@inline] size_ (self : state) = Array.length self.threads let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q (** Run [task] as is, on the pool. *) -let run_direct_ (self : state) (task : task) : unit = +let schedule_ (self : state) (task : task) : unit = try Bb_queue.push self.q task with Bb_queue.Closed -> raise Shutdown -let rec run_async_ (self : state) (task : task) : unit = - let task' () = - (* run [f()] and handle [suspend] in it *) - Suspend_.with_suspend task ~run:(fun ~with_handler task -> - if with_handler then - run_async_ self task - else - run_direct_ self task) - in - run_direct_ self task' - type around_task = AT_pair : (t -> 'a) * (t -> 'a -> unit) -> around_task let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = @@ -34,7 +23,7 @@ let worker_thread_ (self : state) (runner : t) ~on_exn ~around_task : unit = let run_task task : unit = let _ctx = before_task runner in (* run the task now, catching errors *) - (try task () + (try Suspend_.with_suspend task ~run:(fun task' -> schedule_ self task') with e -> let bt = Printexc.get_raw_backtrace () in on_exn e bt); @@ -98,7 +87,7 @@ let create ?(on_init_thread = default_thread_init_exit_) let runner = Runner.For_runner_implementors.create ~shutdown:(fun ~wait () -> shutdown_ pool ~wait) - ~run_async:(fun f -> run_async_ pool f) + ~run_async:(fun f -> schedule_ pool f) ~size:(fun () -> size_ pool) ~num_tasks:(fun () -> num_tasks_ pool) () diff --git a/src/fork_join.ml b/src/fork_join.ml index f1733514..ac5ba5d7 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -62,7 +62,7 @@ let both f g : _ * _ = let st = A.make { suspension = None; left = St_none; right = St_none } in let start_tasks ~run () : unit = - run ~with_handler:true (fun () -> + run (fun () -> try let res = f () in set_left_ st (St_some res) @@ -70,7 +70,7 @@ let both f g : _ * _ = let bt = Printexc.get_raw_backtrace () in set_left_ st (St_fail (e, bt))); - run ~with_handler:true (fun () -> + run (fun () -> try let res = g () in set_right_ st (St_some res) @@ -126,7 +126,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit = let len_range = min chunk_size (n - offset) in assert (offset + len_range <= n); - run ~with_handler:true (fun () -> task_for ~offset ~len_range); + run (fun () -> task_for ~offset ~len_range); i := !i + len_range done in diff --git a/src/fut.ml b/src/fut.ml index 0a5332ed..639a503b 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -381,9 +381,7 @@ let await (fut : 'a t) : 'a = Suspend_.handle = (fun ~run k -> on_result fut (function - | Ok _ -> - (* run without handler, we're already in a deep effect *) - run ~with_handler:false (fun () -> k (Ok ())) + | Ok _ -> run (fun () -> k (Ok ())) | Error (exn, bt) -> (* fail continuation immediately *) k (Error (exn, bt)))); diff --git a/src/suspend_.ml b/src/suspend_.ml index 19accc9c..88eacb2c 100644 --- a/src/suspend_.ml +++ b/src/suspend_.ml @@ -3,9 +3,7 @@ module A = Atomic_ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type task = unit -> unit -type suspension_handler = { - handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit; -} +type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } [@@unboxed] [@@@ifge 5.0] @@ -15,8 +13,7 @@ type _ Effect.t += Suspend : suspension_handler -> unit Effect.t let[@inline] suspend h = Effect.perform (Suspend h) -let with_suspend ~(run : with_handler:bool -> task -> unit) (f : unit -> unit) : - unit = +let with_suspend ~(run : task -> unit) (f : unit -> unit) : unit = let module E = Effect.Deep in (* effect handler *) let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option = @@ -37,14 +34,12 @@ let with_suspend ~(run : with_handler:bool -> task -> unit) (f : unit -> unit) : (* DLA interop *) let prepare_for_await () : Dla_.t = (* current state *) - let st : ((with_handler:bool -> task -> unit) * suspension) option A.t = - A.make None - in + let st : ((task -> unit) * suspension) option A.t = A.make None in let release () : unit = match A.exchange st None with | None -> () - | Some (run, k) -> run ~with_handler:true (fun () -> k (Ok ())) + | Some (run, k) -> run (fun () -> k (Ok ())) and await () : unit = suspend { handle = (fun ~run k -> A.set st (Some (run, k))) } in @@ -55,7 +50,7 @@ let prepare_for_await () : Dla_.t = [@@@ocaml.alert "+unstable"] [@@@else_] -let with_suspend ~run:_ f = f () -let prepare_for_await () = { Dla_.release = ignore; await = ignore } +let[@inline] with_suspend ~run:_ f = f () +let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore } [@@@endif] diff --git a/src/suspend_.mli b/src/suspend_.mli index 716e9b8a..77cc06af 100644 --- a/src/suspend_.mli +++ b/src/suspend_.mli @@ -8,9 +8,7 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit type task = unit -> unit -type suspension_handler = { - handle: run:(with_handler:bool -> task -> unit) -> suspension -> unit; -} +type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } [@@unboxed] (** The handler that knows what to do with the suspended computation. @@ -53,8 +51,7 @@ val suspend : suspension_handler -> unit val prepare_for_await : unit -> Dla_.t (** Our stub for DLA. Unstable. *) -val with_suspend : - run:(with_handler:bool -> task -> unit) -> (unit -> unit) -> unit +val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit (** [with_suspend ~run f] runs [f()] in an environment where [suspend] will work. If [f()] suspends with suspension handler [h], this calls [h ~run k] where [k] is the suspension.