From c82fb362e82dac39a558b9ed2d0c46a2f775a112 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 30 Oct 2023 22:25:05 -0400 Subject: [PATCH] trace-tef: add Mpsc_queue, adapted from picos; use it in trace_tef --- src/tef/b_queue.ml | 78 ++++++++++++++++++------------------------ src/tef/b_queue.mli | 2 +- src/tef/dune | 2 +- src/tef/mpsc_queue.ml | 60 ++++++++++++++++++++++++++++++++ src/tef/mpsc_queue.mli | 14 ++++++++ src/tef/trace_tef.ml | 10 ++---- 6 files changed, 111 insertions(+), 55 deletions(-) create mode 100644 src/tef/mpsc_queue.ml create mode 100644 src/tef/mpsc_queue.mli diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index c2daa83..96324e7 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -1,8 +1,9 @@ type 'a t = { mutex: Mutex.t; cond: Condition.t; - q: 'a Queue.t; + q: 'a Mpsc_queue.t; mutable closed: bool; + mutable consumer_waiting: bool; } exception Closed @@ -11,8 +12,9 @@ let create () : _ t = { mutex = Mutex.create (); cond = Condition.create (); - q = Queue.create (); + q = Mpsc_queue.create (); closed = false; + consumer_waiting = false; } let close (self : _ t) = @@ -24,50 +26,36 @@ let close (self : _ t) = Mutex.unlock self.mutex let push (self : _ t) x : unit = - Mutex.lock self.mutex; - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ) else ( - let was_empty = Queue.is_empty self.q in - Queue.push x self.q; - if was_empty then Condition.broadcast self.cond; + if self.closed then raise Closed; + Mpsc_queue.enqueue self.q x; + if self.closed then raise Closed; + if self.consumer_waiting then ( + (* wakeup consumer *) + Mutex.lock self.mutex; + Condition.broadcast self.cond; Mutex.unlock self.mutex ) -let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - let rec loop () = - if Queue.is_empty self.q then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond self.mutex; - (loop [@tailcall]) () - ) else ( - let x = Queue.pop self.q in - Mutex.unlock self.mutex; - x - ) - in - loop () +let rec pop (self : 'a t) : 'a = + match Mpsc_queue.dequeue self.q with + | x -> x + | exception Mpsc_queue.Empty -> + if self.closed then raise Closed; + Mutex.lock self.mutex; + self.consumer_waiting <- true; + Condition.wait self.cond self.mutex; + self.consumer_waiting <- false; + Mutex.unlock self.mutex; + pop self -let transfer (self : 'a t) q2 : unit = - Mutex.lock self.mutex; - while - if Queue.is_empty self.q then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond self.mutex; - true - ) else ( - Queue.transfer self.q q2; - Mutex.unlock self.mutex; - false - ) - do - () - done +let rec pop_all (self : 'a t) : 'a list = + match Mpsc_queue.dequeue_all self.q with + | l -> l + | exception Mpsc_queue.Empty -> + if self.closed then raise Closed; + Mutex.lock self.mutex; + self.consumer_waiting <- true; + Condition.wait self.cond self.mutex; + self.consumer_waiting <- false; + Mutex.unlock self.mutex; + pop_all self diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli index b0125b8..1db779f 100644 --- a/src/tef/b_queue.mli +++ b/src/tef/b_queue.mli @@ -14,7 +14,7 @@ val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) -val transfer : 'a t -> 'a Queue.t -> unit +val pop_all : 'a t -> 'a list (** [transfer bq q2] transfers all items presently in [bq] into [q2], and clears [bq]. It blocks if no element is in [bq]. *) diff --git a/src/tef/dune b/src/tef/dune index 7fd3a4a..4e12b80 100644 --- a/src/tef/dune +++ b/src/tef/dune @@ -3,4 +3,4 @@ (name trace_tef) (public_name trace-tef) (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") - (libraries trace.core mtime mtime.clock.os unix threads)) + (libraries trace.core mtime mtime.clock.os atomic unix threads)) diff --git a/src/tef/mpsc_queue.ml b/src/tef/mpsc_queue.ml new file mode 100644 index 0000000..378fa71 --- /dev/null +++ b/src/tef/mpsc_queue.ml @@ -0,0 +1,60 @@ +type 'a t = { + tail: 'a list Atomic.t; + head: 'a list ref; +} + +let create () = + let tail = Atomic.make [] in + (* padding *) + ignore (Sys.opaque_identity (Array.make 16 ())); + let head = ref [] in + { tail; head } + +module Backoff = struct + type t = int + + let default = 2 + + let once (b : t) : t = + let actual_b = b + Random.int 4 in + for _i = 1 to actual_b do + Domain.cpu_relax () + done; + min (b * 2) 256 +end + +let rec enqueue backoff t x = + let before = Atomic.get t.tail in + let after = x :: before in + if not (Atomic.compare_and_set t.tail before after) then + enqueue (Backoff.once backoff) t x + +let enqueue t x = enqueue Backoff.default t x + +exception Empty + +let dequeue t = + match !(t.head) with + | x :: xs -> + t.head := xs; + x + | [] -> + (match Atomic.exchange t.tail [] with + | [] -> raise_notrace Empty + | [ x ] -> x + | x :: xs -> + (match List.rev_append [ x ] xs with + | x :: xs -> + t.head := xs; + x + | [] -> assert false)) + +let dequeue_all t : _ list = + match !(t.head) with + | _ :: _ as l -> + t.head := []; + l + | [] -> + (match Atomic.exchange t.tail [] with + | [] -> raise_notrace Empty + | l -> List.rev l) diff --git a/src/tef/mpsc_queue.mli b/src/tef/mpsc_queue.mli new file mode 100644 index 0000000..a14444b --- /dev/null +++ b/src/tef/mpsc_queue.mli @@ -0,0 +1,14 @@ +(** A multi-producer, single-consumer queue (from picos) *) + +type !'a t + +val create : unit -> 'a t +val enqueue : 'a t -> 'a -> unit + +exception Empty + +val dequeue : 'a t -> 'a +(** @raise Empty if empty *) + +val dequeue_all : 'a t -> 'a list +(** @raise Empty if empty *) diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 159e3fb..ce6ff33 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -273,7 +273,6 @@ let bg_thread ~out (events : event B_queue.t) : unit = Writer.with_ ~out @@ fun writer -> (* local state, to keep track of span information and implicit stack context *) let spans : span_info Span_tbl.t = Span_tbl.create 32 in - let local_q = Queue.create () in (* add function name, if provided, to the metadata *) let add_fun_name_ fun_name data : _ list = @@ -319,15 +318,10 @@ let bg_thread ~out (events : event B_queue.t) : unit = try while true do - (* work on local events, already on this thread *) - while not (Queue.is_empty local_q) do - let ev = Queue.pop local_q in - handle_ev ev - done; - (* get all the events in the incoming blocking queue, in one single critical section. *) - B_queue.transfer events local_q + let local = B_queue.pop_all events in + List.iter handle_ev local done with B_queue.Closed -> (* warn if app didn't close all spans *)