From e26029ab9088dd4667484b82669d8851e38b9bf8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Jun 2023 15:33:28 -0400 Subject: [PATCH] feat: add `Fut.await` and `Fut.await_exn` for OCaml >= 5.0 this uses suspension (based on `Suspend_`) to wait for completion of the future. --- src/fut.ml | 26 ++++++++++++++++++++++++++ src/fut.mli | 17 +++++++++++++++++ src/pool.ml | 9 ++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/fut.ml b/src/fut.ml index 846b78da..7da0d9ad 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -354,6 +354,32 @@ let wait_block_exn self = | Ok x -> x | Error (e, bt) -> Printexc.raise_with_backtrace e bt +let await_exn (fut : 'a t) : 'a = + match peek fut with + | Some res -> + (* fast path: peek *) + (match res with + | Ok x -> x + | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt) + | None -> + (* suspend until the future is resolved *) + Suspend_.suspend + { + Suspend_types_.handle = + (fun runner k -> + on_result fut (function + | Ok _ -> runner.run (fun () -> k (Ok ())) + | Error (exn, bt) -> k (Error (exn, bt)))); + }; + (* un-suspended: we should have a result! *) + get_or_fail_exn fut + +let await fut = + try Ok (await_exn fut) + with exn -> + let bt = Printexc.get_raw_backtrace () in + Error (exn, bt) + module type INFIX = sig val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t diff --git a/src/fut.mli b/src/fut.mli index 99dcecb3..ac0e787a 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -145,6 +145,23 @@ val for_list : on:Pool.t -> 'a list -> ('a -> unit) -> unit t (** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f]. @since 0.2 *) +(** {2 Await} + +This is only available on OCaml 5. *) + +val await : 'a t -> 'a or_error +(** [await fut] suspends the current tasks until [fut] is fulfilled, then + resumes the task on this same pool. + This must only be run from inside the pool itself. + @since 0.3 + {b NOTE}: only on OCaml 5 *) + +val await_exn : 'a t -> 'a +(** Same as {!await} but re-raises. + @since 0.3 + {b NOTE}: only on OCaml 5 + *) + (** {2 Blocking} *) val wait_block : 'a t -> 'a or_error diff --git a/src/pool.ml b/src/pool.ml index eb2e9366..ad07f0a5 100644 --- a/src/pool.ml +++ b/src/pool.ml @@ -67,6 +67,11 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) let num_qs = Array.length qs in let (AT_pair (before_task, after_task)) = around_task in + (* helper to re-schedule suspended tasks on this same pool *) + let suspend_run_ : Suspend_types_.runner = + { run = (fun f -> run pool (fun () -> ignore (f ()))) } + in + try while A.get active do (* last resort: block on my queue *) @@ -88,7 +93,9 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t) in let _ctx = before_task pool in - (try task () + (try + (* run [task()] and handle [suspend] in it *) + Suspend_.with_suspend ~run:suspend_run_ task with e -> let bt = Printexc.get_raw_backtrace () in on_exn e bt);