From 40a586e36879ced9f59a707b0bdd8187308b983b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 8 Jun 2023 16:46:06 -0400 Subject: [PATCH] re-simplify raytracer bench --- benchs/raytracer/blocking_queue.ml | 54 ----------------------------- benchs/raytracer/blocking_queue.mli | 20 ----------- benchs/raytracer/raytracer.ml | 52 ++++++++++----------------- 3 files changed, 19 insertions(+), 107 deletions(-) delete mode 100644 benchs/raytracer/blocking_queue.ml delete mode 100644 benchs/raytracer/blocking_queue.mli diff --git a/benchs/raytracer/blocking_queue.ml b/benchs/raytracer/blocking_queue.ml deleted file mode 100644 index a8d1a8ac..00000000 --- a/benchs/raytracer/blocking_queue.ml +++ /dev/null @@ -1,54 +0,0 @@ -type 'a t = { - mutex: Mutex.t; - cond: Condition.t; - q: 'a Queue.t; - mutable closed: bool; -} - -exception Closed - -let create () : _ t = - { - mutex = Mutex.create (); - cond = Condition.create (); - q = Queue.create (); - closed = false; - } - -let close (self : _ t) = - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - Condition.broadcast self.cond (* awake waiters so they fail *) - ); - Mutex.unlock self.mutex - -let push (self : _ t) x : unit = - Mutex.lock self.mutex; - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ) else ( - Queue.push x self.q; - Condition.signal self.cond; - Mutex.unlock self.mutex - ) - -let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - let rec loop () = - if Queue.is_empty self.q then ( - (* check closed *) - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond self.mutex; - (loop [@tailcall]) () - ) else ( - let x = Queue.pop self.q in - Mutex.unlock self.mutex; - x - ) - in - loop () diff --git a/benchs/raytracer/blocking_queue.mli b/benchs/raytracer/blocking_queue.mli deleted file mode 100644 index 70ec10ea..00000000 --- a/benchs/raytracer/blocking_queue.mli +++ /dev/null @@ -1,20 +0,0 @@ -(* vendored here *) - -type 'a t - -val create : unit -> _ t - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] into [q], and returns [()]. - @raise Closed if [close q] was previously called.*) - -val pop : 'a t -> 'a -(** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. - Note that calls to [pop] on a closed queue that still contains elements - will succeed (until all elements are drained). *) - -val close : _ t -> unit -(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/benchs/raytracer/raytracer.ml b/benchs/raytracer/raytracer.ml index 13c4cb4f..249d3413 100644 --- a/benchs/raytracer/raytracer.ml +++ b/benchs/raytracer/raytracer.ml @@ -240,9 +240,7 @@ type config = { progress: bool; } -type queue_item = - | Pixel of (int * int * int) Fut.t - | Unblock_next_line of unit Fut.promise +type queue_item = Pixel of (int * int * int) Fut.t type state = { config: config; @@ -250,8 +248,7 @@ type state = { active: bool Atomic.t; n_done: int Atomic.t; n_waiting: int Atomic.t; - n_lines: int Atomic.t; - results: queue_item Blocking_queue.t; + results: queue_item Queue.t; } let reset_line_ansi = "\x1b[2K\r" @@ -271,23 +268,19 @@ let progress_thread (st : state) : Thread.t = (** background thread that writes the results sequentially into the file *) let writer_thread (st : state) oc : Thread.t = let run () : unit = - try - while true do - let r = Blocking_queue.pop st.results in - match r with - | Pixel r -> - Atomic.incr st.n_done; - Atomic.decr st.n_waiting; + while not (Queue.is_empty st.results) do + let r = Queue.pop st.results in + match r with + | Pixel r -> + Atomic.incr st.n_done; + Atomic.decr st.n_waiting; - let ir, ig, ib = Fut.wait_block_exn r in - fpf oc "%d " ir; - fpf oc "%d " ig; - fpf oc "%d \n" ib - | Unblock_next_line prom -> - Atomic.incr st.n_lines; - Fut.fulfill prom (Ok ()) - done - with Blocking_queue.Closed -> (* we are done *) Atomic.set st.active false + let ir, ig, ib = Fut.wait_block_exn r in + fpf oc "%d " ir; + fpf oc "%d " ig; + fpf oc "%d \n" ib + done; + (* we are done *) Atomic.set st.active false in Moonpool.start_thread_on_some_domain run () @@ -320,12 +313,10 @@ let run (config : config) = start = Unix.gettimeofday (); n_done = Atomic.make 0; n_waiting = Atomic.make 0; - n_lines = Atomic.make 0; - results = Blocking_queue.create (); + results = Queue.create (); } in - let t_writer = writer_thread st oc in if config.progress then ignore (progress_thread st : Thread.t); for j = config.ny downto 1 do @@ -371,16 +362,11 @@ let run (config : config) = let fut = Fut.spawn ~on:pool run in Atomic.incr st.n_waiting; - Blocking_queue.push st.results (Pixel fut) - done; - - (* wait for all lines to be processed *) - let sync_line, prom = Fut.make () in - Blocking_queue.push st.results (Unblock_next_line prom); - Fut.wait_block_exn sync_line + Queue.push (Pixel fut) st.results + done done; - (* now close the queue *) - Blocking_queue.close st.results; + + let t_writer = writer_thread st oc in Thread.join t_writer