From aa0dea3e342ee001c31b18fc3dbce931c70fd378 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 26 Jul 2023 16:22:42 -0400 Subject: [PATCH] add `Fut.{reify_error,bind_reify_error}` --- src/d_pool_.ml | 2 ++ src/fut.ml | 9 +++++++++ src/fut.mli | 16 +++++++++++++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/d_pool_.ml b/src/d_pool_.ml index 1a391687..af0f7811 100644 --- a/src/d_pool_.ml +++ b/src/d_pool_.ml @@ -9,8 +9,10 @@ let work_ _i q : unit = (* A domain level worker. It should not do too much except for starting new threads for pools. *) +(* TODO: take [Run of (unit -> unit) | Incr | Decr] to handle refcounting? *) type worker = { q: (unit -> unit) Bb_queue.t } [@@unboxed] +(* TODO: use [worker option array], with a mechanism to start/stop them *) let domains_ : worker array lazy_t = lazy ((* number of domains we spawn. Note that we spawn n-1 domains diff --git a/src/fut.ml b/src/fut.ml index 821a77f0..42767b61 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -100,6 +100,14 @@ let spawn ~on f : _ t = Pool.run_async on task; fut +let reify_error (f : 'a t) : 'a or_error t = + match peek f with + | Some res -> return res + | None -> + let fut, promise = make () in + on_result f (fun r -> fulfill promise (Ok r)); + fut + let map ?on ~f fut : _ t = let map_res r = match r with @@ -161,6 +169,7 @@ let bind ?on ~f fut : _ t = fut2 +let bind_reify_error ?on ~f fut : _ t = bind ?on ~f (reify_error fut) let join ?on fut = bind ?on fut ~f:(fun x -> x) let update_ (st : 'a A.t) f : 'a = diff --git a/src/fut.mli b/src/fut.mli index 461b0e23..bc9c2658 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -85,6 +85,12 @@ val spawn : on:Runner.t -> (unit -> 'a) -> 'a t (** [spaw ~on f] runs [f()] on the given runner [on], and return a future that will hold its result. *) +val reify_error : 'a t -> 'a or_error t +(** [reify_error fut] turns a failing future into a non-failing + one that contain [Error (exn, bt)]. A non-failing future + returning [x] is turned into [Ok x] + @since NEXT_RELEASE *) + val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t (** [map ?on ~f fut] returns a new future [fut2] that resolves with [f x] if [fut] resolved with [x]; @@ -92,11 +98,19 @@ val map : ?on:Runner.t -> f:('a -> 'b) -> 'a t -> 'b t @param on if provided, [f] runs on the given runner *) val bind : ?on:Runner.t -> f:('a -> 'b t) -> 'a t -> 'b t -(** [map ?on ~f fut] returns a new future [fut2] that resolves +(** [bind ?on ~f fut] returns a new future [fut2] that resolves like the future [f x] if [fut] resolved with [x]; and fails with [e] if [fut] fails with [e] or [f x] raises [e]. @param on if provided, [f] runs on the given runner *) +val bind_reify_error : ?on:Runner.t -> f:('a or_error -> 'b t) -> 'a t -> 'b t +(** [bind_reify_error ?on ~f fut] returns a new future [fut2] that resolves + like the future [f (Ok x)] if [fut] resolved with [x]; + and resolves like the future [f (Error (exn, bt))] + if [fut] fails with [exn] and backtrace [bt]. + @param on if provided, [f] runs on the given runner + @since NEXT_RELEASE *) + val join : ?on:Runner.t -> 'a t t -> 'a t (** [join fut] is [fut >>= Fun.id]. It joins the inner layer of the future. @since 0.2 *)