From bc7c6b253e5c4666324c151e024884085c3ec79c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Jun 2023 16:19:23 -0400 Subject: [PATCH] raytracer: add some parallelism --- .gitignore | 1 + benchs/raytracer/blocking_queue.ml | 54 ++++++++ benchs/raytracer/blocking_queue.mli | 20 +++ benchs/raytracer/raytracer.ml | 192 ++++++++++++++++++++-------- src/bb_queue.ml | 10 +- src/bb_queue.mli | 4 +- 6 files changed, 225 insertions(+), 56 deletions(-) create mode 100644 benchs/raytracer/blocking_queue.ml create mode 100644 benchs/raytracer/blocking_queue.mli diff --git a/.gitignore b/.gitignore index 76301f0c..c68bc642 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ _build _opam +*.ppm diff --git a/benchs/raytracer/blocking_queue.ml b/benchs/raytracer/blocking_queue.ml new file mode 100644 index 00000000..a8d1a8ac --- /dev/null +++ b/benchs/raytracer/blocking_queue.ml @@ -0,0 +1,54 @@ +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + mutable closed: bool; +} + +exception Closed + +let create () : _ t = + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ); + Mutex.unlock self.mutex + +let push (self : _ t) x : unit = + Mutex.lock self.mutex; + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else ( + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + ) + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + (* check closed *) + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () diff --git a/benchs/raytracer/blocking_queue.mli b/benchs/raytracer/blocking_queue.mli new file mode 100644 index 00000000..70ec10ea --- /dev/null +++ b/benchs/raytracer/blocking_queue.mli @@ -0,0 +1,20 @@ +(* vendored here *) + +type 'a t + +val create : unit -> _ t + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + +val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. + Note that calls to [pop] on a closed queue that still contains elements + will succeed (until all elements are drained). *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/benchs/raytracer/raytracer.ml b/benchs/raytracer/raytracer.ml index a7d91974..caf4232b 100644 --- a/benchs/raytracer/raytracer.ml +++ b/benchs/raytracer/raytracer.ml @@ -1,6 +1,6 @@ open Vec3 open Ray -open Moonpool +module Fut = Moonpool.Fut let ( let@ ) = ( @@ ) let fpf = Printf.fprintf @@ -36,19 +36,21 @@ type scatter = { type hit = hit_rec option +module RS = Random.State + (* Produce a random point inside the unit sphere. Works by picking a random point in the unit cube, rejecting if not inside the sphere. *) -let rec random_in_unit_sphere () = +let rec random_in_unit_sphere (rst : RS.t) = let p = Vec3.sub (Vec3.mul 2.0 - (Vec3.of_floats (Random.float 1.0, Random.float 1.0, Random.float 1.0))) + (Vec3.of_floats (RS.float rst 1.0, RS.float rst 1.0, RS.float rst 1.0))) (Vec3.of_floats (1., 1., 1.)) in if Vec3.dot p p >= 1.0 then p else - random_in_unit_sphere () + random_in_unit_sphere rst let reflect v n = Vec3.sub v (Vec3.mul (2. *. Vec3.dot v n) n) @@ -66,12 +68,12 @@ let refract v n ni_over_nt = ) else None -let hit_scatter r_in hit_rec = +let hit_scatter (rst : RS.t) r_in hit_rec : scatter = match hit_rec.mat with (* reflect in random direction *) | Some (Lambertian albedo) -> let target = - Vec3.add (Vec3.add hit_rec.p hit_rec.normal) (random_in_unit_sphere ()) + Vec3.add (Vec3.add hit_rec.p hit_rec.normal) (random_in_unit_sphere rst) in let scatter = { @@ -86,7 +88,7 @@ let hit_scatter r_in hit_rec = let reflected = reflect (Vec3.unit_vector r_in.dir) hit_rec.normal in let scattered_ray = Ray.create hit_rec.p - (Vec3.add reflected (Vec3.mul fuzz (random_in_unit_sphere ()))) + (Vec3.add reflected (Vec3.mul fuzz (random_in_unit_sphere rst))) in let scattered = { @@ -176,12 +178,12 @@ and hit h ray (tmin, tmax) : hit = | Sphere s -> hit_sphere s ray (tmin, tmax) | World w -> hit_world w ray (tmin, tmax) -let rec get_color world ray depth : vec3 = +let rec get_color (rst : RS.t) world ray depth : vec3 = match hit world ray (0., Float.infinity) with | Some hit_result -> if depth < 50 then ( - let s = hit_scatter ray hit_result in - Vec3.pmul s.color (get_color world s.ray (depth + 1)) + let s = hit_scatter rst ray hit_result in + Vec3.pmul s.color (get_color rst world s.ray (depth + 1)) ) else Vec3.of_floats (0., 0., 0.) | None -> @@ -232,66 +234,152 @@ let mk_world () = type config = { nx: int; ny: int; - ns: int; - j: int; + ns: int; (** samples per pixel *) + j: int; (** Pool size *) out: string; + progress: bool; } +type queue_item = + | Pixel of (int * int * int) Fut.t + | Unblock_next_line of unit Fut.promise + +type state = { + start: float; + active: bool Atomic.t; + n_done: int Atomic.t; + n_waiting: int Atomic.t; + n_lines: int Atomic.t; + results: queue_item Blocking_queue.t; +} + +let reset_line_ansi = "\x1b[2K\r" + +let progress_thread (st : state) : Thread.t = + let run () = + while Atomic.get st.active do + let elapsed = Unix.gettimeofday () -. st.start in + pf "%s[%.3fs] %d done, %d waiting%!" reset_line_ansi elapsed + (Atomic.get st.n_done) (Atomic.get st.n_waiting); + Thread.delay 0.1 + done + in + Moonpool.start_thread_on_some_domain run () + +(** background thread that writes the results sequentially into the file *) +let writer_thread (st : state) oc : Thread.t = + let run () : unit = + try + while true do + let r = Blocking_queue.pop st.results in + match r with + | Pixel r -> + Atomic.incr st.n_done; + Atomic.decr st.n_waiting; + + let ir, ig, ib = Fut.wait_block_exn r in + fpf oc "%d " ir; + fpf oc "%d " ig; + fpf oc "%d \n" ib + | Unblock_next_line prom -> + Atomic.incr st.n_lines; + Fut.fulfill prom (Ok ()) + done + with Blocking_queue.Closed -> (* we are done *) Atomic.set st.active false + in + + Moonpool.start_thread_on_some_domain run () + let run (config : config) = - Random.self_init (); - let world = mk_world () in - let nx = 400 in - let ny = 200 in - let ns = 150 in - (* samples per pixel *) + let rst = Random.State.make_self_init () in + let pool = Moonpool.Pool.create ~min:config.j () in + let oc = open_out config.out in let@ () = Fun.protect ~finally:(fun () -> flush oc; close_out oc) in + + let world = mk_world () in fpf oc "P3\n"; - fpf oc "%d\n" nx; - fpf oc "%d\n" ny; + fpf oc "%d\n" config.nx; + fpf oc "%d\n" config.ny; fpf oc "\n255\n"; let lower_left_corner = Vec3.of_floats (-2., -1., -1.) in let horizontal = Vec3.of_floats (4., 0., 0.) in let vertical = Vec3.of_floats (0., 2., 0.) in let origin = Vec3.of_floats (0., 0., 0.) in - for j = ny downto 1 do - for i = 0 to nx - 1 do - let color = ref { x = 0.; y = 0.; z = 0. } in - for _step = 0 to ns - 1 do - (* NOTE: Random.float is bounds __inclusive__ *) - let u = (Float.of_int i +. Random.float 1.0) /. Float.of_int nx in - let v = (Float.of_int j +. Random.float 1.0) /. Float.of_int ny in + let st = + { + active = Atomic.make true; + start = Unix.gettimeofday (); + n_done = Atomic.make 0; + n_waiting = Atomic.make 0; + n_lines = Atomic.make 0; + results = Blocking_queue.create (); + } + in - let r = - { - origin; - dir = - Vec3.add lower_left_corner - (Vec3.add (Vec3.mul u horizontal) (Vec3.mul v vertical)); - } + let t_writer = writer_thread st oc in + if config.progress then ignore (progress_thread st : Thread.t); + + for j = config.ny downto 1 do + for i = 0 to config.nx - 1 do + (* get our own random generator *) + let rst = RS.split rst in + + let run () = + let color = ref { x = 0.; y = 0.; z = 0. } in + for _step = 0 to config.ns - 1 do + (* NOTE: Random.float is bounds __inclusive__ *) + let u = + (Float.of_int i +. RS.float rst 1.0) /. Float.of_int config.nx + in + let v = + (Float.of_int j +. RS.float rst 1.0) /. Float.of_int config.ny + in + + let r = + { + origin; + dir = + Vec3.add lower_left_corner + (Vec3.add (Vec3.mul u horizontal) (Vec3.mul v vertical)); + } + in + color := Vec3.add !color (get_color rst world r 0) + done; + + color := Vec3.mul (1. /. Float.of_int config.ns) !color; + (* gamma correction *) + color := Vec3.of_floats (sqrt !color.x, sqrt !color.y, sqrt !color.z); + let { x = r; y = g; z = b } = !color in + + let ir, ig, ib = + ( Int.of_float (r *. 255.99), + Int.of_float (g *. 255.99), + Int.of_float (b *. 255.99) ) in - color := Vec3.add !color (get_color world r 0) - done; - color := Vec3.mul (1. /. Float.of_int ns) !color; - (* gamma correction *) - color := Vec3.of_floats (sqrt !color.x, sqrt !color.y, sqrt !color.z); - let { x = r; y = g; z = b } = !color in - let ir, ig, ib = - ( Int.of_float (r *. 255.99), - Int.of_float (g *. 255.99), - Int.of_float (b *. 255.99) ) + ir, ig, ib in - fpf oc "%d " ir; - fpf oc "%d " ig; - fpf oc "%d \n" ib - done - done + + let fut = Fut.spawn ~on:pool run in + Atomic.incr st.n_waiting; + Blocking_queue.push st.results (Pixel fut) + done; + + (* wait for all lines to be processed *) + let sync_line, prom = Fut.make () in + Blocking_queue.push st.results (Unblock_next_line prom); + Fut.wait_block_exn sync_line + done; + (* now close the queue *) + Blocking_queue.close st.results; + + Thread.join t_writer let () = let nx = ref 400 in @@ -313,11 +401,13 @@ let () = in Arg.parse opts ignore ""; - let config = { nx = !nx; ny = !ny; ns = !ns; out = !out; j = !j } in + let config = + { nx = !nx; ny = !ny; ns = !ns; out = !out; j = !j; progress = !progress } + in let t = Unix.gettimeofday () in run config; let elapsed = Unix.gettimeofday () -. t in - pf "done in %.4fs\n%!" elapsed; + pf "%sdone in %.4fs\n%!" reset_line_ansi elapsed; () diff --git a/src/bb_queue.ml b/src/bb_queue.ml index c3d157e4..c939d9bf 100644 --- a/src/bb_queue.ml +++ b/src/bb_queue.ml @@ -37,10 +37,12 @@ let push (self : _ t) x : unit = let pop (self : 'a t) : 'a = Mutex.lock self.mutex; let rec loop () = - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ) else if Queue.is_empty self.q then ( + if Queue.is_empty self.q then ( + (* check closed *) + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); Condition.wait self.cond self.mutex; (loop [@tailcall]) () ) else ( diff --git a/src/bb_queue.mli b/src/bb_queue.mli index 4dee92b6..7fc36a56 100644 --- a/src/bb_queue.mli +++ b/src/bb_queue.mli @@ -12,7 +12,9 @@ val push : 'a t -> 'a -> unit val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) + @raise Closed if the queue was closed before a new element was available. + Note that calls to [pop] on a closed queue that still contains elements + will succeed (until all elements are drained). *) val try_pop : force_lock:bool -> 'a t -> 'a option (** [try_pop q] immediately pops the first element of [q], if any,