mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-10 13:14:05 -05:00
feat fut: wrap picos computations
This commit is contained in:
parent
5cdda79098
commit
b9c3e1fc7e
1 changed files with 54 additions and 54 deletions
108
src/core/fut.ml
108
src/core/fut.ml
|
|
@ -33,9 +33,9 @@ let[@inline] peek self : _ option = C.peek self.st
|
|||
let[@inline] raise_if_failed self : unit = C.check self.st
|
||||
|
||||
let[@inline] is_success self =
|
||||
match C.peek self.st with
|
||||
| Some (Ok _) -> true
|
||||
| _ -> false
|
||||
match C.peek_exn self.st with
|
||||
| _ -> true
|
||||
| exception _ -> false
|
||||
|
||||
let[@inline] is_failed self = C.is_canceled self.st
|
||||
|
||||
|
|
@ -47,24 +47,21 @@ let[@inline] get_or_fail self =
|
|||
| None -> raise Not_ready
|
||||
|
||||
let[@inline] get_or_fail_exn self =
|
||||
match C.peek self.st with
|
||||
| Some (Ok x) -> x
|
||||
| Some (Error ebt) -> Exn_bt.raise ebt
|
||||
| None -> raise Not_ready
|
||||
match C.peek_exn self.st with
|
||||
| x -> x
|
||||
| exception C.Running -> raise Not_ready
|
||||
|
||||
let[@inline] peek_ok_assert_ (self : 'a t) : 'a =
|
||||
if C.is_running self.st then assert false;
|
||||
(* cannot block *)
|
||||
C.await self.st
|
||||
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
||||
match C.peek_exn self.st with
|
||||
| x -> x
|
||||
| exception C.Running -> assert false
|
||||
|
||||
let on_result_cb_ _tr f self : unit =
|
||||
let res =
|
||||
try Ok (peek_ok_assert_ self)
|
||||
with exn ->
|
||||
let ebt = Exn_bt.get exn in
|
||||
Error ebt
|
||||
in
|
||||
f res
|
||||
match peek_or_assert_ self with
|
||||
| x -> f (Ok x)
|
||||
| exception exn ->
|
||||
let ebt = Exn_bt.get exn in
|
||||
f (Error ebt)
|
||||
|
||||
let on_result (self : _ t) (f : _ waiter) : unit =
|
||||
let trigger =
|
||||
|
|
@ -303,7 +300,7 @@ let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont
|
|||
let n = A.fetch_and_add missing (-1) in
|
||||
if n = 1 then (
|
||||
(* last future, we know they all succeeded, so resolve [fut] *)
|
||||
let res = aggregate_results peek_ok_assert_ cont in
|
||||
let res = aggregate_results peek_or_assert_ cont in
|
||||
fulfill promise (Ok res)
|
||||
)
|
||||
| Error e_bt ->
|
||||
|
|
@ -369,62 +366,65 @@ let for_list ~on l f : unit t =
|
|||
|
||||
(* ### blocking ### *)
|
||||
|
||||
(* TODO: use a trigger directly? *)
|
||||
let wait_block (self : 'a t) : 'a or_error =
|
||||
match A.get self.st with
|
||||
| Done x -> x (* fast path *)
|
||||
| Waiting _ ->
|
||||
let push_queue_ _tr q () = Bb_queue.push q ()
|
||||
|
||||
let wait_block_exn (self : 'a t) : 'a =
|
||||
match C.peek_exn self.st with
|
||||
| x -> x (* fast path *)
|
||||
| exception C.Running ->
|
||||
let real_block () =
|
||||
(* use queue only once *)
|
||||
let q = Bb_queue.create () in
|
||||
on_result self (fun r -> Bb_queue.push q r);
|
||||
Bb_queue.pop q
|
||||
|
||||
let trigger = Trigger.create () in
|
||||
let attached =
|
||||
(Trigger.on_signal trigger q () push_queue_ [@alert "-handler"])
|
||||
in
|
||||
assert attached;
|
||||
|
||||
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
||||
if C.try_attach self.st trigger then Bb_queue.pop q;
|
||||
|
||||
(* trigger was signaled! computation must be done*)
|
||||
peek_or_assert_ self
|
||||
in
|
||||
|
||||
(* TODO: use backoff? *)
|
||||
(* a bit of spinning before we block *)
|
||||
let rec loop i =
|
||||
if i = 0 then
|
||||
real_block ()
|
||||
else (
|
||||
match A.get self.st with
|
||||
| Done x -> x
|
||||
| Waiting _ ->
|
||||
match C.peek_exn self.st with
|
||||
| x -> x
|
||||
| exception C.Running ->
|
||||
Domain_.relax ();
|
||||
(loop [@tailcall]) (i - 1)
|
||||
)
|
||||
in
|
||||
loop 50
|
||||
|
||||
let wait_block_exn self =
|
||||
match wait_block self with
|
||||
| Ok x -> x
|
||||
| Error { exn; bt } -> Printexc.raise_with_backtrace exn bt
|
||||
let wait_block self =
|
||||
match wait_block_exn self with
|
||||
| x -> Ok x
|
||||
| exception exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Error { Exn_bt.exn; bt }
|
||||
|
||||
[@@@ifge 5.0]
|
||||
|
||||
let await (fut : 'a t) : 'a =
|
||||
match peek fut with
|
||||
| Some res ->
|
||||
(* fast path: peek *)
|
||||
(match res with
|
||||
| Ok x -> x
|
||||
| Error { exn; bt } -> Printexc.raise_with_backtrace exn bt)
|
||||
| None ->
|
||||
let await (self : 'a t) : 'a =
|
||||
(* fast path: peek *)
|
||||
match C.peek_exn self.st with
|
||||
| res -> res
|
||||
| exception C.Running ->
|
||||
let trigger = Trigger.create () in
|
||||
(* suspend until the future is resolved *)
|
||||
Suspend_.suspend
|
||||
{
|
||||
Suspend_.handle =
|
||||
(fun ~run:_ ~resume k ->
|
||||
on_result fut (function
|
||||
| Ok _ ->
|
||||
(* schedule continuation with the same name *)
|
||||
resume k (Ok ())
|
||||
| Error ebt ->
|
||||
(* fail continuation immediately *)
|
||||
resume k (Error ebt)));
|
||||
};
|
||||
if C.try_attach self.st trigger then
|
||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||
|
||||
(* un-suspended: we should have a result! *)
|
||||
get_or_fail_exn fut
|
||||
get_or_fail_exn self
|
||||
|
||||
[@@@endif]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue