From 4fd6154b5681823069bf9d7d54e33cbd4732d48d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Jun 2023 17:06:19 -0400 Subject: [PATCH] refactor suspend do not wrap each step in a new handler; using Effects.Deep we only wrap the entrypoint of the task, and subsequent `continue` get scheduled as-is. --- src/fut.ml | 8 +++++--- src/gen/gen.ml | 8 ++++---- src/pool.ml | 24 ++++++++++++------------ src/suspend_.mli | 13 ++++++++----- src/suspend_types_.ml | 6 +++--- 5 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/fut.ml b/src/fut.ml index 7da0d9ad..000060f2 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -366,10 +366,12 @@ let await_exn (fut : 'a t) : 'a = Suspend_.suspend { Suspend_types_.handle = - (fun runner k -> + (fun ~run k -> on_result fut (function - | Ok _ -> runner.run (fun () -> k (Ok ())) - | Error (exn, bt) -> k (Error (exn, bt)))); + | Ok _ -> run (fun () -> k (Ok ())) + | Error (exn, bt) -> + (* fail continuation immediately *) + k (Error (exn, bt)))); }; (* un-suspended: we should have a result! *) get_or_fail_exn fut diff --git a/src/gen/gen.ml b/src/gen/gen.ml index cb65b5d4..3128577f 100644 --- a/src/gen/gen.ml +++ b/src/gen/gen.ml @@ -88,21 +88,21 @@ type _ Effect.t += let[@inline] suspend h = Effect.perform (Suspend h) -let with_suspend ~run (f: unit -> unit) : unit = +let with_suspend ~(run:task -> unit) (f: unit -> unit) : unit = let module E = Effect.Deep in (* effect handler *) let effc - : type e. e Effect.t -> ((e, unit) E.continuation -> unit) option + : type e. e Effect.t -> ((e, _) E.continuation -> _) option = function | Suspend h -> Some (fun k -> - let k' = function + let k': suspension = function | Ok () -> E.continue k () | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt in - h.handle run k' + h.handle ~run k' ) | _ -> None in diff --git a/src/pool.ml b/src/pool.ml index ad07f0a5..b7779bb5 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -26,12 +26,12 @@ let add_global_thread_loop_wrapper f : unit = exception Shutdown -let run (self : t) (f : task) : unit = +let run_direct_ (self : t) (task : task) : unit = let n_qs = Array.length self.qs in let offset = A.fetch_and_add self.cur_q 1 in (* blocking push, last resort *) - let push_wait () = + let[@inline] push_wait f = let q_idx = offset mod Array.length self.qs in let q = self.qs.(q_idx) in Bb_queue.push q f @@ -43,14 +43,21 @@ let run (self : t) (f : task) : unit = for i = 0 to n_qs - 1 do let q_idx = (i + offset) mod Array.length self.qs in let q = self.qs.(q_idx) in - if Bb_queue.try_push q f then raise_notrace Exit + if Bb_queue.try_push q task then raise_notrace Exit done done; - push_wait () + push_wait task with | Exit -> () | Bb_queue.Closed -> raise Shutdown +let run (self : t) (task : task) : unit = + let task' () = + (* run [f()] and handle [suspend] in it *) + Suspend_.with_suspend task ~run:(run_direct_ self) + in + run_direct_ self task' + let[@inline] size self = Array.length self.threads let num_tasks (self : t) : int = @@ -67,11 +74,6 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) let num_qs = Array.length qs in let (AT_pair (before_task, after_task)) = around_task in - (* helper to re-schedule suspended tasks on this same pool *) - let suspend_run_ : Suspend_types_.runner = - { run = (fun f -> run pool (fun () -> ignore (f ()))) } - in - try while A.get active do (* last resort: block on my queue *) @@ -93,9 +95,7 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) in let _ctx = before_task pool in - (try - (* run [task()] and handle [suspend] in it *) - Suspend_.with_suspend ~run:suspend_run_ task + (try task () with e -> let bt = Printexc.get_raw_backtrace () in on_exn e bt); diff --git a/src/suspend_.mli b/src/suspend_.mli index 39d6b9ed..5247f597 100644 --- a/src/suspend_.mli +++ b/src/suspend_.mli @@ -6,10 +6,13 @@ open Suspend_types_ val suspend : suspension_handler -> unit -(** [suspend h] calls [h] with the current continuation [k]. - The suspension handler, [h], can decide to register [k] somewhere, - so it's called later. *) +(** [suspend h] jumps back to the nearest {!with_suspend} + and calls [h.handle] with the current continuation [k] + and a task runner function. +*) -val with_suspend : run:runner -> (unit -> unit) -> unit +val with_suspend : run:(task -> unit) -> (unit -> unit) -> unit (** [with_suspend ~run f] runs [f()] in an environment where [suspend] - will work. It passes [run] to suspension handlers. *) + will work. If [f()] suspends with suspension handler [h], + this calls [h ~run k] where [k] is the suspension. +*) diff --git a/src/suspend_types_.ml b/src/suspend_types_.ml index 765d8fe7..22fb9eff 100644 --- a/src/suspend_types_.ml +++ b/src/suspend_types_.ml @@ -6,8 +6,8 @@ type suspension = (unit, exn * Printexc.raw_backtrace) result -> unit (** A suspended computation *) -type runner = { run: (unit -> unit) -> unit } [@@unboxed] -(** A task runner (typically, {!Pool.t}) *) +type task = unit -> unit -type suspension_handler = { handle: runner -> suspension -> unit } [@@unboxed] +type suspension_handler = { handle: run:(task -> unit) -> suspension -> unit } +[@@unboxed] (** The handler that knows what to do with the suspended computation *)