feat: add Fut.await and Fut.await_exn for OCaml >= 5.0

this uses suspension (based on `Suspend_`) to wait for completion of the
future.
This commit is contained in:
Simon Cruanes 2023-06-19 15:33:28 -04:00
parent 52a04701ed
commit e26029ab90
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 51 additions and 1 deletions

View file

@ -354,6 +354,32 @@ let wait_block_exn self =
| Ok x -> x
| Error (e, bt) -> Printexc.raise_with_backtrace e bt
let await_exn (fut : 'a t) : 'a =
match peek fut with
| Some res ->
(* fast path: peek *)
(match res with
| Ok x -> x
| Error (exn, bt) -> Printexc.raise_with_backtrace exn bt)
| None ->
(* suspend until the future is resolved *)
Suspend_.suspend
{
Suspend_types_.handle =
(fun runner k ->
on_result fut (function
| Ok _ -> runner.run (fun () -> k (Ok ()))
| Error (exn, bt) -> k (Error (exn, bt))));
};
(* un-suspended: we should have a result! *)
get_or_fail_exn fut
let await fut =
try Ok (await_exn fut)
with exn ->
let bt = Printexc.get_raw_backtrace () in
Error (exn, bt)
module type INFIX = sig
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t

View file

@ -145,6 +145,23 @@ val for_list : on:Pool.t -> 'a list -> ('a -> unit) -> unit t
(** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f].
@since 0.2 *)
(** {2 Await}
This is only available on OCaml 5. *)
val await : 'a t -> 'a or_error
(** [await fut] suspends the current tasks until [fut] is fulfilled, then
resumes the task on this same pool.
This must only be run from inside the pool itself.
@since 0.3
{b NOTE}: only on OCaml 5 *)
val await_exn : 'a t -> 'a
(** Same as {!await} but re-raises.
@since 0.3
{b NOTE}: only on OCaml 5
*)
(** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error

View file

@ -67,6 +67,11 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
let num_qs = Array.length qs in
let (AT_pair (before_task, after_task)) = around_task in
(* helper to re-schedule suspended tasks on this same pool *)
let suspend_run_ : Suspend_types_.runner =
{ run = (fun f -> run pool (fun () -> ignore (f ()))) }
in
try
while A.get active do
(* last resort: block on my queue *)
@ -88,7 +93,9 @@ let worker_thread_ pool ~on_exn ~around_task (active : bool A.t)
in
let _ctx = before_task pool in
(try task ()
(try
(* run [task()] and handle [suspend] in it *)
Suspend_.with_suspend ~run:suspend_run_ task
with e ->
let bt = Printexc.get_raw_backtrace () in
on_exn e bt);