require 4.08; add infix operators; add dep on Either

This commit is contained in:
Simon Cruanes 2023-06-01 21:34:27 -04:00
parent d7220c75f5
commit 5dcfab7cce
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 184 additions and 10 deletions

View file

@ -17,7 +17,7 @@ jobs:
#- macos-latest #- macos-latest
#- windows-latest #- windows-latest
ocaml-compiler: ocaml-compiler:
- '4.05' - '4.08'
- '4.14' - '4.14'
- '5.0' - '5.0'

View file

@ -21,8 +21,9 @@
(name moonpool) (name moonpool)
(synopsis "Pools of threads supported by a pool of domains") (synopsis "Pools of threads supported by a pool of domains")
(depends (depends
(ocaml (>= 4.05)) (ocaml (>= 4.08))
dune dune
either
(mdx (mdx
(and (and
(>= 1.9.0) (>= 1.9.0)

View file

@ -8,8 +8,9 @@ tags: ["thread" "pool" "domain"]
homepage: "https://github.com/c-cube/moonpool" homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues" bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [ depends: [
"ocaml" {>= "4.05"} "ocaml" {>= "4.08"}
"dune" {>= "3.0"} "dune" {>= "3.0"}
"either"
"mdx" {>= "1.9.0" & with-test} "mdx" {>= "1.9.0" & with-test}
"odoc" {with-doc} "odoc" {with-doc}
] ]

View file

@ -2,7 +2,7 @@
(public_name moonpool) (public_name moonpool)
(name moonpool) (name moonpool)
(private_modules atomic_ domain_) (private_modules atomic_ domain_)
(libraries threads)) (libraries threads either))
(rule (rule
(targets atomic_.ml) (targets atomic_.ml)

View file

@ -231,6 +231,9 @@ module Fut = struct
() ()
done done
let[@inline] fulfill_idempotent self r =
try fulfill self r with Already_fulfilled -> ()
(* ### combinators ### *) (* ### combinators ### *)
let spawn ~on f : _ t = let spawn ~on f : _ t =
@ -304,6 +307,90 @@ module Fut = struct
fut2 fut2
let rec update_ (st : 'a A.t) f : 'a =
let x = A.get st in
let y = f x in
if A.compare_and_set st x y then
y
else
update_ st f
let both a b : _ t =
match peek a, peek b with
| Some (Ok x), Some (Ok y) -> return (x, y)
| Some (Error (e, bt)), _ | _, Some (Error (e, bt)) -> fail e bt
| _ ->
let fut, promise = make () in
let st = A.make `Neither in
on_result a (function
| Error err -> fulfill_idempotent promise (Error err)
| Ok x ->
(match
update_ st (function
| `Neither -> `Left x
| `Right y -> `Both (x, y)
| _ -> assert false)
with
| `Both (x, y) -> fulfill promise (Ok (x, y))
| _ -> ()));
on_result b (function
| Error err -> fulfill_idempotent promise (Error err)
| Ok y ->
(match
update_ st (function
| `Left x -> `Both (x, y)
| `Neither -> `Right y
| _ -> assert false)
with
| `Both (x, y) -> fulfill promise (Ok (x, y))
| _ -> ()));
fut
let choose a b : _ t =
match peek a, peek b with
| Some (Ok x), _ -> return (Either.Left x)
| _, Some (Ok y) -> return (Either.Right y)
| Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ ->
let fut, promise = make () in
let one_failure = A.make false in
on_result a (function
| Error err ->
if A.exchange one_failure true then
(* the other one failed already *)
fulfill_idempotent promise (Error err)
| Ok x -> fulfill_idempotent promise (Ok (Either.Left x)));
on_result b (function
| Error err ->
if A.exchange one_failure true then
(* the other one failed already *)
fulfill_idempotent promise (Error err)
| Ok y -> fulfill_idempotent promise (Ok (Either.Right y)));
fut
let choose_same a b : _ t =
match peek a, peek b with
| Some (Ok x), _ -> return x
| _, Some (Ok y) -> return y
| Some (Error (e, bt)), Some (Error _) -> fail e bt
| _ ->
let fut, promise = make () in
let one_failure = A.make false in
on_result a (function
| Error err ->
if A.exchange one_failure true then
fulfill_idempotent promise (Error err)
| Ok x -> fulfill_idempotent promise (Ok x));
on_result b (function
| Error err ->
if A.exchange one_failure true then
fulfill_idempotent promise (Error err)
| Ok y -> fulfill_idempotent promise (Ok y));
fut
let peek_ok_assert_ (self : 'a t) : 'a = let peek_ok_assert_ (self : 'a t) : 'a =
match A.get self.st with match A.get self.st with
| Done (Ok x) -> x | Done (Ok x) -> x
@ -346,6 +433,12 @@ module Fut = struct
| [ x ] -> map ?on:None x ~f:(fun x -> [ x ]) | [ x ] -> map ?on:None x ~f:(fun x -> [ x ])
| _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l | _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l
let wait_array (a : _ t array) : unit t =
join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ())
let wait_list (a : _ t list) : unit t =
join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ())
let for_ ~on n f : unit t = let for_ ~on n f : unit t =
let futs = Array.init n (fun i -> spawn ~on (fun () -> f i)) in let futs = Array.init n (fun i -> spawn ~on (fun () -> f i)) in
join_container_ join_container_
@ -385,4 +478,35 @@ module Fut = struct
match wait_block self with match wait_block self with
| Ok x -> x | Ok x -> x
| Error (e, bt) -> Printexc.raise_with_backtrace e bt | Error (e, bt) -> Printexc.raise_with_backtrace e bt
module type INFIX = sig
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t
val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
val ( and* ) : 'a t -> 'b t -> ('a * 'b) t
end
module Infix_ (X : sig
val pool : Pool.t option
end) : INFIX = struct
let[@inline] ( >|= ) x f = map ?on:X.pool ~f x
let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x
let ( let+ ) = ( >|= )
let ( let* ) = ( >>= )
let ( and+ ) = both
let ( and* ) = both
end
include Infix_ (struct
let pool = None
end)
module Infix (X : sig
val pool : Pool.t
end) =
Infix_ (struct
let pool = Some X.pool
end)
end end

View file

@ -59,6 +59,10 @@ module Fut : sig
(** Fullfill the promise, setting the future at the same time. (** Fullfill the promise, setting the future at the same time.
@raise Already_fulfilled if the promise is already fulfilled. *) @raise Already_fulfilled if the promise is already fulfilled. *)
val fulfill_idempotent : 'a promise -> 'a or_error -> unit
(** Fullfill the promise, setting the future at the same time.
Does nothing if the promise is already fulfilled. *)
val return : 'a -> 'a t val return : 'a -> 'a t
(** Already settled future, with a result *) (** Already settled future, with a result *)
@ -92,12 +96,34 @@ module Fut : sig
and fails with [e] if [fut] fails with [e] or [f x] raises [e]. and fails with [e] if [fut] fails with [e] or [f x] raises [e].
@param on if provided, [f] runs on the given pool *) @param on if provided, [f] runs on the given pool *)
val both : 'a t -> 'b t -> ('a * 'b) t
(** [both a b] succeeds with [x, y] if [a] succeeds with [x] and
[b] succeeds with [y], or fails if any of them fails. *)
val choose : 'a t -> 'b t -> ('a, 'b) Either.t t
(** [choose a b] succeeds [Left x] or [Right y] if [a] succeeds with [x] or
[b] succeeds with [y], or fails if both of them fails.
If they both succeed, it is not specified which result is used. *)
val choose_same : 'a t -> 'a t -> 'a t
(** [choose_same a b] succeeds with the value of one of [a] or [b] if
they succeed, or fails if both fail.
If they both succeed, it is not specified which result is used. *)
val join_array : 'a t array -> 'a array t val join_array : 'a t array -> 'a array t
(** Wait for all the futures in the array. Fails if any future fails. *) (** Wait for all the futures in the array. Fails if any future fails. *)
val join_list : 'a t list -> 'a list t val join_list : 'a t list -> 'a list t
(** Wait for all the futures in the list. Fails if any future fails. *) (** Wait for all the futures in the list. Fails if any future fails. *)
val wait_array : _ t array -> unit t
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards
the individual results of futures in [arr]. It fails if any future fails. *)
val wait_list : _ t list -> unit t
(** [wait_list l] waits for all futures in [l] to resolve. It discards
the individual results of futures in [l]. It fails if any future fails. *)
val for_ : on:Pool.t -> int -> (int -> unit) -> unit t val for_ : on:Pool.t -> int -> (int -> unit) -> unit t
(** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the pool, and returns (** [for_ ~on n f] runs [f 0], [f 1], …, [f (n-1)] on the pool, and returns
a future that resolves when all the tasks have resolved, or fails a future that resolves when all the tasks have resolved, or fails
@ -109,14 +135,36 @@ module Fut : sig
(** [wait_block fut] blocks the current thread until [fut] is resolved, (** [wait_block fut] blocks the current thread until [fut] is resolved,
and returns its value. and returns its value.
A word of warning: this can easily cause deadlocks. A good rule to avoid A word of warning: this will monopolize the calling thread until the future
deadlocks is to run this from outside of any pool, or to have an acyclic order resolves. This can also easily cause deadlocks, if enough threads in a pool
between pools where [wait_block] is only called from a pool on futures evaluated call [wait_block] on futures running on the same pool or a pool depending on it.
in a pool that comes lower in the hierarchy.
If this rule is broken, it is possible for all threads in a pool to wait for A good rule to avoid deadlocks is to run this from outside of any pool,
futures that can only make progress on these same threads, hence the deadlock. or to have an acyclic order between pools where [wait_block]
is only called from a pool on futures evaluated in a pool that comes lower
in the hierarchy.
If this rule is broken, it is possible for all threads in a pool to wait
for futures that can only make progress on these same threads,
hence the deadlock.
*) *)
val wait_block_exn : 'a t -> 'a val wait_block_exn : 'a t -> 'a
(** Same as {!wait_block} but re-raises the exception if the future failed. *) (** Same as {!wait_block} but re-raises the exception if the future failed. *)
module type INFIX = sig
val ( >|= ) : 'a t -> ('a -> 'b) -> 'b t
val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t
val ( let+ ) : 'a t -> ('a -> 'b) -> 'b t
val ( and+ ) : 'a t -> 'b t -> ('a * 'b) t
val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
val ( and* ) : 'a t -> 'b t -> ('a * 'b) t
end
include INFIX
(** Operators that run on the same thread *)
(** Make infix combinators *)
module Infix (_ : sig
val pool : Pool.t
end) : INFIX
end end