feat: add Fut.Advanced.barrier_on_abstract_container_of_futures

this is a good building block for waiting on multiple futures.
This commit is contained in:
Simon Cruanes 2023-12-22 13:14:05 -05:00
parent 6aa8a2e7d2
commit 469cb89ecd
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 45 additions and 10 deletions

View file

@ -286,11 +286,12 @@ let peek_ok_assert_ (self : 'a t) : 'a =
| Done (Ok x) -> x
| _ -> assert false
let join_container_ ~iter ~map ~len cont : _ t =
let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont
: _ t =
let n_items = len cont in
if n_items = 0 then (
(* no items, return now. *)
let cont_empty = map (fun _ -> assert false) cont in
let cont_empty = aggregate_results (fun _ -> assert false) cont in
return cont_empty
) else (
let fut, promise = make () in
@ -302,7 +303,7 @@ let join_container_ ~iter ~map ~len cont : _ t =
let n = A.fetch_and_add missing (-1) in
if n = 1 then (
(* last future, we know they all succeeded, so resolve [fut] *)
let res = map peek_ok_assert_ cont in
let res = aggregate_results peek_ok_assert_ cont in
fulfill promise (Ok res)
)
| Error e_bt ->
@ -318,34 +319,45 @@ let join_container_ ~iter ~map ~len cont : _ t =
fut
)
module Advanced = struct
let barrier_on_abstract_container_of_futures =
barrier_on_abstract_container_of_futures
end
let join_array (a : _ t array) : _ array t =
match Array.length a with
| 0 -> return [||]
| 1 -> map ?on:None a.(0) ~f:(fun x -> [| x |])
| _ -> join_container_ ~len:Array.length ~map:Array.map ~iter:Array.iter a
| _ ->
barrier_on_abstract_container_of_futures ~len:Array.length
~aggregate_results:Array.map ~iter:Array.iter a
let join_list (l : _ t list) : _ list t =
match l with
| [] -> return []
| [ x ] -> map ?on:None x ~f:(fun x -> [ x ])
| _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l
| _ ->
barrier_on_abstract_container_of_futures ~len:List.length
~aggregate_results:List.map ~iter:List.iter l
let[@inline] map_list ~f l : _ list t = List.map f l |> join_list
let wait_array (a : _ t array) : unit t =
join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ())
barrier_on_abstract_container_of_futures a ~iter:Array.iter ~len:Array.length
~aggregate_results:(fun _f _ -> ())
let wait_list (a : _ t list) : unit t =
join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ())
barrier_on_abstract_container_of_futures a ~iter:List.iter ~len:List.length
~aggregate_results:(fun _f _ -> ())
let for_ ~on n f : unit t =
join_container_
barrier_on_abstract_container_of_futures
~len:(fun () -> n)
~iter:(fun yield () ->
for i = 0 to n - 1 do
yield (spawn ~on (fun () -> f i))
done)
~map:(fun _f () -> ())
~aggregate_results:(fun _f () -> ())
()
let for_array ~on arr f : unit t =

View file

@ -144,13 +144,36 @@ val join_array : 'a t array -> 'a array t
val join_list : 'a t list -> 'a list t
(** Wait for all the futures in the list. Fails if any future fails. *)
module Advanced : sig
val barrier_on_abstract_container_of_futures :
iter:(('a t -> unit) -> 'cont -> unit) ->
len:('cont -> int) ->
aggregate_results:(('a t -> 'a) -> 'cont -> 'res) ->
'cont ->
'res t
(** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a
container of futures ([cont]), with [len] elements,
and returns a future result of type [res]
(possibly another type of container).
This waits for all futures in [cont: 'cont] to be done
(futures obtained via [iter <some function> cont]). If they
all succeed, their results are aggregated into a new
result of type ['res] via [aggregate_results <some function> cont].
{b NOTE}: the behavior is not specified if [iter f cont] (for a function f)
doesn't call [f] on exactly [len cont] elements.
@since NEXT_RELEASE *)
end
val map_list : f:('a -> 'b t) -> 'a list -> 'b list t
(** [map_list ~f l] is like [join_list @@ List.map f l].
@since NEXT_RELEASE *)
val wait_array : _ t array -> unit t
(** [wait_array arr] waits for all futures in [arr] to resolve. It discards
the individual results of futures in [arr]. It fails if any future fails. *)
the individual results of futures in [arr]. It fails if any future fails. *)
val wait_list : _ t list -> unit t
(** [wait_list l] waits for all futures in [l] to resolve. It discards