mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
api(fut): public alias 'a Fut.t = 'a Picos.Computation.t
This commit is contained in:
parent
f0ea8c294d
commit
3a5eaaa44d
2 changed files with 31 additions and 31 deletions
|
|
@ -3,23 +3,23 @@ module C = Picos.Computation
|
||||||
|
|
||||||
type 'a or_error = ('a, Exn_bt.t) result
|
type 'a or_error = ('a, Exn_bt.t) result
|
||||||
type 'a waiter = 'a or_error -> unit
|
type 'a waiter = 'a or_error -> unit
|
||||||
type 'a t = { st: 'a C.t } [@@unboxed]
|
type 'a t = 'a C.t
|
||||||
type 'a promise = 'a t
|
type 'a promise = 'a t
|
||||||
|
|
||||||
let[@inline] make_promise () : _ t =
|
let[@inline] make_promise () : _ t =
|
||||||
let fut = { st = C.create ~mode:`LIFO () } in
|
let fut = C.create ~mode:`LIFO () in
|
||||||
fut
|
fut
|
||||||
|
|
||||||
let make () =
|
let make () =
|
||||||
let fut = make_promise () in
|
let fut = make_promise () in
|
||||||
fut, fut
|
fut, fut
|
||||||
|
|
||||||
let[@inline] return x : _ t = { st = C.returned x }
|
let[@inline] return x : _ t = C.returned x
|
||||||
|
|
||||||
let[@inline] fail exn bt : _ t =
|
let[@inline] fail exn bt : _ t =
|
||||||
let st = C.create () in
|
let fut = C.create () in
|
||||||
C.cancel st exn bt;
|
C.cancel fut exn bt;
|
||||||
{ st }
|
fut
|
||||||
|
|
||||||
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
let[@inline] fail_exn_bt ebt = fail (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
|
|
@ -27,32 +27,32 @@ let[@inline] of_result = function
|
||||||
| Ok x -> return x
|
| Ok x -> return x
|
||||||
| Error ebt -> fail_exn_bt ebt
|
| Error ebt -> fail_exn_bt ebt
|
||||||
|
|
||||||
let[@inline] is_resolved self : bool = not (C.is_running self.st)
|
let[@inline] is_resolved self : bool = not (C.is_running self)
|
||||||
let is_done = is_resolved
|
let is_done = is_resolved
|
||||||
let[@inline] peek self : _ option = C.peek self.st
|
let peek : 'a t -> _ option = C.peek
|
||||||
let[@inline] raise_if_failed self : unit = C.check self.st
|
let raise_if_failed : _ t -> unit = C.check
|
||||||
|
|
||||||
let[@inline] is_success self =
|
let[@inline] is_success self =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| _ -> true
|
| _ -> true
|
||||||
| exception _ -> false
|
| exception _ -> false
|
||||||
|
|
||||||
let[@inline] is_failed self = C.is_canceled self.st
|
let is_failed : _ t -> bool = C.is_canceled
|
||||||
|
|
||||||
exception Not_ready
|
exception Not_ready
|
||||||
|
|
||||||
let[@inline] get_or_fail self =
|
let[@inline] get_or_fail self =
|
||||||
match C.peek self.st with
|
match C.peek self with
|
||||||
| Some x -> x
|
| Some x -> x
|
||||||
| None -> raise Not_ready
|
| None -> raise Not_ready
|
||||||
|
|
||||||
let[@inline] get_or_fail_exn self =
|
let[@inline] get_or_fail_exn self =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running -> raise Not_ready
|
| exception C.Running -> raise Not_ready
|
||||||
|
|
||||||
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
let[@inline] peek_or_assert_ (self : 'a t) : 'a =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running -> assert false
|
| exception C.Running -> assert false
|
||||||
|
|
||||||
|
|
@ -67,32 +67,32 @@ let on_result (self : _ t) (f : _ waiter) : unit =
|
||||||
let trigger =
|
let trigger =
|
||||||
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
|
(Trigger.from_action f self on_result_cb_ [@alert "-handler"])
|
||||||
in
|
in
|
||||||
if not (C.try_attach self.st trigger) then on_result_cb_ () f self
|
if not (C.try_attach self trigger) then on_result_cb_ () f self
|
||||||
|
|
||||||
let on_result_ignore_cb_ _tr f (self : _ t) =
|
let on_result_ignore_cb_ _tr f (self : _ t) =
|
||||||
f (Picos.Computation.canceled self.st)
|
f (Picos.Computation.canceled self)
|
||||||
|
|
||||||
let on_result_ignore (self : _ t) f : unit =
|
let on_result_ignore (self : _ t) f : unit =
|
||||||
if Picos.Computation.is_running self.st then (
|
if Picos.Computation.is_running self then (
|
||||||
let trigger =
|
let trigger =
|
||||||
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
||||||
in
|
in
|
||||||
if not (C.try_attach self.st trigger) then on_result_ignore_cb_ () f self
|
if not (C.try_attach self trigger) then on_result_ignore_cb_ () f self
|
||||||
) else
|
) else
|
||||||
on_result_ignore_cb_ () f self
|
on_result_ignore_cb_ () f self
|
||||||
|
|
||||||
let[@inline] fulfill_idempotent self r =
|
let[@inline] fulfill_idempotent self r =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.return self.st x
|
| Ok x -> C.return self x
|
||||||
| Error ebt -> C.cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
| Error ebt -> C.cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
|
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
let fulfill (self : _ t) (r : _ result) : unit =
|
let fulfill (self : _ t) (r : _ result) : unit =
|
||||||
let ok =
|
let ok =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.try_return self.st x
|
| Ok x -> C.try_return self x
|
||||||
| Error ebt -> C.try_cancel self.st (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
| Error ebt -> C.try_cancel self (Exn_bt.exn ebt) (Exn_bt.bt ebt)
|
||||||
in
|
in
|
||||||
if not ok then raise Already_fulfilled
|
if not ok then raise Already_fulfilled
|
||||||
|
|
||||||
|
|
@ -104,10 +104,10 @@ let spawn ~on f : _ t =
|
||||||
let task () =
|
let task () =
|
||||||
try
|
try
|
||||||
let res = f () in
|
let res = f () in
|
||||||
C.return fut.st res
|
C.return fut res
|
||||||
with exn ->
|
with exn ->
|
||||||
let bt = Printexc.get_raw_backtrace () in
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
C.cancel fut.st exn bt
|
C.cancel fut exn bt
|
||||||
in
|
in
|
||||||
|
|
||||||
Runner.run_async on task;
|
Runner.run_async on task;
|
||||||
|
|
@ -380,7 +380,7 @@ let for_list ~on l f : unit t =
|
||||||
let push_queue_ _tr q () = Bb_queue.push q ()
|
let push_queue_ _tr q () = Bb_queue.push q ()
|
||||||
|
|
||||||
let wait_block_exn (self : 'a t) : 'a =
|
let wait_block_exn (self : 'a t) : 'a =
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x (* fast path *)
|
| x -> x (* fast path *)
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let real_block () =
|
let real_block () =
|
||||||
|
|
@ -394,7 +394,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
||||||
assert attached;
|
assert attached;
|
||||||
|
|
||||||
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
(* blockingly wait for trigger if computation didn't complete in the mean time *)
|
||||||
if C.try_attach self.st trigger then Bb_queue.pop q;
|
if C.try_attach self trigger then Bb_queue.pop q;
|
||||||
|
|
||||||
(* trigger was signaled! computation must be done*)
|
(* trigger was signaled! computation must be done*)
|
||||||
peek_or_assert_ self
|
peek_or_assert_ self
|
||||||
|
|
@ -406,7 +406,7 @@ let wait_block_exn (self : 'a t) : 'a =
|
||||||
if i = 0 then
|
if i = 0 then
|
||||||
real_block ()
|
real_block ()
|
||||||
else (
|
else (
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| x -> x
|
| x -> x
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
Domain_.relax ();
|
Domain_.relax ();
|
||||||
|
|
@ -426,12 +426,12 @@ let wait_block self =
|
||||||
|
|
||||||
let await (self : 'a t) : 'a =
|
let await (self : 'a t) : 'a =
|
||||||
(* fast path: peek *)
|
(* fast path: peek *)
|
||||||
match C.peek_exn self.st with
|
match C.peek_exn self with
|
||||||
| res -> res
|
| res -> res
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let trigger = Trigger.create () in
|
let trigger = Trigger.create () in
|
||||||
(* suspend until the future is resolved *)
|
(* suspend until the future is resolved *)
|
||||||
if C.try_attach self.st trigger then
|
if C.try_attach self trigger then
|
||||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||||
|
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
|
|
@ -453,5 +453,5 @@ module Infix_local = Infix [@@deprecated "use Infix"]
|
||||||
|
|
||||||
module Private_ = struct
|
module Private_ = struct
|
||||||
let[@inline] unsafe_promise_of_fut x = x
|
let[@inline] unsafe_promise_of_fut x = x
|
||||||
let[@inline] as_computation self = self.st
|
let[@inline] as_computation self = self
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@
|
||||||
|
|
||||||
type 'a or_error = ('a, Exn_bt.t) result
|
type 'a or_error = ('a, Exn_bt.t) result
|
||||||
|
|
||||||
type 'a t
|
type 'a t = 'a Picos.Computation.t
|
||||||
(** A future with a result of type ['a]. *)
|
(** A future with a result of type ['a]. *)
|
||||||
|
|
||||||
type 'a promise = private 'a t
|
type 'a promise = private 'a t
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue