From 2326ae00782ffc06d01ef980a0b504c6a51a70d0 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 10 Jul 2023 01:19:16 -0400 Subject: [PATCH] modify `Fork_join.for_` to avoid the allocator. The allocator means calling a closure at each step, which means local a reference will have to be heap allocated (and worse, that floats will be unboxed). Instead we give the function a pair of low,high bounds for a local for. --- benchs/pi.ml | 10 ++++---- src/fork_join.ml | 44 +++++++++++++------------------- src/fork_join.mli | 18 ++++++------- test/effect-based/t_fork_join.ml | 6 +++-- 4 files changed, 35 insertions(+), 43 deletions(-) diff --git a/benchs/pi.ml b/benchs/pi.ml index 1bba7d7c..1dd55fb9 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -65,12 +65,12 @@ let run_fork_join num_steps : float = Fork_join.for_ ~chunk_size:(3 + (num_steps / num_tasks)) num_steps - (fun range -> + (fun low high -> let sum = ref 0. in - range (fun i -> - let x = (float i +. 0.5) *. step in - sum := !sum +. (4. /. (1. +. (x *. x)))); - + for i = low to high do + let x = (float i +. 0.5) *. step in + sum := !sum +. (4. /. (1. +. (x *. x))) + done; let sum = !sum in Lock.update global_sum (fun n -> n +. sum))); diff --git a/src/fork_join.ml b/src/fork_join.ml index 10829864..5b06ee79 100644 --- a/src/fork_join.ml +++ b/src/fork_join.ml @@ -2,8 +2,6 @@ module A = Atomic_ -type 'a iter = ('a -> unit) -> unit - module State_ = struct type 'a single_res = | St_none @@ -93,7 +91,7 @@ let both f g : _ * _ = let both_ignore f g = ignore (both f g : _ * _) -let for_ ?chunk_size n (f : int iter -> unit) : unit = +let for_ ?chunk_size n (f : int -> int -> unit) : unit = let has_failed = A.make false in let missing = A.make n in @@ -107,15 +105,7 @@ let for_ ?chunk_size n (f : int iter -> unit) : unit = let start_tasks ~run (suspension : Suspend_.suspension) = let task_for ~offset ~len_range = - (* range to process within this task *) - let range : int iter = - fun yield -> - for j = offset to offset + len_range - 1 do - yield j - done - in - - match f range with + match f offset (offset + len_range - 1) with | () -> if A.fetch_and_add missing (-len_range) = len_range then (* all tasks done successfully *) @@ -154,10 +144,11 @@ let all_array ?chunk_size (fs : _ array) : _ array = let arr = Array.make len None in (* parallel for *) - for_ ?chunk_size len (fun range -> - range (fun i -> - let x = fs.(i) () in - arr.(i) <- Some x)); + for_ ?chunk_size len (fun low high -> + for i = low to high do + let x = fs.(i) () in + arr.(i) <- Some x + done); (* get all results *) Array.map @@ -172,10 +163,11 @@ let all_list ?chunk_size fs : _ list = let all_init ?chunk_size n f : _ list = let arr = Array.make n None in - for_ ?chunk_size n (fun range -> - range (fun i -> - let x = f i in - arr.(i) <- Some x)); + for_ ?chunk_size n (fun low high -> + for i = low to high do + let x = f i in + arr.(i) <- Some x + done); (* get all results *) List.init n (fun i -> @@ -192,13 +184,13 @@ let map_reduce_commutative ?chunk_size ~gen ~map ~(reduce : 'b commutative_monoid) n : 'b = let res = Lock.create (reduce.neutral ()) in - for_ ?chunk_size n (fun range -> + for_ ?chunk_size n (fun low high -> let local_acc = ref (reduce.neutral ()) in - range (fun i -> - let x = gen i in - let y = map x in - - local_acc := reduce.combine !local_acc y); + for i = low to high do + let x = gen i in + let y = map x in + local_acc := reduce.combine !local_acc y + done; Lock.update res (fun res -> reduce.combine res !local_acc)); Lock.get res diff --git a/src/fork_join.mli b/src/fork_join.mli index dd28f5d7..ef8435dd 100644 --- a/src/fork_join.mli +++ b/src/fork_join.mli @@ -6,10 +6,6 @@ [@@@ifge 5.0] -type 'a iter = ('a -> unit) -> unit -(** Iterators of type ['a]. - @since NEXT_RELEASE *) - val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b (** [both f g] runs [f()] and [g()], potentially in parallel, and returns their result when both are done. @@ -27,11 +23,12 @@ val both_ignore : (unit -> _) -> (unit -> _) -> unit @since 0.3 {b NOTE} this is only available on OCaml 5. *) -val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit +val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit (** [for_ n f] is the parallel version of [for i=0 to n-1 do f i done]. - [f] is called with a [range] parameter, which is an iterator on indices - [f] should process. + [f] is called with parameters [low] and [high] and must use them like so: + {[ for j = low to high do (* … actual work *) done ]}. + If [chunk_size=1] then [low=high] and the loop is not actually needed. @param chunk_size controls the granularity of parallelism. The default chunk size is not specified. @@ -42,11 +39,12 @@ val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit let total_sum = Atomic.make 0 let() = for_ ~chunk_size:5 100 - (fun range -> + (fun low high -> (* iterate on the range sequentially. The range should have 5 items or less. *) let local_sum = ref 0 in - range - (fun i -> local_sum := !local_sum + n); + for j=low to high do + local_sum := !local_sum + j + done; ignore (Atomic.fetch_and_add total_sum !local_sum : int))) let() = assert (Atomic.get total_sum = 4950) diff --git a/test/effect-based/t_fork_join.ml b/test/effect-based/t_fork_join.ml index 9fcaa8eb..348c503e 100644 --- a/test/effect-based/t_fork_join.ml +++ b/test/effect-based/t_fork_join.ml @@ -43,10 +43,12 @@ let () = let total_sum = Atomic.make 0 in Pool.run_wait_block pool (fun () -> - Fork_join.for_ ~chunk_size:5 100 (fun range -> + Fork_join.for_ ~chunk_size:5 100 (fun low high -> (* iterate on the range sequentially. The range should have 5 items or less. *) let local_sum = ref 0 in - range (fun i -> local_sum := !local_sum + i); + for i = low to high do + local_sum := !local_sum + i + done; ignore (Atomic.fetch_and_add total_sum !local_sum : int))); assert (Atomic.get total_sum = 4950)