From 55f831bc8b551391097adf0259e4c8e49382ff96 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 10 Jul 2023 01:14:16 -0400 Subject: [PATCH] add `Fork_join.{for_,map_reduce_commutative}` --- Makefile | 2 +- benchs/pi.ml | 45 +++++++++++++-- src/fork_join.ml | 98 +++++++++++++++++++++++++++----- src/fork_join.mli | 87 +++++++++++++++++++++++++++- test/effect-based/t_fork_join.ml | 13 ++++- 5 files changed, 222 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index 3472b41a..16fa5b64 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ bench-fib: './_build/default/benchs/fib_rec.exe -cutoff $(BENCH_CUTOFF) -niter $(NITER) -psize={psize} -n $(N)' PI_NSTEPS?=100_000_000 -PI_MODES?=seq,par1 +PI_MODES?=seq,par1,forkjoin bench-pi: @echo running for N=$(PI_NSTEPS) dune build $(DUNE_OPTS_BENCH) benchs/pi.exe diff --git a/benchs/pi.ml b/benchs/pi.ml index 075573b5..1bba7d7c 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -2,6 +2,7 @@ open Moonpool +let ( let@ ) = ( @@ ) let j = ref 0 let spf = Printf.sprintf @@ -16,15 +17,15 @@ let run_sequential (num_steps : int) : float = pi (** Create a pool *) -let mk_pool () = +let with_pool f = if !j = 0 then - Pool.create ~per_domain:1 () + Pool.with_ ~per_domain:1 f else - Pool.create ~min:!j () + Pool.with_ ~min:!j f (** Run in parallel using {!Fut.for_} *) let run_par1 (num_steps : int) : float = - let pool = mk_pool () in + let@ pool = with_pool () in let num_tasks = Pool.size pool in @@ -51,12 +52,42 @@ let run_par1 (num_steps : int) : float = pi [@@@ifge 5.0] + +let run_fork_join num_steps : float = + let@ pool = with_pool () in + + let num_tasks = Pool.size pool in + + let step = 1. /. float num_steps in + let global_sum = Lock.create 0. in + + Pool.run_wait_block pool (fun () -> + Fork_join.for_ + ~chunk_size:(3 + (num_steps / num_tasks)) + num_steps + (fun range -> + let sum = ref 0. in + range (fun i -> + let x = (float i +. 0.5) *. step in + sum := !sum +. (4. /. (1. +. (x *. x)))); + + let sum = !sum in + Lock.update global_sum (fun n -> n +. sum))); + + let pi = step *. Lock.get global_sum in + pi + [@@@else_] + +let run_fork_join _ = + failwith "fork join not available on this version of OCaml" + [@@@endif] type mode = | Sequential | Par1 + | Fork_join let () = let mode = ref Sequential in @@ -66,13 +97,16 @@ let () = let set_mode = function | "seq" -> mode := Sequential | "par1" -> mode := Par1 + | "forkjoin" -> mode := Fork_join | _s -> failwith (spf "unknown mode %S" _s) in let opts = [ "-n", Arg.Set_int n, " number of steps"; - "-mode", Arg.Symbol ([ "seq"; "par1" ], set_mode), " mode of execution"; + ( "-mode", + Arg.Symbol ([ "seq"; "par1"; "forkjoin" ], set_mode), + " mode of execution" ); "-j", Arg.Set_int j, " number of threads"; "-t", Arg.Set time, " printing timing"; ] @@ -85,6 +119,7 @@ let () = match !mode with | Sequential -> run_sequential !n | Par1 -> run_par1 !n + | Fork_join -> run_fork_join !n in let elapsed : float = Unix.gettimeofday () -. t_start in diff --git a/src/fork_join.ml b/src/fork_join.ml index 66ee8dd8..10829864 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -2,6 +2,8 @@ module A = Atomic_ +type 'a iter = ('a -> unit) -> unit + module State_ = struct type 'a single_res = | St_none @@ -91,22 +93,34 @@ let both f g : _ * _ = let both_ignore f g = ignore (both f g : _ * _) -let all_list fs : _ list = - let len = List.length fs in - let arr = Array.make len None in +let for_ ?chunk_size n (f : int iter -> unit) : unit = let has_failed = A.make false in - let missing = A.make len in + let missing = A.make n in + + let chunk_size = + match chunk_size with + | Some cs -> max 1 (min n cs) + | None -> + (* guess: try to have roughly one task per core *) + max 1 (n / Domain_.recommended_number ()) + in let start_tasks ~run (suspension : Suspend_.suspension) = - let task_for i f = - try - let x = f () in - arr.(i) <- Some x; + let task_for ~offset ~len_range = + (* range to process within this task *) + let range : int iter = + fun yield -> + for j = offset to offset + len_range - 1 do + yield j + done + in - if A.fetch_and_add missing (-1) = 1 then + match f range with + | () -> + if A.fetch_and_add missing (-len_range) = len_range then (* all tasks done successfully *) suspension (Ok ()) - with exn -> + | exception exn -> let bt = Printexc.get_raw_backtrace () in if not (A.exchange has_failed true) then (* first one to fail, and [missing] must be >= 2 @@ -114,23 +128,79 @@ let all_list fs : _ list = suspension (Error (exn, bt)) in - List.iteri (fun i f -> run ~with_handler:true (fun () -> task_for i f)) fs + let i = ref 0 in + while !i < n do + let offset = !i in + + let len_range = min chunk_size (n - offset) in + assert (offset + len_range <= n); + + run ~with_handler:true (fun () -> task_for ~offset ~len_range); + i := !i + len_range + done in Suspend_.suspend { Suspend_.handle = (fun ~run suspension -> - (* nothing else is started, no race condition possible *) + (* run tasks, then we'll resume [suspension] *) start_tasks ~run suspension); }; + () + +let all_array ?chunk_size (fs : _ array) : _ array = + let len = Array.length fs in + let arr = Array.make len None in + + (* parallel for *) + for_ ?chunk_size len (fun range -> + range (fun i -> + let x = fs.(i) () in + arr.(i) <- Some x)); (* get all results *) - List.init len (fun i -> + Array.map + (function + | None -> assert false + | Some x -> x) + arr + +let all_list ?chunk_size fs : _ list = + Array.to_list @@ all_array ?chunk_size @@ Array.of_list fs + +let all_init ?chunk_size n f : _ list = + let arr = Array.make n None in + + for_ ?chunk_size n (fun range -> + range (fun i -> + let x = f i in + arr.(i) <- Some x)); + + (* get all results *) + List.init n (fun i -> match arr.(i) with | None -> assert false | Some x -> x) -let all_init n f = all_list @@ List.init n (fun i () -> f i) +type 'a commutative_monoid = { + neutral: unit -> 'a; (** Neutral element *) + combine: 'a -> 'a -> 'a; (** Combine two items. *) +} + +let map_reduce_commutative ?chunk_size ~gen ~map + ~(reduce : 'b commutative_monoid) n : 'b = + let res = Lock.create (reduce.neutral ()) in + + for_ ?chunk_size n (fun range -> + let local_acc = ref (reduce.neutral ()) in + range (fun i -> + let x = gen i in + let y = map x in + + local_acc := reduce.combine !local_acc y); + + Lock.update res (fun res -> reduce.combine res !local_acc)); + Lock.get res [@@@endif] diff --git a/src/fork_join.mli b/src/fork_join.mli index 4f9dffa5..dd28f5d7 100644 --- a/src/fork_join.mli +++ b/src/fork_join.mli @@ -6,6 +6,10 @@ [@@@ifge 5.0] +type 'a iter = ('a -> unit) -> unit +(** Iterators of type ['a]. + @since NEXT_RELEASE *) + val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b (** [both f g] runs [f()] and [g()], potentially in parallel, and returns their result when both are done. @@ -23,16 +27,95 @@ val both_ignore : (unit -> _) -> (unit -> _) -> unit @since 0.3 {b NOTE} this is only available on OCaml 5. *) -val all_list : (unit -> 'a) list -> 'a list +val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit +(** [for_ n f] is the parallel version of [for i=0 to n-1 do f i done]. + + [f] is called with a [range] parameter, which is an iterator on indices + [f] should process. + + @param chunk_size controls the granularity of parallelism. + The default chunk size is not specified. + See {!all_array} or {!all_list} for more details. + + Example: + {[ + let total_sum = Atomic.make 0 + + let() = for_ ~chunk_size:5 100 + (fun range -> + (* iterate on the range sequentially. The range should have 5 items or less. *) + let local_sum = ref 0 in + range + (fun i -> local_sum := !local_sum + n); + ignore (Atomic.fetch_and_add total_sum !local_sum : int))) + + let() = assert (Atomic.get total_sum = 4950) + ]} + + @since NEXT_RELEASE + {b NOTE} this is only available on OCaml 5. *) + +val all_array : ?chunk_size:int -> (unit -> 'a) array -> 'a array +(** [all_array fs] runs all functions in [fs] in tasks, and waits for + all the results. + + @param chunk_size if equal to [n], groups items by [n] to be run in + a single task. Default is [1]. + + @since NEXT_RELEASE + {b NOTE} this is only available on OCaml 5. *) + +val all_list : ?chunk_size:int -> (unit -> 'a) list -> 'a list (** [all_list fs] runs all functions in [fs] in tasks, and waits for all the results. + + @param chunk_size if equal to [n], groups items by [n] to be run in + a single task. Default is not specified. + This parameter is available since NEXT_RELEASE. + @since 0.3 {b NOTE} this is only available on OCaml 5. *) -val all_init : int -> (int -> 'a) -> 'a list +val all_init : ?chunk_size:int -> int -> (int -> 'a) -> 'a list (** [all_init n f] runs functions [f 0], [f 1], … [f (n-1)] in tasks, and waits for all the results. + + @param chunk_size if equal to [n], groups items by [n] to be run in + a single task. Default is not specified. + This parameter is available since NEXT_RELEASE. + @since 0.3 {b NOTE} this is only available on OCaml 5. *) +type 'a commutative_monoid = { + neutral: unit -> 'a; (** Neutral element *) + combine: 'a -> 'a -> 'a; (** Combine two items. *) +} +(** A commutative monoid: order of operations does not matter. + @since NEXT_RELEASE *) + +val map_reduce_commutative : + ?chunk_size:int -> + gen:(int -> 'a) -> + map:('a -> 'b) -> + reduce:'b commutative_monoid -> + int -> + 'b +(** [map_reduce_commutative ~gen ~map ~reduce n] produces items of type ['a] + using [gen 0], [gen 1], …, [gen (n-1)]. Items are then mapped using [map] + in background tasks (each task processes up to [chunk_size] items at a time). + + Then, items of type ['b] obtained by mapping are reduced together using the + definition of the commutative monoid [reduce]. The order in which they + are reduced is not specified. + + @param chunk_size controls granularity of the mapping process + @param gen generates items to process based on an index + @param map takes an item and processes it, independently of other items + @param reduce is used to aggregate results of mapping. + + @since NEXT_RELEASE + {b NOTE} this is only available on OCaml 5. +*) + [@@@endif] diff --git a/test/effect-based/t_fork_join.ml b/test/effect-based/t_fork_join.ml index 9b050c21..9fcaa8eb 100644 --- a/test/effect-based/t_fork_join.ml +++ b/test/effect-based/t_fork_join.ml @@ -1,6 +1,6 @@ [@@@ifge 5.0] -open Moonpool +open! Moonpool let pool = Pool.create ~min:4 () @@ -39,4 +39,15 @@ let () = let exp_sum = List.init 42 (fun x -> x * x) |> List.fold_left ( + ) 0 in assert (par_sum = exp_sum) +let () = + let total_sum = Atomic.make 0 in + + Pool.run_wait_block pool (fun () -> + Fork_join.for_ ~chunk_size:5 100 (fun range -> + (* iterate on the range sequentially. The range should have 5 items or less. *) + let local_sum = ref 0 in + range (fun i -> local_sum := !local_sum + i); + ignore (Atomic.fetch_and_add total_sum !local_sum : int))); + assert (Atomic.get total_sum = 4950) + [@@@endif]