mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
re-simplify raytracer bench
This commit is contained in:
parent
3858d64bc2
commit
40a586e368
3 changed files with 19 additions and 107 deletions
|
|
@ -1,54 +0,0 @@
|
|||
type 'a t = {
|
||||
mutex: Mutex.t;
|
||||
cond: Condition.t;
|
||||
q: 'a Queue.t;
|
||||
mutable closed: bool;
|
||||
}
|
||||
|
||||
exception Closed
|
||||
|
||||
let create () : _ t =
|
||||
{
|
||||
mutex = Mutex.create ();
|
||||
cond = Condition.create ();
|
||||
q = Queue.create ();
|
||||
closed = false;
|
||||
}
|
||||
|
||||
let close (self : _ t) =
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
Condition.broadcast self.cond (* awake waiters so they fail *)
|
||||
);
|
||||
Mutex.unlock self.mutex
|
||||
|
||||
let push (self : _ t) x : unit =
|
||||
Mutex.lock self.mutex;
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
) else (
|
||||
Queue.push x self.q;
|
||||
Condition.signal self.cond;
|
||||
Mutex.unlock self.mutex
|
||||
)
|
||||
|
||||
let pop (self : 'a t) : 'a =
|
||||
Mutex.lock self.mutex;
|
||||
let rec loop () =
|
||||
if Queue.is_empty self.q then (
|
||||
(* check closed *)
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
Condition.wait self.cond self.mutex;
|
||||
(loop [@tailcall]) ()
|
||||
) else (
|
||||
let x = Queue.pop self.q in
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
)
|
||||
in
|
||||
loop ()
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
(* vendored here *)
|
||||
|
||||
type 'a t
|
||||
|
||||
val create : unit -> _ t
|
||||
|
||||
exception Closed
|
||||
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** [push q x] pushes [x] into [q], and returns [()].
|
||||
@raise Closed if [close q] was previously called.*)
|
||||
|
||||
val pop : 'a t -> 'a
|
||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||
@raise Closed if the queue was closed before a new element was available.
|
||||
Note that calls to [pop] on a closed queue that still contains elements
|
||||
will succeed (until all elements are drained). *)
|
||||
|
||||
val close : _ t -> unit
|
||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||
|
|
@ -240,9 +240,7 @@ type config = {
|
|||
progress: bool;
|
||||
}
|
||||
|
||||
type queue_item =
|
||||
| Pixel of (int * int * int) Fut.t
|
||||
| Unblock_next_line of unit Fut.promise
|
||||
type queue_item = Pixel of (int * int * int) Fut.t
|
||||
|
||||
type state = {
|
||||
config: config;
|
||||
|
|
@ -250,8 +248,7 @@ type state = {
|
|||
active: bool Atomic.t;
|
||||
n_done: int Atomic.t;
|
||||
n_waiting: int Atomic.t;
|
||||
n_lines: int Atomic.t;
|
||||
results: queue_item Blocking_queue.t;
|
||||
results: queue_item Queue.t;
|
||||
}
|
||||
|
||||
let reset_line_ansi = "\x1b[2K\r"
|
||||
|
|
@ -271,9 +268,8 @@ let progress_thread (st : state) : Thread.t =
|
|||
(** background thread that writes the results sequentially into the file *)
|
||||
let writer_thread (st : state) oc : Thread.t =
|
||||
let run () : unit =
|
||||
try
|
||||
while true do
|
||||
let r = Blocking_queue.pop st.results in
|
||||
while not (Queue.is_empty st.results) do
|
||||
let r = Queue.pop st.results in
|
||||
match r with
|
||||
| Pixel r ->
|
||||
Atomic.incr st.n_done;
|
||||
|
|
@ -283,11 +279,8 @@ let writer_thread (st : state) oc : Thread.t =
|
|||
fpf oc "%d " ir;
|
||||
fpf oc "%d " ig;
|
||||
fpf oc "%d \n" ib
|
||||
| Unblock_next_line prom ->
|
||||
Atomic.incr st.n_lines;
|
||||
Fut.fulfill prom (Ok ())
|
||||
done
|
||||
with Blocking_queue.Closed -> (* we are done *) Atomic.set st.active false
|
||||
done;
|
||||
(* we are done *) Atomic.set st.active false
|
||||
in
|
||||
|
||||
Moonpool.start_thread_on_some_domain run ()
|
||||
|
|
@ -320,12 +313,10 @@ let run (config : config) =
|
|||
start = Unix.gettimeofday ();
|
||||
n_done = Atomic.make 0;
|
||||
n_waiting = Atomic.make 0;
|
||||
n_lines = Atomic.make 0;
|
||||
results = Blocking_queue.create ();
|
||||
results = Queue.create ();
|
||||
}
|
||||
in
|
||||
|
||||
let t_writer = writer_thread st oc in
|
||||
if config.progress then ignore (progress_thread st : Thread.t);
|
||||
|
||||
for j = config.ny downto 1 do
|
||||
|
|
@ -371,16 +362,11 @@ let run (config : config) =
|
|||
|
||||
let fut = Fut.spawn ~on:pool run in
|
||||
Atomic.incr st.n_waiting;
|
||||
Blocking_queue.push st.results (Pixel fut)
|
||||
Queue.push (Pixel fut) st.results
|
||||
done
|
||||
done;
|
||||
|
||||
(* wait for all lines to be processed *)
|
||||
let sync_line, prom = Fut.make () in
|
||||
Blocking_queue.push st.results (Unblock_next_line prom);
|
||||
Fut.wait_block_exn sync_line
|
||||
done;
|
||||
(* now close the queue *)
|
||||
Blocking_queue.close st.results;
|
||||
let t_writer = writer_thread st oc in
|
||||
|
||||
Thread.join t_writer
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue