From 92df5610995fa176913ae863ffbea9759d6027ea Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 Jan 2026 16:41:15 -0500 Subject: [PATCH] feat: add `Fut.for_iter` makes it easy to parallelize work on any sort of iterator, even though we don't know its size yet. --- src/core/fut.ml | 36 ++++++++++++++++++++++++++++++++++++ src/core/fut.mli | 8 ++++++++ 2 files changed, 44 insertions(+) diff --git a/src/core/fut.ml b/src/core/fut.ml index 0c72752d..7c599d06 100644 --- a/src/core/fut.ml +++ b/src/core/fut.ml @@ -377,6 +377,42 @@ let for_list ~on l f : unit t = let futs = List.rev_map (fun x -> spawn ~on (fun () -> f x)) l in wait_list futs +type 'a iter = ('a -> unit) -> unit + +let for_iter ~on (it : _ iter) f : unit t = + let fut, promise = make () in + + (* start at one for the task that traverses [it] *) + let missing = A.make 1 in + + (* callback called when a future is resolved *) + let on_res = function + | None -> + let n = A.fetch_and_add missing (-1) in + if n = 1 then + (* last future, we know they all succeeded, so resolve [fut] *) + fulfill promise (Ok ()) + | Some e_bt -> + (* immediately cancel all other [on_res] *) + let n = A.exchange missing 0 in + if n > 0 then + (* we're the only one to set to 0, so we can fulfill [fut] + with an error. *) + fulfill promise (Error e_bt) + in + + let fut_iter = + spawn ~on (fun () -> + it (fun item -> + A.incr missing; + let fut = spawn ~on (fun () -> f item) in + on_result_ignore fut on_res)) + in + + on_result_ignore fut_iter on_res; + + fut + (* ### blocking ### *) let push_queue_ _tr q () = Bb_queue.push q () diff --git a/src/core/fut.mli b/src/core/fut.mli index d106a2e6..311234c3 100644 --- a/src/core/fut.mli +++ b/src/core/fut.mli @@ -249,6 +249,14 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t (** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f]. @since 0.2 *) +type 'a iter = ('a -> unit) -> unit +(** ['a iter] is an iterator on ['a]. + @since NEXT_RELEASE *) + +val for_iter : on:Runner.t -> 'a iter -> ('a -> unit) -> unit t +(** [for_iter ~on iter f] runs [f] on every item in [iter] in parallel. + @since NEXT_RELEASE *) + (** {2 Await} This suspends the current task using an OCaml 5 algebraic effect, and makes