feat: add Fut.for_iter

makes it easy to parallelize work on any sort of iterator, even though
we don't know its size yet.
This commit is contained in:
Simon Cruanes 2026-01-02 16:41:15 -05:00
parent 9471ff2aff
commit 92df561099
2 changed files with 44 additions and 0 deletions

View file

@ -377,6 +377,42 @@ let for_list ~on l f : unit t =
let futs = List.rev_map (fun x -> spawn ~on (fun () -> f x)) l in
wait_list futs
type 'a iter = ('a -> unit) -> unit
let for_iter ~on (it : _ iter) f : unit t =
let fut, promise = make () in
(* start at one for the task that traverses [it] *)
let missing = A.make 1 in
(* callback called when a future is resolved *)
let on_res = function
| None ->
let n = A.fetch_and_add missing (-1) in
if n = 1 then
(* last future, we know they all succeeded, so resolve [fut] *)
fulfill promise (Ok ())
| Some e_bt ->
(* immediately cancel all other [on_res] *)
let n = A.exchange missing 0 in
if n > 0 then
(* we're the only one to set to 0, so we can fulfill [fut]
with an error. *)
fulfill promise (Error e_bt)
in
let fut_iter =
spawn ~on (fun () ->
it (fun item ->
A.incr missing;
let fut = spawn ~on (fun () -> f item) in
on_result_ignore fut on_res))
in
on_result_ignore fut_iter on_res;
fut
(* ### blocking ### *)
let push_queue_ _tr q () = Bb_queue.push q ()

View file

@ -249,6 +249,14 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
(** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f].
@since 0.2 *)
type 'a iter = ('a -> unit) -> unit
(** ['a iter] is an iterator on ['a].
@since NEXT_RELEASE *)
val for_iter : on:Runner.t -> 'a iter -> ('a -> unit) -> unit t
(** [for_iter ~on iter f] runs [f] on every item in [iter] in parallel.
@since NEXT_RELEASE *)
(** {2 Await}
This suspends the current task using an OCaml 5 algebraic effect, and makes