mirror of
https://github.com/c-cube/moonpool.git
synced 2026-01-29 12:44:51 -05:00
Merge 1c7f36007e into 189a95a514
This commit is contained in:
commit
6cc1a4170a
4 changed files with 66 additions and 0 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,3 +1,4 @@
|
||||||
_build
|
_build
|
||||||
_opam
|
_opam
|
||||||
*.tmp
|
*.tmp
|
||||||
|
*.pdf
|
||||||
|
|
|
||||||
|
|
@ -377,6 +377,42 @@ let for_list ~on l f : unit t =
|
||||||
let futs = List.rev_map (fun x -> spawn ~on (fun () -> f x)) l in
|
let futs = List.rev_map (fun x -> spawn ~on (fun () -> f x)) l in
|
||||||
wait_list futs
|
wait_list futs
|
||||||
|
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
|
||||||
|
let for_iter ~on (it : _ iter) f : unit t =
|
||||||
|
let fut, promise = make () in
|
||||||
|
|
||||||
|
(* start at one for the task that traverses [it] *)
|
||||||
|
let missing = A.make 1 in
|
||||||
|
|
||||||
|
(* callback called when a future is resolved *)
|
||||||
|
let on_res = function
|
||||||
|
| None ->
|
||||||
|
let n = A.fetch_and_add missing (-1) in
|
||||||
|
if n = 1 then
|
||||||
|
(* last future, we know they all succeeded, so resolve [fut] *)
|
||||||
|
fulfill promise (Ok ())
|
||||||
|
| Some e_bt ->
|
||||||
|
(* immediately cancel all other [on_res] *)
|
||||||
|
let n = A.exchange missing 0 in
|
||||||
|
if n > 0 then
|
||||||
|
(* we're the only one to set to 0, so we can fulfill [fut]
|
||||||
|
with an error. *)
|
||||||
|
fulfill promise (Error e_bt)
|
||||||
|
in
|
||||||
|
|
||||||
|
let fut_iter =
|
||||||
|
spawn ~on (fun () ->
|
||||||
|
it (fun item ->
|
||||||
|
A.incr missing;
|
||||||
|
let fut = spawn ~on (fun () -> f item) in
|
||||||
|
on_result_ignore fut on_res))
|
||||||
|
in
|
||||||
|
|
||||||
|
on_result_ignore fut_iter on_res;
|
||||||
|
|
||||||
|
fut
|
||||||
|
|
||||||
(* ### blocking ### *)
|
(* ### blocking ### *)
|
||||||
|
|
||||||
let push_queue_ _tr q () = Bb_queue.push q ()
|
let push_queue_ _tr q () = Bb_queue.push q ()
|
||||||
|
|
|
||||||
|
|
@ -249,6 +249,14 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
|
||||||
(** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f].
|
(** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f].
|
||||||
@since 0.2 *)
|
@since 0.2 *)
|
||||||
|
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
(** ['a iter] is an iterator on ['a].
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val for_iter : on:Runner.t -> 'a iter -> ('a -> unit) -> unit t
|
||||||
|
(** [for_iter ~on iter f] runs [f] on every item in [iter] in parallel.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
(** {2 Await}
|
(** {2 Await}
|
||||||
|
|
||||||
This suspends the current task using an OCaml 5 algebraic effect, and makes
|
This suspends the current task using an OCaml 5 algebraic effect, and makes
|
||||||
|
|
|
||||||
|
|
@ -143,3 +143,24 @@ let () =
|
||||||
in
|
in
|
||||||
|
|
||||||
List.iter run_for [ 1; 10; 50; 1_000 ]
|
List.iter run_for [ 1; 10; 50; 1_000 ]
|
||||||
|
|
||||||
|
let () =
|
||||||
|
let n_items = Atomic.make 0 in
|
||||||
|
let fut =
|
||||||
|
Fut.for_iter ~on:pool (fun _yield -> ()) (fun _item -> Atomic.incr n_items)
|
||||||
|
in
|
||||||
|
Fut.wait_block_exn fut;
|
||||||
|
assert (Atomic.get n_items = 0)
|
||||||
|
|
||||||
|
let () =
|
||||||
|
let run_for n =
|
||||||
|
let l = List.init n (fun x -> x) in
|
||||||
|
let sum = Atomic.make 0 in
|
||||||
|
let iter = fun yield -> List.iter yield l in
|
||||||
|
Fut.for_iter ~on:pool iter (fun x ->
|
||||||
|
ignore (Atomic.fetch_and_add sum x : int))
|
||||||
|
|> Fut.wait_block_exn;
|
||||||
|
assert (Atomic.get sum = List.fold_left ( + ) 0 l)
|
||||||
|
in
|
||||||
|
|
||||||
|
List.iter run_for [ 0; 1; 2; 3; 10; 50; 1_000 ]
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue