rename Fut.await_exn to Fut.await

This commit is contained in:
Simon Cruanes 2023-06-19 21:23:54 -04:00
parent 68d3487ca8
commit 1d23d2d7a1
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 15 additions and 24 deletions

View file

@ -354,7 +354,7 @@ let wait_block_exn self =
| Ok x -> x | Ok x -> x
| Error (e, bt) -> Printexc.raise_with_backtrace e bt | Error (e, bt) -> Printexc.raise_with_backtrace e bt
let await_exn (fut : 'a t) : 'a = let await (fut : 'a t) : 'a =
match peek fut with match peek fut with
| Some res -> | Some res ->
(* fast path: peek *) (* fast path: peek *)
@ -376,12 +376,6 @@ let await_exn (fut : 'a t) : 'a =
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
get_or_fail_exn fut get_or_fail_exn fut
let await fut =
try Ok (await_exn fut)
with exn ->
let bt = Printexc.get_raw_backtrace () in
Error (exn, bt)
module type INFIX = sig module type INFIX = sig
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t

View file

@ -149,19 +149,13 @@ val for_list : on:Pool.t -> 'a list -> ('a -> unit) -> unit t
This is only available on OCaml 5. *) This is only available on OCaml 5. *)
val await : 'a t -> 'a or_error val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then (** [await fut] suspends the current tasks until [fut] is fulfilled, then
resumes the task on this same pool. resumes the task on this same pool.
This must only be run from inside the pool itself. This must only be run from inside the pool itself.
@since 0.3 @since 0.3
{b NOTE}: only on OCaml 5 *) {b NOTE}: only on OCaml 5 *)
val await_exn : 'a t -> 'a
(** Same as {!await} but re-raises.
@since 0.3
{b NOTE}: only on OCaml 5
*)
(** {2 Blocking} *) (** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error val wait_block : 'a t -> 'a or_error

View file

@ -26,6 +26,7 @@ let add_global_thread_loop_wrapper f : unit =
exception Shutdown exception Shutdown
(** Run [task] as is, on the pool. *)
let run_direct_ (self : t) (task : task) : unit = let run_direct_ (self : t) (task : task) : unit =
let n_qs = Array.length self.qs in let n_qs = Array.length self.qs in
let offset = A.fetch_and_add self.cur_q 1 in let offset = A.fetch_and_add self.cur_q 1 in
@ -51,6 +52,8 @@ let run_direct_ (self : t) (task : task) : unit =
| Exit -> () | Exit -> ()
| Bb_queue.Closed -> raise Shutdown | Bb_queue.Closed -> raise Shutdown
(** Run [task]. It will be wrapped with an effect handler to
support {!Fut.await}. *)
let run (self : t) (task : task) : unit = let run (self : t) (task : task) : unit =
let task' () = let task' () =
(* run [f()] and handle [suspend] in it *) (* run [f()] and handle [suspend] in it *)

View file

@ -13,7 +13,7 @@ let fib ~on x : int Fut.t =
else ( else (
let t1 = Fut.spawn ~on (fun () -> fib_rec (x - 1)) let t1 = Fut.spawn ~on (fun () -> fib_rec (x - 1))
and t2 = Fut.spawn ~on (fun () -> fib_rec (x - 2)) in and t2 = Fut.spawn ~on (fun () -> fib_rec (x - 2)) in
Fut.await_exn t1 + Fut.await_exn t2 Fut.await t1 + Fut.await t2
) )
in in
Fut.spawn ~on (fun () -> fib_rec x) Fut.spawn ~on (fun () -> fib_rec x)

View file

@ -4,7 +4,7 @@ let pool = Pool.create ~min:4 ()
let () = let () =
let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in
let fut2 = Fut.spawn ~on:pool (fun () -> Array.map Fut.await_exn fut) in let fut2 = Fut.spawn ~on:pool (fun () -> Array.map Fut.await fut) in
assert (Fut.wait_block fut2 = Ok (Array.init 10 (fun x -> x))) assert (Fut.wait_block fut2 = Ok (Array.init 10 (fun x -> x)))
let () = let () =
@ -16,7 +16,7 @@ let () =
else else
raise Not_found)) raise Not_found))
in in
let fut2 = Fut.spawn ~on:pool (fun () -> Array.map Fut.await_exn fut) in let fut2 = Fut.spawn ~on:pool (fun () -> Array.map Fut.await fut) in
(* must fail *) (* must fail *)
assert (Fut.wait_block fut2 |> Result.is_error) assert (Fut.wait_block fut2 |> Result.is_error)
@ -28,7 +28,7 @@ let mk_ret_delay ?(on = pool) n x =
let () = let () =
let f1 = mk_ret_delay 0.01 1 in let f1 = mk_ret_delay 0.01 1 in
let f2 = mk_ret_delay 0.01 2 in let f2 = mk_ret_delay 0.01 2 in
let fut = Fut.spawn ~on:pool (fun () -> Fut.await_exn f1, Fut.await_exn f2) in let fut = Fut.spawn ~on:pool (fun () -> Fut.await f1, Fut.await f2) in
assert (Fut.wait_block_exn fut = (1, 2)) assert (Fut.wait_block_exn fut = (1, 2))
let () = let () =
@ -38,7 +38,7 @@ let () =
Thread.delay 0.01; Thread.delay 0.01;
1) 1)
in in
Fut.spawn ~on:pool (fun () -> Fut.await_exn f + 1) Fut.spawn ~on:pool (fun () -> Fut.await f + 1)
and f2 = and f2 =
let f = let f =
Fut.spawn ~on:pool (fun () -> Fut.spawn ~on:pool (fun () ->
@ -47,7 +47,7 @@ let () =
in in
Fut.spawn ~on:pool (fun () -> Fut.spawn ~on:pool (fun () ->
Thread.delay 0.01; Thread.delay 0.01;
Fut.await_exn f * 2) Fut.await f * 2)
in in
let fut = Fut.both f1 f2 in let fut = Fut.both f1 f2 in
assert (Fut.wait_block fut = Ok (2, 20)) assert (Fut.wait_block fut = Ok (2, 20))

View file

@ -6,13 +6,13 @@ let run () =
let t1 = Unix.gettimeofday () in let t1 = Unix.gettimeofday () in
let n = 1_000_000 in let n = 1_000_000 in
let n_tasks = 10 in let n_tasks = 3 in
let task () = let task () =
let l = List.init n (fun x -> Fut.spawn ~on:pool (fun () -> x)) in let l = List.init n (fun x -> Fut.spawn ~on:pool (fun () -> x)) in
Fut.spawn ~on:pool (fun () -> Fut.spawn ~on:pool (fun () ->
List.fold_left List.fold_left
(fun n x -> (fun n x ->
let _res = Fut.await_exn x in let _res = Fut.await x in
n + 1) n + 1)
0 l) 0 l)
in in

View file

@ -36,8 +36,8 @@ let rec mk_train n ic : _ Chan.t =
let run () = let run () =
let start = Unix.gettimeofday () in let start = Unix.gettimeofday () in
let n_trains = 5 in let n_trains = 4 in
let len_train = 100 in let len_train = 80 in
let n_events = 1_000 in let n_events = 1_000 in
let range = 5 in let range = 5 in