From 469cb89ecd74a8aacdccc0b1955060dc159e36e2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 22 Dec 2023 13:14:05 -0500 Subject: [PATCH] feat: add `Fut.Advanced.barrier_on_abstract_container_of_futures` this is a good building block for waiting on multiple futures. --- src/fut.ml | 30 +++++++++++++++++++++--------- src/fut.mli | 25 ++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/src/fut.ml b/src/fut.ml index 727f9566..e1dd3f43 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -286,11 +286,12 @@ let peek_ok_assert_ (self : 'a t) : 'a = | Done (Ok x) -> x | _ -> assert false -let join_container_ ~iter ~map ~len cont : _ t = +let barrier_on_abstract_container_of_futures ~iter ~len ~aggregate_results cont + : _ t = let n_items = len cont in if n_items = 0 then ( (* no items, return now. *) - let cont_empty = map (fun _ -> assert false) cont in + let cont_empty = aggregate_results (fun _ -> assert false) cont in return cont_empty ) else ( let fut, promise = make () in @@ -302,7 +303,7 @@ let join_container_ ~iter ~map ~len cont : _ t = let n = A.fetch_and_add missing (-1) in if n = 1 then ( (* last future, we know they all succeeded, so resolve [fut] *) - let res = map peek_ok_assert_ cont in + let res = aggregate_results peek_ok_assert_ cont in fulfill promise (Ok res) ) | Error e_bt -> @@ -318,34 +319,45 @@ let join_container_ ~iter ~map ~len cont : _ t = fut ) +module Advanced = struct + let barrier_on_abstract_container_of_futures = + barrier_on_abstract_container_of_futures +end + let join_array (a : _ t array) : _ array t = match Array.length a with | 0 -> return [||] | 1 -> map ?on:None a.(0) ~f:(fun x -> [| x |]) - | _ -> join_container_ ~len:Array.length ~map:Array.map ~iter:Array.iter a + | _ -> + barrier_on_abstract_container_of_futures ~len:Array.length + ~aggregate_results:Array.map ~iter:Array.iter a let join_list (l : _ t list) : _ list t = match l with | [] -> return [] | [ x ] -> map ?on:None x ~f:(fun x -> [ x ]) - | _ -> join_container_ ~len:List.length ~map:List.map ~iter:List.iter l + | _ -> + barrier_on_abstract_container_of_futures ~len:List.length + ~aggregate_results:List.map ~iter:List.iter l let[@inline] map_list ~f l : _ list t = List.map f l |> join_list let wait_array (a : _ t array) : unit t = - join_container_ a ~iter:Array.iter ~len:Array.length ~map:(fun _f _ -> ()) + barrier_on_abstract_container_of_futures a ~iter:Array.iter ~len:Array.length + ~aggregate_results:(fun _f _ -> ()) let wait_list (a : _ t list) : unit t = - join_container_ a ~iter:List.iter ~len:List.length ~map:(fun _f _ -> ()) + barrier_on_abstract_container_of_futures a ~iter:List.iter ~len:List.length + ~aggregate_results:(fun _f _ -> ()) let for_ ~on n f : unit t = - join_container_ + barrier_on_abstract_container_of_futures ~len:(fun () -> n) ~iter:(fun yield () -> for i = 0 to n - 1 do yield (spawn ~on (fun () -> f i)) done) - ~map:(fun _f () -> ()) + ~aggregate_results:(fun _f () -> ()) () let for_array ~on arr f : unit t = diff --git a/src/fut.mli b/src/fut.mli index d365f265..2dc31f2a 100644 --- a/src/fut.mli +++ b/src/fut.mli @@ -144,13 +144,36 @@ val join_array : 'a t array -> 'a array t val join_list : 'a t list -> 'a list t (** Wait for all the futures in the list. Fails if any future fails. *) +module Advanced : sig + val barrier_on_abstract_container_of_futures : + iter:(('a t -> unit) -> 'cont -> unit) -> + len:('cont -> int) -> + aggregate_results:(('a t -> 'a) -> 'cont -> 'res) -> + 'cont -> + 'res t + (** [barrier_on_abstract_container_of_futures ~iter ~aggregate_results ~len cont] takes a + container of futures ([cont]), with [len] elements, + and returns a future result of type [res] + (possibly another type of container). + + This waits for all futures in [cont: 'cont] to be done + (futures obtained via [iter cont]). If they + all succeed, their results are aggregated into a new + result of type ['res] via [aggregate_results cont]. + + {b NOTE}: the behavior is not specified if [iter f cont] (for a function f) + doesn't call [f] on exactly [len cont] elements. + + @since NEXT_RELEASE *) +end + val map_list : f:('a -> 'b t) -> 'a list -> 'b list t (** [map_list ~f l] is like [join_list @@ List.map f l]. @since NEXT_RELEASE *) val wait_array : _ t array -> unit t (** [wait_array arr] waits for all futures in [arr] to resolve. It discards - the individual results of futures in [arr]. It fails if any future fails. *) + the individual results of futures in [arr]. It fails if any future fails. *) val wait_list : _ t list -> unit t (** [wait_list l] waits for all futures in [l] to resolve. It discards