From f93248a867474970caefe4ce64977976941e3910 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 26 Aug 2024 14:25:35 -0400 Subject: [PATCH] wip: use picos computations --- src/core/exn_bt.mli | 4 +- src/core/fut.ml | 157 ++++++++++++++++++++------------------------ src/core/trigger.ml | 4 ++ src/core/types_.ml | 1 + 4 files changed, 78 insertions(+), 88 deletions(-) create mode 100644 src/core/trigger.ml diff --git a/src/core/exn_bt.mli b/src/core/exn_bt.mli index b78e7b4c..37d075a8 100644 --- a/src/core/exn_bt.mli +++ b/src/core/exn_bt.mli @@ -4,8 +4,10 @@ @since 0.6 *) -include module type of Picos_exn_bt (** An exception bundled with a backtrace *) +include module type of struct + include Picos_exn_bt +end val exn : t -> exn val bt : t -> Printexc.raw_backtrace diff --git a/src/core/fut.ml b/src/core/fut.ml index 00354bad..2cba1259 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -1,118 +1,105 @@ module A = Atomic_ +module C = Picos.Computation type 'a or_error = ('a, Exn_bt.t) result type 'a waiter = 'a or_error -> unit - -type 'a state = - | Done of 'a or_error - | Waiting of { waiters: 'a waiter list } - -type 'a t = { st: 'a state A.t } [@@unboxed] +type 'a t = { st: 'a C.t } [@@unboxed] type 'a promise = 'a t +let[@inline] make_ () : _ t = + let fut = { st = C.create ~mode:`LIFO () } in + fut + let make () = - let fut = { st = A.make (Waiting { waiters = [] }) } in + let fut = make_ () in fut, fut -let[@inline] of_result x : _ t = { st = A.make (Done x) } -let[@inline] return x : _ t = of_result (Ok x) -let[@inline] fail exn bt : _ t = of_result (Error { Exn_bt.exn; bt }) -let[@inline] fail_exn_bt ebt = of_result (Error ebt) +let[@inline] return x : _ t = { st = C.returned x } -let[@inline] is_resolved self : bool = - match A.get self.st with - | Done _ -> true - | Waiting _ -> false +let[@inline] fail_exn_bt ebt = + let st = C.create () in + C.cancel st ebt; + { st } -let[@inline] peek self : _ option = - match A.get self.st with - | Done x -> Some x - | Waiting _ -> None +let[@inline] fail exn bt : _ t = fail_exn_bt { Exn_bt.exn; bt } -let[@inline] raise_if_failed self : unit = - match A.get self.st with - | Done (Error ebt) -> Exn_bt.raise ebt - | _ -> () +let[@inline] of_result = function + | Ok x -> return x + | Error ebt -> fail_exn_bt ebt -let[@inline] is_done self : bool = - match A.get self.st with - | Done _ -> true - | Waiting _ -> false +let[@inline] is_resolved self : bool = not (C.is_running self.st) +let is_done = is_resolved +let[@inline] peek self : _ option = C.peek self.st +let[@inline] raise_if_failed self : unit = C.check self.st let[@inline] is_success self = - match A.get self.st with - | Done (Ok _) -> true + match C.peek self.st with + | Some (Ok _) -> true | _ -> false -let[@inline] is_failed self = - match A.get self.st with - | Done (Error _) -> true - | _ -> false +let[@inline] is_failed self = C.is_canceled self.st exception Not_ready let[@inline] get_or_fail self = - match A.get self.st with - | Done x -> x - | Waiting _ -> raise Not_ready + match C.peek self.st with + | Some x -> x + | None -> raise Not_ready let[@inline] get_or_fail_exn self = - match A.get self.st with - | Done (Ok x) -> x - | Done (Error { exn; bt }) -> Printexc.raise_with_backtrace exn bt - | Waiting _ -> raise Not_ready + match C.peek self.st with + | Some (Ok x) -> x + | Some (Error ebt) -> Exn_bt.raise ebt + | None -> raise Not_ready + +let[@inline] peek_ok_assert_ (self : 'a t) : 'a = + if C.is_running self.st then assert false; + (* cannot block *) + C.await self.st + +let on_result_cb_ _tr f self : unit = + let res = + try Ok (peek_ok_assert_ self) + with exn -> + let ebt = Exn_bt.get exn in + Error ebt + in + f res let on_result (self : _ t) (f : _ waiter) : unit = - while - let st = A.get self.st in - match st with - | Done x -> - f x; - false - | Waiting { waiters = l } -> - not (A.compare_and_set self.st st (Waiting { waiters = f :: l })) - do - Domain_.relax () - done + let trigger = + (Trigger.from_action f self on_result_cb_ [@alert "-handler"]) + in + ignore (C.try_attach self.st trigger : bool) + +let[@inline] fulfill_idempotent self r = + match r with + | Ok x -> C.return self.st x + | Error ebt -> C.cancel self.st ebt exception Already_fulfilled let fulfill (self : _ t) (r : _ result) : unit = - let fs = ref [] in - while - let st = A.get self.st in - match st with - | Done _ -> raise Already_fulfilled - | Waiting { waiters = l } -> - let did_swap = A.compare_and_set self.st st (Done r) in - if did_swap then ( - (* success, now call all the waiters *) - fs := l; - false - ) else - true - do - Domain_.relax () - done; - List.iter (fun f -> try f r with _ -> ()) !fs; - () - -let[@inline] fulfill_idempotent self r = - try fulfill self r with Already_fulfilled -> () + let ok = + match r with + | Ok x -> C.try_return self.st x + | Error ebt -> C.try_cancel self.st ebt + in + if not ok then raise Already_fulfilled (* ### combinators ### *) let spawn ~on f : _ t = - let fut, promise = make () in + let fut = make_ () in let task () = - let res = - try Ok (f ()) - with exn -> - let bt = Printexc.get_raw_backtrace () in - Error { Exn_bt.exn; bt } - in - fulfill promise res + try + let res = f () in + C.return fut.st res + with exn -> + let bt = Printexc.get_raw_backtrace () in + let ebt = { Exn_bt.exn; bt } in + C.cancel fut.st ebt in Runner.run_async on task; @@ -127,8 +114,8 @@ let reify_error (f : 'a t) : 'a or_error t = match peek f with | Some res -> return res | None -> - let fut, promise = make () in - on_result f (fun r -> fulfill promise (Ok r)); + let fut = make_ () in + on_result f (fun r -> fulfill fut (Ok r)); fut let[@inline] get_runner_ ?on () : Runner.t option = @@ -299,11 +286,6 @@ let choose_same a b : _ t = | Ok y -> fulfill_idempotent promise (Ok y)); fut -let peek_ok_assert_ (self : 'a t) : 'a = - match A.get self.st with - | Done (Ok x) -> x - | _ -> assert false - let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont : _ t = let n_items = len cont in @@ -387,6 +369,7 @@ let for_list ~on l f : unit t = (* ### blocking ### *) +(* TODO: use a trigger directly? *) let wait_block (self : 'a t) : 'a or_error = match A.get self.st with | Done x -> x (* fast path *) diff --git a/src/core/trigger.ml b/src/core/trigger.ml new file mode 100644 index 00000000..baad75ce --- /dev/null +++ b/src/core/trigger.ml @@ -0,0 +1,4 @@ +(** Triggers from picos + @since NEXT_RELEASE *) + +include Picos.Trigger diff --git a/src/core/types_.ml b/src/core/types_.ml index fbe010a6..141be6dd 100644 --- a/src/core/types_.ml +++ b/src/core/types_.ml @@ -1,6 +1,7 @@ module TLS = Thread_local_storage module Domain_pool_ = Moonpool_dpool +(* TODO: replace with Picos.Fiber.FLS *) type ls_value = .. (** Key for task local storage *)