wip: use picos computations

This commit is contained in:
Simon Cruanes 2024-08-26 14:25:35 -04:00
parent 465919ae34
commit f93248a867
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 78 additions and 88 deletions

View file

@ -4,8 +4,10 @@
@since 0.6 *) @since 0.6 *)
include module type of Picos_exn_bt
(** An exception bundled with a backtrace *) (** An exception bundled with a backtrace *)
include module type of struct
include Picos_exn_bt
end
val exn : t -> exn val exn : t -> exn
val bt : t -> Printexc.raw_backtrace val bt : t -> Printexc.raw_backtrace

View file

@ -1,118 +1,105 @@
module A = Atomic_ module A = Atomic_
module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
type 'a waiter = 'a or_error -> unit type 'a waiter = 'a or_error -> unit
type 'a t = { st: 'a C.t } [@@unboxed]
type 'a state =
| Done of 'a or_error
| Waiting of { waiters: 'a waiter list }
type 'a t = { st: 'a state A.t } [@@unboxed]
type 'a promise = 'a t type 'a promise = 'a t
let[@inline] make_ () : _ t =
let fut = { st = C.create ~mode:`LIFO () } in
fut
let make () = let make () =
let fut = { st = A.make (Waiting { waiters = [] }) } in let fut = make_ () in
fut, fut fut, fut
let[@inline] of_result x : _ t = { st = A.make (Done x) } let[@inline] return x : _ t = { st = C.returned x }
let[@inline] return x : _ t = of_result (Ok x)
let[@inline] fail exn bt : _ t = of_result (Error { Exn_bt.exn; bt })
let[@inline] fail_exn_bt ebt = of_result (Error ebt)
let[@inline] is_resolved self : bool = let[@inline] fail_exn_bt ebt =
match A.get self.st with let st = C.create () in
| Done _ -> true C.cancel st ebt;
| Waiting _ -> false { st }
let[@inline] peek self : _ option = let[@inline] fail exn bt : _ t = fail_exn_bt { Exn_bt.exn; bt }
match A.get self.st with
| Done x -> Some x
| Waiting _ -> None
let[@inline] raise_if_failed self : unit = let[@inline] of_result = function
match A.get self.st with | Ok x -> return x
| Done (Error ebt) -> Exn_bt.raise ebt | Error ebt -> fail_exn_bt ebt
| _ -> ()
let[@inline] is_done self : bool = let[@inline] is_resolved self : bool = not (C.is_running self.st)
match A.get self.st with let is_done = is_resolved
| Done _ -> true let[@inline] peek self : _ option = C.peek self.st
| Waiting _ -> false let[@inline] raise_if_failed self : unit = C.check self.st
let[@inline] is_success self = let[@inline] is_success self =
match A.get self.st with match C.peek self.st with
| Done (Ok _) -> true | Some (Ok _) -> true
| _ -> false | _ -> false
let[@inline] is_failed self = let[@inline] is_failed self = C.is_canceled self.st
match A.get self.st with
| Done (Error _) -> true
| _ -> false
exception Not_ready exception Not_ready
let[@inline] get_or_fail self = let[@inline] get_or_fail self =
match A.get self.st with match C.peek self.st with
| Done x -> x | Some x -> x
| Waiting _ -> raise Not_ready | None -> raise Not_ready
let[@inline] get_or_fail_exn self = let[@inline] get_or_fail_exn self =
match A.get self.st with match C.peek self.st with
| Done (Ok x) -> x | Some (Ok x) -> x
| Done (Error { exn; bt }) -> Printexc.raise_with_backtrace exn bt | Some (Error ebt) -> Exn_bt.raise ebt
| Waiting _ -> raise Not_ready | None -> raise Not_ready
let[@inline] peek_ok_assert_ (self : 'a t) : 'a =
if C.is_running self.st then assert false;
(* cannot block *)
C.await self.st
let on_result_cb_ _tr f self : unit =
let res =
try Ok (peek_ok_assert_ self)
with exn ->
let ebt = Exn_bt.get exn in
Error ebt
in
f res
let on_result (self : _ t) (f : _ waiter) : unit = let on_result (self : _ t) (f : _ waiter) : unit =
while let trigger =
let st = A.get self.st in (Trigger.from_action f self on_result_cb_ [@alert "-handler"])
match st with in
| Done x -> ignore (C.try_attach self.st trigger : bool)
f x;
false let[@inline] fulfill_idempotent self r =
| Waiting { waiters = l } -> match r with
not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) | Ok x -> C.return self.st x
do | Error ebt -> C.cancel self.st ebt
Domain_.relax ()
done
exception Already_fulfilled exception Already_fulfilled
let fulfill (self : _ t) (r : _ result) : unit = let fulfill (self : _ t) (r : _ result) : unit =
let fs = ref [] in let ok =
while match r with
let st = A.get self.st in | Ok x -> C.try_return self.st x
match st with | Error ebt -> C.try_cancel self.st ebt
| Done _ -> raise Already_fulfilled in
| Waiting { waiters = l } -> if not ok then raise Already_fulfilled
let did_swap = A.compare_and_set self.st st (Done r) in
if did_swap then (
(* success, now call all the waiters *)
fs := l;
false
) else
true
do
Domain_.relax ()
done;
List.iter (fun f -> try f r with _ -> ()) !fs;
()
let[@inline] fulfill_idempotent self r =
try fulfill self r with Already_fulfilled -> ()
(* ### combinators ### *) (* ### combinators ### *)
let spawn ~on f : _ t = let spawn ~on f : _ t =
let fut, promise = make () in let fut = make_ () in
let task () = let task () =
let res = try
try Ok (f ()) let res = f () in
C.return fut.st res
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Error { Exn_bt.exn; bt } let ebt = { Exn_bt.exn; bt } in
in C.cancel fut.st ebt
fulfill promise res
in in
Runner.run_async on task; Runner.run_async on task;
@ -127,8 +114,8 @@ let reify_error (f : 'a t) : 'a or_error t =
match peek f with match peek f with
| Some res -> return res | Some res -> return res
| None -> | None ->
let fut, promise = make () in let fut = make_ () in
on_result f (fun r -> fulfill promise (Ok r)); on_result f (fun r -> fulfill fut (Ok r));
fut fut
let[@inline] get_runner_ ?on () : Runner.t option = let[@inline] get_runner_ ?on () : Runner.t option =
@ -299,11 +286,6 @@ let choose_same a b : _ t =
| Ok y -> fulfill_idempotent promise (Ok y)); | Ok y -> fulfill_idempotent promise (Ok y));
fut fut
let peek_ok_assert_ (self : 'a t) : 'a =
match A.get self.st with
| Done (Ok x) -> x
| _ -> assert false
let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont
: _ t = : _ t =
let n_items = len cont in let n_items = len cont in
@ -387,6 +369,7 @@ let for_list ~on l f : unit t =
(* ### blocking ### *) (* ### blocking ### *)
(* TODO: use a trigger directly? *)
let wait_block (self : 'a t) : 'a or_error = let wait_block (self : 'a t) : 'a or_error =
match A.get self.st with match A.get self.st with
| Done x -> x (* fast path *) | Done x -> x (* fast path *)

4
src/core/trigger.ml Normal file
View file

@ -0,0 +1,4 @@
(** Triggers from picos
@since NEXT_RELEASE *)
include Picos.Trigger

View file

@ -1,6 +1,7 @@
module TLS = Thread_local_storage module TLS = Thread_local_storage
module Domain_pool_ = Moonpool_dpool module Domain_pool_ = Moonpool_dpool
(* TODO: replace with Picos.Fiber.FLS *)
type ls_value = .. type ls_value = ..
(** Key for task local storage *) (** Key for task local storage *)