From 5dcfab7cceed75e7c80595c448bafe6d1bb30fad Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 1 Jun 2023 21:34:27 -0400 Subject: [PATCH] require 4.08; add infix operators; add dep on Either --- .github/workflows/main.yml | 2 +- dune-project | 3 +- moonpool.opam | 3 +- src/dune | 2 +- src/moonpool.ml | 124 +++++++++++++++++++++++++++++++++++++ src/moonpool.mli | 60 ++++++++++++++++-- 6 files changed, 184 insertions(+), 10 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b82a1da0..dbe2ae40 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -17,7 +17,7 @@ jobs: #- macos-latest #- windows-latest ocaml-compiler: - - '4.05' + - '4.08' - '4.14' - '5.0' diff --git a/dune-project b/dune-project index 1d7a24f2..1be733f3 100644 --- a/dune-project +++ b/dune-project @@ -21,8 +21,9 @@ (name moonpool) (synopsis "Pools of threads supported by a pool of domains") (depends - (ocaml (>= 4.05)) + (ocaml (>= 4.08)) dune + either (mdx (and (>= 1.9.0) diff --git a/moonpool.opam b/moonpool.opam index a19a8a27..23ff8e1a 100644 --- a/moonpool.opam +++ b/moonpool.opam @@ -8,8 +8,9 @@ tags: ["thread" "pool" "domain"] homepage: "https://github.com/c-cube/moonpool" bug-reports: "https://github.com/c-cube/moonpool/issues" depends: [ - "ocaml" {>= "4.05"} + "ocaml" {>= "4.08"} "dune" {>= "3.0"} + "either" "mdx" {>= "1.9.0" & with-test} "odoc" {with-doc} ] diff --git a/src/dune b/src/dune index 765785b1..35d06f81 100644 --- a/src/dune +++ b/src/dune @@ -2,7 +2,7 @@ (public_name moonpool) (name moonpool) (private_modules atomic_ domain_) - (libraries threads)) + (libraries threads either)) (rule (targets atomic_.ml) diff --git a/src/moonpool.ml b/src/moonpool.ml index a9e22fdc..75af6d18 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -231,6 +231,9 @@ module Fut = struct () done + let[@inline] fulfill_idempotent self r = + try fulfill self r with Already_fulfilled -> () + (* ### combinators ### *) let spawn ~on f : _ t = @@ -304,6 +307,90 @@ module Fut = struct fut2 + let rec update_ (st : 'a A.t) f : 'a = + let x = A.get st in + let y = f x in + if A.compare_and_set st x y then + y + else + update_ st f + + let both a b : _ t = + match peek a, peek b with + | Some (Ok x), Some (Ok y) -> return (x, y) + | Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt + | _ -> + let fut, promise = make () in + + let st = A.make `Neither in + on_result a (function + | Error err -> fulfill_idempotent promise (Error err) + | Ok x -> + (match + update_ st (function + | `Neither -> `Left x + | `Right y -> `Both (x, y) + | _ -> assert false) + with + | `Both (x, y) -> fulfill promise (Ok (x, y)) + | _ -> ())); + on_result b (function + | Error err -> fulfill_idempotent promise (Error err) + | Ok y -> + (match + update_ st (function + | `Left x -> `Both (x, y) + | `Neither -> `Right y + | _ -> assert false) + with + | `Both (x, y) -> fulfill promise (Ok (x, y)) + | _ -> ())); + fut + + let choose a b : _ t = + match peek a, peek b with + | Some (Ok x), _ -> return (Either.Left x) + | _, Some (Ok y) -> return (Either.Right y) + | Some (Error (e, bt)), Some (Error _) -> fail e bt + | _ -> + let fut, promise = make () in + + let one_failure = A.make false in + on_result a (function + | Error err -> + if A.exchange one_failure true then + (* the other one failed already *) + fulfill_idempotent promise (Error err) + | Ok x -> fulfill_idempotent promise (Ok (Either.Left x))); + on_result b (function + | Error err -> + if A.exchange one_failure true then + (* the other one failed already *) + fulfill_idempotent promise (Error err) + | Ok y -> fulfill_idempotent promise (Ok (Either.Right y))); + fut + + let choose_same a b : _ t = + match peek a, peek b with + | Some (Ok x), _ -> return x + | _, Some (Ok y) -> return y + | Some (Error (e, bt)), Some (Error _) -> fail e bt + | _ -> + let fut, promise = make () in + + let one_failure = A.make false in + on_result a (function + | Error err -> + if A.exchange one_failure true then + fulfill_idempotent promise (Error err) + | Ok x -> fulfill_idempotent promise (Ok x)); + on_result b (function + | Error err -> + if A.exchange one_failure true then + fulfill_idempotent promise (Error err) + | Ok y -> fulfill_idempotent promise (Ok y)); + fut + let peek_ok_assert_ (self : 'a t) : 'a = match A.get self.st with | Done (Ok x) -> x @@ -346,6 +433,12 @@ module Fut = struct | [ x ] -> map ?on:None x ~f:(fun x -> [ x ]) | _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l + let wait_array (a : _ t array) : unit t = + join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ()) + + let wait_list (a : _ t list) : unit t = + join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ()) + let for_ ~on n f : unit t = let futs = Array.init n (fun i -> spawn ~on (fun () -> f i)) in join_container_ @@ -385,4 +478,35 @@ module Fut = struct match wait_block self with | Ok x -> x | Error (e, bt) -> Printexc.raise_with_backtrace e bt + + module type INFIX = sig + val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t + val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t + val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t + val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t + val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t + val ( and* ) : 'a t -> 'b t -> ('a * 'b) t + end + + module Infix_ (X : sig + val pool : Pool.t option + end) : INFIX = struct + let[@inline] ( >|= ) x f = map ?on:X.pool ~f x + let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x + let ( let+ ) = ( >|= ) + let ( let* ) = ( >>= ) + let ( and+ ) = both + let ( and* ) = both + end + + include Infix_ (struct + let pool = None + end) + + module Infix (X : sig + val pool : Pool.t + end) = + Infix_ (struct + let pool = Some X.pool + end) end diff --git a/src/moonpool.mli b/src/moonpool.mli index 8694ccf8..bd0cdc27 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -59,6 +59,10 @@ module Fut : sig (** Fullfill the promise, setting the future at the same time. @raise Already_fulfilled if the promise is already fulfilled. *) + val fulfill_idempotent : 'a promise -> 'a or_error -> unit + (** Fullfill the promise, setting the future at the same time. + Does nothing if the promise is already fulfilled. *) + val return : 'a -> 'a t (** Already settled future, with a result *) @@ -92,12 +96,34 @@ module Fut : sig and fails with [e] if [fut] fails with [e] or [f x] raises [e]. @param on if provided, [f] runs on the given pool *) + val both : 'a t -> 'b t -> ('a * 'b) t + (** [both a b] succeeds with [x, y] if [a] succeeds with [x] and + [b] succeeds with [y], or fails if any of them fails. *) + + val choose : 'a t -> 'b t -> ('a, 'b) Either.t t + (** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or + [b] succeeds with [y], or fails if both of them fails. + If they both succeed, it is not specified which result is used. *) + + val choose_same : 'a t -> 'a t -> 'a t + (** [choose_same a b] succeeds with the value of one of [a] or [b] if + they succeed, or fails if both fail. + If they both succeed, it is not specified which result is used. *) + val join_array : 'a t array -> 'a array t (** Wait for all the futures in the array. Fails if any future fails. *) val join_list : 'a t list -> 'a list t (** Wait for all the futures in the list. Fails if any future fails. *) + val wait_array : _ t array -> unit t + (** [wait_array arr] waits for all futures in [arr] to resolve. It discards + the individual results of futures in [arr]. It fails if any future fails. *) + + val wait_list : _ t list -> unit t + (** [wait_list l] waits for all futures in [l] to resolve. It discards + the individual results of futures in [l]. It fails if any future fails. *) + val for_ : on:Pool.t -> int -> (int -> unit) -> unit t (** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the pool, and returns a future that resolves when all the tasks have resolved, or fails @@ -109,14 +135,36 @@ module Fut : sig (** [wait_block fut] blocks the current thread until [fut] is resolved, and returns its value. - A word of warning: this can easily cause deadlocks. A good rule to avoid - deadlocks is to run this from outside of any pool, or to have an acyclic order - between pools where [wait_block] is only called from a pool on futures evaluated - in a pool that comes lower in the hierarchy. - If this rule is broken, it is possible for all threads in a pool to wait for - futures that can only make progress on these same threads, hence the deadlock. + A word of warning: this will monopolize the calling thread until the future + resolves. This can also easily cause deadlocks, if enough threads in a pool + call [wait_block] on futures running on the same pool or a pool depending on it. + + A good rule to avoid deadlocks is to run this from outside of any pool, + or to have an acyclic order between pools where [wait_block] + is only called from a pool on futures evaluated in a pool that comes lower + in the hierarchy. + If this rule is broken, it is possible for all threads in a pool to wait + for futures that can only make progress on these same threads, + hence the deadlock. *) val wait_block_exn : 'a t -> 'a (** Same as {!wait_block} but re-raises the exception if the future failed. *) + + module type INFIX = sig + val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t + val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t + val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t + val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t + val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t + val ( and* ) : 'a t -> 'b t -> ('a * 'b) t + end + + include INFIX + (** Operators that run on the same thread *) + + (** Make infix combinators *) + module Infix (_ : sig + val pool : Pool.t + end) : INFIX end