mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
feat fut: add on_result_ignore
This commit is contained in:
parent
6b81d1fca9
commit
e1105f4a88
2 changed files with 22 additions and 3 deletions
|
|
@ -69,6 +69,18 @@ let on_result (self : _ t) (f : _ waiter) : unit =
|
||||||
in
|
in
|
||||||
ignore (C.try_attach self.st trigger : bool)
|
ignore (C.try_attach self.st trigger : bool)
|
||||||
|
|
||||||
|
let on_result_ignore_cb_ _tr f (self : _ t) =
|
||||||
|
f (Picos.Computation.canceled self.st)
|
||||||
|
|
||||||
|
let on_result_ignore (self : _ t) f : unit =
|
||||||
|
if Picos.Computation.is_running self.st then (
|
||||||
|
let trigger =
|
||||||
|
(Trigger.from_action f self on_result_ignore_cb_ [@alert "-handler"])
|
||||||
|
in
|
||||||
|
ignore (C.try_attach self.st trigger : bool)
|
||||||
|
) else
|
||||||
|
on_result_ignore_cb_ () f self
|
||||||
|
|
||||||
let[@inline] fulfill_idempotent self r =
|
let[@inline] fulfill_idempotent self r =
|
||||||
match r with
|
match r with
|
||||||
| Ok x -> C.return self.st x
|
| Ok x -> C.return self.st x
|
||||||
|
|
@ -296,14 +308,14 @@ let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont
|
||||||
|
|
||||||
(* callback called when a future in [a] is resolved *)
|
(* callback called when a future in [a] is resolved *)
|
||||||
let on_res = function
|
let on_res = function
|
||||||
| Ok _ ->
|
| None ->
|
||||||
let n = A.fetch_and_add missing (-1) in
|
let n = A.fetch_and_add missing (-1) in
|
||||||
if n = 1 then (
|
if n = 1 then (
|
||||||
(* last future, we know they all succeeded, so resolve [fut] *)
|
(* last future, we know they all succeeded, so resolve [fut] *)
|
||||||
let res = aggregate_results peek_or_assert_ cont in
|
let res = aggregate_results peek_or_assert_ cont in
|
||||||
fulfill promise (Ok res)
|
fulfill promise (Ok res)
|
||||||
)
|
)
|
||||||
| Error e_bt ->
|
| Some e_bt ->
|
||||||
(* immediately cancel all other [on_res] *)
|
(* immediately cancel all other [on_res] *)
|
||||||
let n = A.exchange missing 0 in
|
let n = A.exchange missing 0 in
|
||||||
if n > 0 then
|
if n > 0 then
|
||||||
|
|
@ -312,7 +324,7 @@ let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont
|
||||||
fulfill promise (Error e_bt)
|
fulfill promise (Error e_bt)
|
||||||
in
|
in
|
||||||
|
|
||||||
iter (fun fut -> on_result fut on_res) cont;
|
iter (fun fut -> on_result_ignore fut on_res) cont;
|
||||||
fut
|
fut
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,13 @@ val on_result : 'a t -> ('a or_error -> unit) -> unit
|
||||||
when [fut] is set ;
|
when [fut] is set ;
|
||||||
or calls [f] immediately if [fut] is already set. *)
|
or calls [f] immediately if [fut] is already set. *)
|
||||||
|
|
||||||
|
val on_result_ignore : _ t -> (Exn_bt.t option -> unit) -> unit
|
||||||
|
(** [on_result_ignore fut f] registers [f] to be called in the future
|
||||||
|
when [fut] is set;
|
||||||
|
or calls [f] immediately if [fut] is already set.
|
||||||
|
It does not pass the result, only a success/error signal.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
exception Already_fulfilled
|
exception Already_fulfilled
|
||||||
|
|
||||||
val fulfill : 'a promise -> 'a or_error -> unit
|
val fulfill : 'a promise -> 'a or_error -> unit
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue