add Fork_join.{for_,map_reduce_commutative}

This commit is contained in:
Simon Cruanes 2023-07-10 01:14:16 -04:00
parent 858755e812
commit 55f831bc8b
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 222 additions and 23 deletions

View file

@ -29,7 +29,7 @@ bench-fib:
'./_build/default/benchs/fib_rec.exe -cutoff $(BENCH_CUTOFF) -niter $(NITER) -psize={psize} -n $(N)'
PI_NSTEPS?=100_000_000
PI_MODES?=seq,par1
PI_MODES?=seq,par1,forkjoin
bench-pi:
@echo running for N=$(PI_NSTEPS)
dune build $(DUNE_OPTS_BENCH) benchs/pi.exe

View file

@ -2,6 +2,7 @@
open Moonpool
let ( let@ ) = ( @@ )
let j = ref 0
let spf = Printf.sprintf
@ -16,15 +17,15 @@ let run_sequential (num_steps : int) : float =
pi
(** Create a pool *)
let mk_pool () =
let with_pool f =
if !j = 0 then
Pool.create ~per_domain:1 ()
Pool.with_ ~per_domain:1 f
else
Pool.create ~min:!j ()
Pool.with_ ~min:!j f
(** Run in parallel using {!Fut.for_} *)
let run_par1 (num_steps : int) : float =
let pool = mk_pool () in
let@ pool = with_pool () in
let num_tasks = Pool.size pool in
@ -51,12 +52,42 @@ let run_par1 (num_steps : int) : float =
pi
[@@@ifge 5.0]
let run_fork_join num_steps : float =
let@ pool = with_pool () in
let num_tasks = Pool.size pool in
let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in
Pool.run_wait_block pool (fun () ->
Fork_join.for_
~chunk_size:(3 + (num_steps / num_tasks))
num_steps
(fun range ->
let sum = ref 0. in
range (fun i ->
let x = (float i +. 0.5) *. step in
sum := !sum +. (4. /. (1. +. (x *. x))));
let sum = !sum in
Lock.update global_sum (fun n -> n +. sum)));
let pi = step *. Lock.get global_sum in
pi
[@@@else_]
let run_fork_join _ =
failwith "fork join not available on this version of OCaml"
[@@@endif]
type mode =
| Sequential
| Par1
| Fork_join
let () =
let mode = ref Sequential in
@ -66,13 +97,16 @@ let () =
let set_mode = function
| "seq" -> mode := Sequential
| "par1" -> mode := Par1
| "forkjoin" -> mode := Fork_join
| _s -> failwith (spf "unknown mode %S" _s)
in
let opts =
[
"-n", Arg.Set_int n, " number of steps";
"-mode", Arg.Symbol ([ "seq"; "par1" ], set_mode), " mode of execution";
( "-mode",
Arg.Symbol ([ "seq"; "par1"; "forkjoin" ], set_mode),
" mode of execution" );
"-j", Arg.Set_int j, " number of threads";
"-t", Arg.Set time, " printing timing";
]
@ -85,6 +119,7 @@ let () =
match !mode with
| Sequential -> run_sequential !n
| Par1 -> run_par1 !n
| Fork_join -> run_fork_join !n
in
let elapsed : float = Unix.gettimeofday () -. t_start in

View file

@ -2,6 +2,8 @@
module A = Atomic_
type 'a iter = ('a -> unit) -> unit
module State_ = struct
type 'a single_res =
| St_none
@ -91,22 +93,34 @@ let both f g : _ * _ =
let both_ignore f g = ignore (both f g : _ * _)
let all_list fs : _ list =
let len = List.length fs in
let arr = Array.make len None in
let for_ ?chunk_size n (f : int iter -> unit) : unit =
let has_failed = A.make false in
let missing = A.make len in
let missing = A.make n in
let chunk_size =
match chunk_size with
| Some cs -> max 1 (min n cs)
| None ->
(* guess: try to have roughly one task per core *)
max 1 (n / Domain_.recommended_number ())
in
let start_tasks ~run (suspension : Suspend_.suspension) =
let task_for i f =
try
let x = f () in
arr.(i) <- Some x;
let task_for ~offset ~len_range =
(* range to process within this task *)
let range : int iter =
fun yield ->
for j = offset to offset + len_range - 1 do
yield j
done
in
if A.fetch_and_add missing (-1) = 1 then
match f range with
| () ->
if A.fetch_and_add missing (-len_range) = len_range then
(* all tasks done successfully *)
suspension (Ok ())
with exn ->
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
if not (A.exchange has_failed true) then
(* first one to fail, and [missing] must be >= 2
@ -114,23 +128,79 @@ let all_list fs : _ list =
suspension (Error (exn, bt))
in
List.iteri (fun i f -> run ~with_handler:true (fun () -> task_for i f)) fs
let i = ref 0 in
while !i < n do
let offset = !i in
let len_range = min chunk_size (n - offset) in
assert (offset + len_range <= n);
run ~with_handler:true (fun () -> task_for ~offset ~len_range);
i := !i + len_range
done
in
Suspend_.suspend
{
Suspend_.handle =
(fun ~run suspension ->
(* nothing else is started, no race condition possible *)
(* run tasks, then we'll resume [suspension] *)
start_tasks ~run suspension);
};
()
let all_array ?chunk_size (fs : _ array) : _ array =
let len = Array.length fs in
let arr = Array.make len None in
(* parallel for *)
for_ ?chunk_size len (fun range ->
range (fun i ->
let x = fs.(i) () in
arr.(i) <- Some x));
(* get all results *)
List.init len (fun i ->
Array.map
(function
| None -> assert false
| Some x -> x)
arr
let all_list ?chunk_size fs : _ list =
Array.to_list @@ all_array ?chunk_size @@ Array.of_list fs
let all_init ?chunk_size n f : _ list =
let arr = Array.make n None in
for_ ?chunk_size n (fun range ->
range (fun i ->
let x = f i in
arr.(i) <- Some x));
(* get all results *)
List.init n (fun i ->
match arr.(i) with
| None -> assert false
| Some x -> x)
let all_init n f = all_list @@ List.init n (fun i () -> f i)
type 'a commutative_monoid = {
neutral: unit -> 'a; (** Neutral element *)
combine: 'a -> 'a -> 'a; (** Combine two items. *)
}
let map_reduce_commutative ?chunk_size ~gen ~map
~(reduce : 'b commutative_monoid) n : 'b =
let res = Lock.create (reduce.neutral ()) in
for_ ?chunk_size n (fun range ->
let local_acc = ref (reduce.neutral ()) in
range (fun i ->
let x = gen i in
let y = map x in
local_acc := reduce.combine !local_acc y);
Lock.update res (fun res -> reduce.combine res !local_acc));
Lock.get res
[@@@endif]

View file

@ -6,6 +6,10 @@
[@@@ifge 5.0]
type 'a iter = ('a -> unit) -> unit
(** Iterators of type ['a].
@since NEXT_RELEASE *)
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [both f g] runs [f()] and [g()], potentially in parallel,
and returns their result when both are done.
@ -23,16 +27,95 @@ val both_ignore : (unit -> _) -> (unit -> _) -> unit
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val all_list : (unit -> 'a) list -> 'a list
val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit
(** [for_ n f] is the parallel version of [for i=0 to n-1 do f i done].
[f] is called with a [range] parameter, which is an iterator on indices
[f] should process.
@param chunk_size controls the granularity of parallelism.
The default chunk size is not specified.
See {!all_array} or {!all_list} for more details.
Example:
{[
let total_sum = Atomic.make 0
let() = for_ ~chunk_size:5 100
(fun range ->
(* iterate on the range sequentially. The range should have 5 items or less. *)
let local_sum = ref 0 in
range
(fun i -> local_sum := !local_sum + n);
ignore (Atomic.fetch_and_add total_sum !local_sum : int)))
let() = assert (Atomic.get total_sum = 4950)
]}
@since NEXT_RELEASE
{b NOTE} this is only available on OCaml 5. *)
val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array
(** [all_array fs] runs all functions in [fs] in tasks, and waits for
all the results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is [1].
@since NEXT_RELEASE
{b NOTE} this is only available on OCaml 5. *)
val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list
(** [all_list fs] runs all functions in [fs] in tasks, and waits for
all the results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is not specified.
This parameter is available since NEXT_RELEASE.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val all_init : int -> (int -> 'a) -> 'a list
val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list
(** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for
all the results.
@param chunk_size if equal to [n], groups items by [n] to be run in
a single task. Default is not specified.
This parameter is available since NEXT_RELEASE.
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
type 'a commutative_monoid = {
neutral: unit -> 'a; (** Neutral element *)
combine: 'a -> 'a -> 'a; (** Combine two items. *)
}
(** A commutative monoid: order of operations does not matter.
@since NEXT_RELEASE *)
val map_reduce_commutative :
?chunk_size:int ->
gen:(int -> 'a) ->
map:('a -> 'b) ->
reduce:'b commutative_monoid ->
int ->
'b
(** [map_reduce_commutative ~gen ~map ~reduce n] produces items of type ['a]
using [gen 0], [gen 1], , [gen (n-1)]. Items are then mapped using [map]
in background tasks (each task processes up to [chunk_size] items at a time).
Then, items of type ['b] obtained by mapping are reduced together using the
definition of the commutative monoid [reduce]. The order in which they
are reduced is not specified.
@param chunk_size controls granularity of the mapping process
@param gen generates items to process based on an index
@param map takes an item and processes it, independently of other items
@param reduce is used to aggregate results of mapping.
@since NEXT_RELEASE
{b NOTE} this is only available on OCaml 5.
*)
[@@@endif]

View file

@ -1,6 +1,6 @@
[@@@ifge 5.0]
open Moonpool
open! Moonpool
let pool = Pool.create ~min:4 ()
@ -39,4 +39,15 @@ let () =
let exp_sum = List.init 42 (fun x -> x * x) |> List.fold_left ( + ) 0 in
assert (par_sum = exp_sum)
let () =
let total_sum = Atomic.make 0 in
Pool.run_wait_block pool (fun () ->
Fork_join.for_ ~chunk_size:5 100 (fun range ->
(* iterate on the range sequentially. The range should have 5 items or less. *)
let local_sum = ref 0 in
range (fun i -> local_sum := !local_sum + i);
ignore (Atomic.fetch_and_add total_sum !local_sum : int)));
assert (Atomic.get total_sum = 4950)
[@@@endif]