trace-tef: add Mpsc_queue, adapted from picos; use it in trace_tef

This commit is contained in:
Simon Cruanes 2023-10-30 22:25:05 -04:00
parent 73ead3e369
commit c82fb362e8
6 changed files with 111 additions and 55 deletions

View file

@ -1,8 +1,9 @@
type 'a t = { type 'a t = {
mutex: Mutex.t; mutex: Mutex.t;
cond: Condition.t; cond: Condition.t;
q: 'a Queue.t; q: 'a Mpsc_queue.t;
mutable closed: bool; mutable closed: bool;
mutable consumer_waiting: bool;
} }
exception Closed exception Closed
@ -11,8 +12,9 @@ let create () : _ t =
{ {
mutex = Mutex.create (); mutex = Mutex.create ();
cond = Condition.create (); cond = Condition.create ();
q = Queue.create (); q = Mpsc_queue.create ();
closed = false; closed = false;
consumer_waiting = false;
} }
let close (self : _ t) = let close (self : _ t) =
@ -24,50 +26,36 @@ let close (self : _ t) =
Mutex.unlock self.mutex Mutex.unlock self.mutex
let push (self : _ t) x : unit = let push (self : _ t) x : unit =
Mutex.lock self.mutex; if self.closed then raise Closed;
if self.closed then ( Mpsc_queue.enqueue self.q x;
Mutex.unlock self.mutex; if self.closed then raise Closed;
raise Closed if self.consumer_waiting then (
) else ( (* wakeup consumer *)
let was_empty = Queue.is_empty self.q in Mutex.lock self.mutex;
Queue.push x self.q; Condition.broadcast self.cond;
if was_empty then Condition.broadcast self.cond;
Mutex.unlock self.mutex Mutex.unlock self.mutex
) )
let pop (self : 'a t) : 'a = let rec pop (self : 'a t) : 'a =
Mutex.lock self.mutex; match Mpsc_queue.dequeue self.q with
let rec loop () = | x -> x
if Queue.is_empty self.q then ( | exception Mpsc_queue.Empty ->
if self.closed then ( if self.closed then raise Closed;
Mutex.unlock self.mutex; Mutex.lock self.mutex;
raise Closed self.consumer_waiting <- true;
); Condition.wait self.cond self.mutex;
Condition.wait self.cond self.mutex; self.consumer_waiting <- false;
(loop [@tailcall]) () Mutex.unlock self.mutex;
) else ( pop self
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()
let transfer (self : 'a t) q2 : unit = let rec pop_all (self : 'a t) : 'a list =
Mutex.lock self.mutex; match Mpsc_queue.dequeue_all self.q with
while | l -> l
if Queue.is_empty self.q then ( | exception Mpsc_queue.Empty ->
if self.closed then ( if self.closed then raise Closed;
Mutex.unlock self.mutex; Mutex.lock self.mutex;
raise Closed self.consumer_waiting <- true;
); Condition.wait self.cond self.mutex;
Condition.wait self.cond self.mutex; self.consumer_waiting <- false;
true Mutex.unlock self.mutex;
) else ( pop_all self
Queue.transfer self.q q2;
Mutex.unlock self.mutex;
false
)
do
()
done

View file

@ -14,7 +14,7 @@ val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes. (** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *) @raise Closed if the queue was closed before a new element was available. *)
val transfer : 'a t -> 'a Queue.t -> unit val pop_all : 'a t -> 'a list
(** [transfer bq q2] transfers all items presently (** [transfer bq q2] transfers all items presently
in [bq] into [q2], and clears [bq]. in [bq] into [q2], and clears [bq].
It blocks if no element is in [bq]. *) It blocks if no element is in [bq]. *)

View file

@ -3,4 +3,4 @@
(name trace_tef) (name trace_tef)
(public_name trace-tef) (public_name trace-tef)
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
(libraries trace.core mtime mtime.clock.os unix threads)) (libraries trace.core mtime mtime.clock.os atomic unix threads))

60
src/tef/mpsc_queue.ml Normal file
View file

@ -0,0 +1,60 @@
type 'a t = {
tail: 'a list Atomic.t;
head: 'a list ref;
}
let create () =
let tail = Atomic.make [] in
(* padding *)
ignore (Sys.opaque_identity (Array.make 16 ()));
let head = ref [] in
{ tail; head }
module Backoff = struct
type t = int
let default = 2
let once (b : t) : t =
let actual_b = b + Random.int 4 in
for _i = 1 to actual_b do
Domain.cpu_relax ()
done;
min (b * 2) 256
end
let rec enqueue backoff t x =
let before = Atomic.get t.tail in
let after = x :: before in
if not (Atomic.compare_and_set t.tail before after) then
enqueue (Backoff.once backoff) t x
let enqueue t x = enqueue Backoff.default t x
exception Empty
let dequeue t =
match !(t.head) with
| x :: xs ->
t.head := xs;
x
| [] ->
(match Atomic.exchange t.tail [] with
| [] -> raise_notrace Empty
| [ x ] -> x
| x :: xs ->
(match List.rev_append [ x ] xs with
| x :: xs ->
t.head := xs;
x
| [] -> assert false))
let dequeue_all t : _ list =
match !(t.head) with
| _ :: _ as l ->
t.head := [];
l
| [] ->
(match Atomic.exchange t.tail [] with
| [] -> raise_notrace Empty
| l -> List.rev l)

14
src/tef/mpsc_queue.mli Normal file
View file

@ -0,0 +1,14 @@
(** A multi-producer, single-consumer queue (from picos) *)
type !'a t
val create : unit -> 'a t
val enqueue : 'a t -> 'a -> unit
exception Empty
val dequeue : 'a t -> 'a
(** @raise Empty if empty *)
val dequeue_all : 'a t -> 'a list
(** @raise Empty if empty *)

View file

@ -273,7 +273,6 @@ let bg_thread ~out (events : event B_queue.t) : unit =
Writer.with_ ~out @@ fun writer -> Writer.with_ ~out @@ fun writer ->
(* local state, to keep track of span information and implicit stack context *) (* local state, to keep track of span information and implicit stack context *)
let spans : span_info Span_tbl.t = Span_tbl.create 32 in let spans : span_info Span_tbl.t = Span_tbl.create 32 in
let local_q = Queue.create () in
(* add function name, if provided, to the metadata *) (* add function name, if provided, to the metadata *)
let add_fun_name_ fun_name data : _ list = let add_fun_name_ fun_name data : _ list =
@ -319,15 +318,10 @@ let bg_thread ~out (events : event B_queue.t) : unit =
try try
while true do while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let ev = Queue.pop local_q in
handle_ev ev
done;
(* get all the events in the incoming blocking queue, in (* get all the events in the incoming blocking queue, in
one single critical section. *) one single critical section. *)
B_queue.transfer events local_q let local = B_queue.pop_all events in
List.iter handle_ev local
done done
with B_queue.Closed -> with B_queue.Closed ->
(* warn if app didn't close all spans *) (* warn if app didn't close all spans *)