modify Fork_join.for_ to avoid the allocator.

The allocator means calling a closure at each step, which means local
a reference will have to be heap allocated (and worse, that floats will
be unboxed). Instead we give the function a pair of low,high bounds for
a local for.
This commit is contained in:
Simon Cruanes 2023-07-10 01:19:16 -04:00
parent 55f831bc8b
commit 2326ae0078
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 35 additions and 43 deletions

View file

@ -65,12 +65,12 @@ let run_fork_join num_steps : float =
Fork_join.for_
~chunk_size:(3 + (num_steps / num_tasks))
num_steps
(fun range ->
(fun low high ->
let sum = ref 0. in
range (fun i ->
let x = (float i +. 0.5) *. step in
sum := !sum +. (4. /. (1. +. (x *. x))));
for i = low to high do
let x = (float i +. 0.5) *. step in
sum := !sum +. (4. /. (1. +. (x *. x)))
done;
let sum = !sum in
Lock.update global_sum (fun n -> n +. sum)));

View file

@ -2,8 +2,6 @@
module A = Atomic_
type 'a iter = ('a -> unit) -> unit
module State_ = struct
type 'a single_res =
| St_none
@ -93,7 +91,7 @@ let both f g : _ * _ =
let both_ignore f g = ignore (both f g : _ * _)
let for_ ?chunk_size n (f : int iter -> unit) : unit =
let for_ ?chunk_size n (f : int -> int -> unit) : unit =
let has_failed = A.make false in
let missing = A.make n in
@ -107,15 +105,7 @@ let for_ ?chunk_size n (f : int iter -> unit) : unit =
let start_tasks ~run (suspension : Suspend_.suspension) =
let task_for ~offset ~len_range =
(* range to process within this task *)
let range : int iter =
fun yield ->
for j = offset to offset + len_range - 1 do
yield j
done
in
match f range with
match f offset (offset + len_range - 1) with
| () ->
if A.fetch_and_add missing (-len_range) = len_range then
(* all tasks done successfully *)
@ -154,10 +144,11 @@ let all_array ?chunk_size (fs : _ array) : _ array =
let arr = Array.make len None in
(* parallel for *)
for_ ?chunk_size len (fun range ->
range (fun i ->
let x = fs.(i) () in
arr.(i) <- Some x));
for_ ?chunk_size len (fun low high ->
for i = low to high do
let x = fs.(i) () in
arr.(i) <- Some x
done);
(* get all results *)
Array.map
@ -172,10 +163,11 @@ let all_list ?chunk_size fs : _ list =
let all_init ?chunk_size n f : _ list =
let arr = Array.make n None in
for_ ?chunk_size n (fun range ->
range (fun i ->
let x = f i in
arr.(i) <- Some x));
for_ ?chunk_size n (fun low high ->
for i = low to high do
let x = f i in
arr.(i) <- Some x
done);
(* get all results *)
List.init n (fun i ->
@ -192,13 +184,13 @@ let map_reduce_commutative ?chunk_size ~gen ~map
~(reduce : 'b commutative_monoid) n : 'b =
let res = Lock.create (reduce.neutral ()) in
for_ ?chunk_size n (fun range ->
for_ ?chunk_size n (fun low high ->
let local_acc = ref (reduce.neutral ()) in
range (fun i ->
let x = gen i in
let y = map x in
local_acc := reduce.combine !local_acc y);
for i = low to high do
let x = gen i in
let y = map x in
local_acc := reduce.combine !local_acc y
done;
Lock.update res (fun res -> reduce.combine res !local_acc));
Lock.get res

View file

@ -6,10 +6,6 @@
[@@@ifge 5.0]
type 'a iter = ('a -> unit) -> unit
(** Iterators of type ['a].
@since NEXT_RELEASE *)
val both : (unit -> 'a) -> (unit -> 'b) -> 'a * 'b
(** [both f g] runs [f()] and [g()], potentially in parallel,
and returns their result when both are done.
@ -27,11 +23,12 @@ val both_ignore : (unit -> _) -> (unit -> _) -> unit
@since 0.3
{b NOTE} this is only available on OCaml 5. *)
val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit
val for_ : ?chunk_size:int -> int -> (int -> int -> unit) -> unit
(** [for_ n f] is the parallel version of [for i=0 to n-1 do f i done].
[f] is called with a [range] parameter, which is an iterator on indices
[f] should process.
[f] is called with parameters [low] and [high] and must use them like so:
{[ for j = low to high do (* … actual work *) done ]}.
If [chunk_size=1] then [low=high] and the loop is not actually needed.
@param chunk_size controls the granularity of parallelism.
The default chunk size is not specified.
@ -42,11 +39,12 @@ val for_ : ?chunk_size:int -> int -> (int iter -> unit) -> unit
let total_sum = Atomic.make 0
let() = for_ ~chunk_size:5 100
(fun range ->
(fun low high ->
(* iterate on the range sequentially. The range should have 5 items or less. *)
let local_sum = ref 0 in
range
(fun i -> local_sum := !local_sum + n);
for j=low to high do
local_sum := !local_sum + j
done;
ignore (Atomic.fetch_and_add total_sum !local_sum : int)))
let() = assert (Atomic.get total_sum = 4950)

View file

@ -43,10 +43,12 @@ let () =
let total_sum = Atomic.make 0 in
Pool.run_wait_block pool (fun () ->
Fork_join.for_ ~chunk_size:5 100 (fun range ->
Fork_join.for_ ~chunk_size:5 100 (fun low high ->
(* iterate on the range sequentially. The range should have 5 items or less. *)
let local_sum = ref 0 in
range (fun i -> local_sum := !local_sum + i);
for i = low to high do
local_sum := !local_sum + i
done;
ignore (Atomic.fetch_and_add total_sum !local_sum : int)));
assert (Atomic.get total_sum = 4950)