raytracer: add some parallelism

This commit is contained in:
Simon Cruanes 2023-06-08 16:19:23 -04:00
parent 0ace7726f4
commit bc7c6b253e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 225 additions and 56 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
_build
_opam
*.ppm

View file

@ -0,0 +1,54 @@
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else (
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
)
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
(* check closed *)
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()

View file

@ -0,0 +1,20 @@
(* vendored here *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available.
Note that calls to [pop] on a closed queue that still contains elements
will succeed (until all elements are drained). *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)

View file

@ -1,6 +1,6 @@
open Vec3
open Ray
open Moonpool
module Fut = Moonpool.Fut
let ( let@ ) = ( @@ )
let fpf = Printf.fprintf
@ -36,19 +36,21 @@ type scatter = {
type hit = hit_rec option
module RS = Random.State
(* Produce a random point inside the unit sphere. Works by picking a
random point in the unit cube, rejecting if not inside the sphere. *)
let rec random_in_unit_sphere () =
let rec random_in_unit_sphere (rst : RS.t) =
let p =
Vec3.sub
(Vec3.mul 2.0
(Vec3.of_floats (Random.float 1.0, Random.float 1.0, Random.float 1.0)))
(Vec3.of_floats (RS.float rst 1.0, RS.float rst 1.0, RS.float rst 1.0)))
(Vec3.of_floats (1., 1., 1.))
in
if Vec3.dot p p >= 1.0 then
p
else
random_in_unit_sphere ()
random_in_unit_sphere rst
let reflect v n = Vec3.sub v (Vec3.mul (2. *. Vec3.dot v n) n)
@ -66,12 +68,12 @@ let refract v n ni_over_nt =
) else
None
let hit_scatter r_in hit_rec =
let hit_scatter (rst : RS.t) r_in hit_rec : scatter =
match hit_rec.mat with
(* reflect in random direction *)
| Some (Lambertian albedo) ->
let target =
Vec3.add (Vec3.add hit_rec.p hit_rec.normal) (random_in_unit_sphere ())
Vec3.add (Vec3.add hit_rec.p hit_rec.normal) (random_in_unit_sphere rst)
in
let scatter =
{
@ -86,7 +88,7 @@ let hit_scatter r_in hit_rec =
let reflected = reflect (Vec3.unit_vector r_in.dir) hit_rec.normal in
let scattered_ray =
Ray.create hit_rec.p
(Vec3.add reflected (Vec3.mul fuzz (random_in_unit_sphere ())))
(Vec3.add reflected (Vec3.mul fuzz (random_in_unit_sphere rst)))
in
let scattered =
{
@ -176,12 +178,12 @@ and hit h ray (tmin, tmax) : hit =
| Sphere s -> hit_sphere s ray (tmin, tmax)
| World w -> hit_world w ray (tmin, tmax)
let rec get_color world ray depth : vec3 =
let rec get_color (rst : RS.t) world ray depth : vec3 =
match hit world ray (0., Float.infinity) with
| Some hit_result ->
if depth < 50 then (
let s = hit_scatter ray hit_result in
Vec3.pmul s.color (get_color world s.ray (depth + 1))
let s = hit_scatter rst ray hit_result in
Vec3.pmul s.color (get_color rst world s.ray (depth + 1))
) else
Vec3.of_floats (0., 0., 0.)
| None ->
@ -232,66 +234,152 @@ let mk_world () =
type config = {
nx: int;
ny: int;
ns: int;
j: int;
ns: int; (** samples per pixel *)
j: int; (** Pool size *)
out: string;
progress: bool;
}
type queue_item =
| Pixel of (int * int * int) Fut.t
| Unblock_next_line of unit Fut.promise
type state = {
start: float;
active: bool Atomic.t;
n_done: int Atomic.t;
n_waiting: int Atomic.t;
n_lines: int Atomic.t;
results: queue_item Blocking_queue.t;
}
let reset_line_ansi = "\x1b[2K\r"
let progress_thread (st : state) : Thread.t =
let run () =
while Atomic.get st.active do
let elapsed = Unix.gettimeofday () -. st.start in
pf "%s[%.3fs] %d done, %d waiting%!" reset_line_ansi elapsed
(Atomic.get st.n_done) (Atomic.get st.n_waiting);
Thread.delay 0.1
done
in
Moonpool.start_thread_on_some_domain run ()
(** background thread that writes the results sequentially into the file *)
let writer_thread (st : state) oc : Thread.t =
let run () : unit =
try
while true do
let r = Blocking_queue.pop st.results in
match r with
| Pixel r ->
Atomic.incr st.n_done;
Atomic.decr st.n_waiting;
let ir, ig, ib = Fut.wait_block_exn r in
fpf oc "%d " ir;
fpf oc "%d " ig;
fpf oc "%d \n" ib
| Unblock_next_line prom ->
Atomic.incr st.n_lines;
Fut.fulfill prom (Ok ())
done
with Blocking_queue.Closed -> (* we are done *) Atomic.set st.active false
in
Moonpool.start_thread_on_some_domain run ()
let run (config : config) =
Random.self_init ();
let world = mk_world () in
let nx = 400 in
let ny = 200 in
let ns = 150 in
(* samples per pixel *)
let rst = Random.State.make_self_init () in
let pool = Moonpool.Pool.create ~min:config.j () in
let oc = open_out config.out in
let@ () =
Fun.protect ~finally:(fun () ->
flush oc;
close_out oc)
in
let world = mk_world () in
fpf oc "P3\n";
fpf oc "%d\n" nx;
fpf oc "%d\n" ny;
fpf oc "%d\n" config.nx;
fpf oc "%d\n" config.ny;
fpf oc "\n255\n";
let lower_left_corner = Vec3.of_floats (-2., -1., -1.) in
let horizontal = Vec3.of_floats (4., 0., 0.) in
let vertical = Vec3.of_floats (0., 2., 0.) in
let origin = Vec3.of_floats (0., 0., 0.) in
for j = ny downto 1 do
for i = 0 to nx - 1 do
let color = ref { x = 0.; y = 0.; z = 0. } in
for _step = 0 to ns - 1 do
(* NOTE: Random.float is bounds __inclusive__ *)
let u = (Float.of_int i +. Random.float 1.0) /. Float.of_int nx in
let v = (Float.of_int j +. Random.float 1.0) /. Float.of_int ny in
let st =
{
active = Atomic.make true;
start = Unix.gettimeofday ();
n_done = Atomic.make 0;
n_waiting = Atomic.make 0;
n_lines = Atomic.make 0;
results = Blocking_queue.create ();
}
in
let r =
{
origin;
dir =
Vec3.add lower_left_corner
(Vec3.add (Vec3.mul u horizontal) (Vec3.mul v vertical));
}
let t_writer = writer_thread st oc in
if config.progress then ignore (progress_thread st : Thread.t);
for j = config.ny downto 1 do
for i = 0 to config.nx - 1 do
(* get our own random generator *)
let rst = RS.split rst in
let run () =
let color = ref { x = 0.; y = 0.; z = 0. } in
for _step = 0 to config.ns - 1 do
(* NOTE: Random.float is bounds __inclusive__ *)
let u =
(Float.of_int i +. RS.float rst 1.0) /. Float.of_int config.nx
in
let v =
(Float.of_int j +. RS.float rst 1.0) /. Float.of_int config.ny
in
let r =
{
origin;
dir =
Vec3.add lower_left_corner
(Vec3.add (Vec3.mul u horizontal) (Vec3.mul v vertical));
}
in
color := Vec3.add !color (get_color rst world r 0)
done;
color := Vec3.mul (1. /. Float.of_int config.ns) !color;
(* gamma correction *)
color := Vec3.of_floats (sqrt !color.x, sqrt !color.y, sqrt !color.z);
let { x = r; y = g; z = b } = !color in
let ir, ig, ib =
( Int.of_float (r *. 255.99),
Int.of_float (g *. 255.99),
Int.of_float (b *. 255.99) )
in
color := Vec3.add !color (get_color world r 0)
done;
color := Vec3.mul (1. /. Float.of_int ns) !color;
(* gamma correction *)
color := Vec3.of_floats (sqrt !color.x, sqrt !color.y, sqrt !color.z);
let { x = r; y = g; z = b } = !color in
let ir, ig, ib =
( Int.of_float (r *. 255.99),
Int.of_float (g *. 255.99),
Int.of_float (b *. 255.99) )
ir, ig, ib
in
fpf oc "%d " ir;
fpf oc "%d " ig;
fpf oc "%d \n" ib
done
done
let fut = Fut.spawn ~on:pool run in
Atomic.incr st.n_waiting;
Blocking_queue.push st.results (Pixel fut)
done;
(* wait for all lines to be processed *)
let sync_line, prom = Fut.make () in
Blocking_queue.push st.results (Unblock_next_line prom);
Fut.wait_block_exn sync_line
done;
(* now close the queue *)
Blocking_queue.close st.results;
Thread.join t_writer
let () =
let nx = ref 400 in
@ -313,11 +401,13 @@ let () =
in
Arg.parse opts ignore "";
let config = { nx = !nx; ny = !ny; ns = !ns; out = !out; j = !j } in
let config =
{ nx = !nx; ny = !ny; ns = !ns; out = !out; j = !j; progress = !progress }
in
let t = Unix.gettimeofday () in
run config;
let elapsed = Unix.gettimeofday () -. t in
pf "done in %.4fs\n%!" elapsed;
pf "%sdone in %.4fs\n%!" reset_line_ansi elapsed;
()

View file

@ -37,10 +37,12 @@ let push (self : _ t) x : unit =
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else if Queue.is_empty self.q then (
if Queue.is_empty self.q then (
(* check closed *)
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (

View file

@ -12,7 +12,9 @@ val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
@raise Closed if the queue was closed before a new element was available.
Note that calls to [pop] on a closed queue that still contains elements
will succeed (until all elements are drained). *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] immediately pops the first element of [q], if any,