diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index ce9add5..f7abac4 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -51,3 +51,22 @@ let pop (self : 'a t) : 'a = ) in loop () + +let transfer (self : 'a t) q2 : unit = + Mutex.lock self.mutex; + while + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + true + ) else ( + Queue.transfer self.q q2; + Mutex.unlock self.mutex; + false + ) + do + () + done diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli index e833c92..b0125b8 100644 --- a/src/tef/b_queue.mli +++ b/src/tef/b_queue.mli @@ -14,5 +14,10 @@ val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) +val transfer : 'a t -> 'a Queue.t -> unit +(** [transfer bq q2] transfers all items presently + in [bq] into [q2], and clears [bq]. + It blocks if no element is in [bq]. *) + val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 83c92eb..16181e8 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -155,7 +155,7 @@ module Writer = struct let ts = start in emit_sep_ self; Printf.fprintf self.oc - {json|{"pid": %d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} + {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} self.pid tid dur ts str_val name (emit_args_o_ pp_user_data_) args; @@ -164,7 +164,7 @@ module Writer = struct let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = emit_sep_ self; Printf.fprintf self.oc - {json|{"pid": %d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} + {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} self.pid tid ts str_val name (emit_args_o_ pp_user_data_) args; @@ -173,7 +173,7 @@ module Writer = struct let emit_name_thread ~tid ~name (self : t) : unit = emit_sep_ self; Printf.fprintf self.oc - {json|{"pid": %d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid + {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid tid (emit_args_o_ pp_user_data_) [ "name", `String name ]; @@ -182,7 +182,7 @@ module Writer = struct let emit_name_process ~name (self : t) : unit = emit_sep_ self; Printf.fprintf self.oc - {json|{"pid": %d,"name":"process_name","ph":"M"%a}|json} self.pid + {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid (emit_args_o_ pp_user_data_) [ "name", `String name ]; () @@ -201,6 +201,7 @@ let bg_thread ~out (events : event B_queue.t) : unit = let writer = Writer.create ~out () in protect ~finally:(fun () -> Writer.close writer) @@ fun () -> let spans : span_info Span_tbl.t = Span_tbl.create 32 in + let local_q = Queue.create () in (* how to deal with an event *) let handle_ev (ev : event) : unit = @@ -225,8 +226,15 @@ let bg_thread ~out (events : event B_queue.t) : unit = try while true do - let ev = B_queue.pop events in - handle_ev ev + (* work on local events, already on this thread *) + while not (Queue.is_empty local_q) do + let ev = Queue.pop local_q in + handle_ev ev + done; + + (* get all the events in the incoming blocking queue, in + one single critical section. *) + B_queue.transfer events local_q done with B_queue.Closed -> (* warn if app didn't close all spans *)