diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 9131613..3a15ec6 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -33,6 +33,7 @@ let protect ~finally f = Printexc.raise_with_backtrace exn bt type event = + | E_tick | E_message of { tid: int; msg: string; @@ -104,6 +105,8 @@ module Writer = struct flush self.oc; if self.must_close then close_out self.oc + let[@inline] flush (self : t) : unit = flush self.oc + let emit_sep_ (self : t) = if self.first then self.first <- false @@ -206,6 +209,7 @@ let bg_thread ~out (events : event B_queue.t) : unit = (* how to deal with an event *) let handle_ev (ev : event) : unit = match ev with + | E_tick -> Writer.flush writer | E_message { tid; msg; time_us; data } -> Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer | E_define_span { tid; name; id; time_us; data } -> @@ -243,6 +247,14 @@ let bg_thread ~out (events : event B_queue.t) : unit = (Span_tbl.length spans); () +let tick_thread events : unit = + try + while true do + Thread.delay 0.5; + B_queue.push events E_tick + done + with B_queue.Closed -> () + type output = [ `Stdout | `Stderr @@ -262,6 +274,9 @@ let collector ~out () : collector = (* writer thread. It receives events and writes them to [oc]. *) let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) () + (* ticker thread, regularly sends a message to the writer thread *) + let t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () + let shutdown () = if A.exchange active false then ( B_queue.close events;