diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index e7ba602..0d45ea0 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -99,13 +99,16 @@ let key_async_id : int Meta_map.Key.t = Meta_map.Key.create () let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = Meta_map.Key.create () +(** Writer: knows how to write entries to a file in TEF format *) module Writer = struct type t = { oc: out_channel; mutable first: bool; (** first event? *) - must_close: bool; + must_close: bool; (** Do we have to close the underlying channel [oc]? *) pid: int; } + (** A writer to a [out_channel]. It writes JSON entries in an array + and closes the array at the end. *) let create ~out () : t = let oc, must_close = @@ -128,6 +131,10 @@ module Writer = struct flush self.oc; if self.must_close then close_out self.oc + let with_ ~out f = + let writer = create ~out () in + protect ~finally:(fun () -> close writer) (fun () -> f writer) + let[@inline] flush (self : t) : unit = flush self.oc let emit_sep_ (self : t) = @@ -246,9 +253,13 @@ module Writer = struct () end +(** Background thread, takes events from the queue, puts them + in context using local state, and writes fully resolved + TEF events to [out]. *) let bg_thread ~out (events : event B_queue.t) : unit = - let writer = Writer.create ~out () in - protect ~finally:(fun () -> Writer.close writer) @@ fun () -> + (* open a writer to [out] *) + Writer.with_ ~out @@ fun writer -> + (* local state, to keep track of span information and implicit stack context *) let spans : span_info Span_tbl.t = Span_tbl.create 32 in let local_q = Queue.create () in @@ -307,6 +318,8 @@ let bg_thread ~out (events : event B_queue.t) : unit = (Span_tbl.length spans); () +(** Thread that simply regularly "ticks", sending events to + the background thread so it has a chance to write to the file *) let tick_thread events : unit = try while true do @@ -341,6 +354,8 @@ let collector ~out () : collector = let shutdown () = if A.exchange active false then ( B_queue.close events; + (* wait for writer thread to be done. The writer thread will exit + after processing remaining events because the queue is now closed *) Thread.join t_write )