feat trace-tef: add a ticker thread to ensure we flush the file regularly

This commit is contained in:
Simon Cruanes 2023-06-23 14:56:05 -04:00
parent ddc9cce5af
commit 345dd4a163
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -33,6 +33,7 @@ let protect ~finally f =
Printexc.raise_with_backtrace exn bt
type event =
| E_tick
| E_message of {
tid: int;
msg: string;
@ -104,6 +105,8 @@ module Writer = struct
flush self.oc;
if self.must_close then close_out self.oc
let[@inline] flush (self : t) : unit = flush self.oc
let emit_sep_ (self : t) =
if self.first then
self.first <- false
@ -206,6 +209,7 @@ let bg_thread ~out (events : event B_queue.t) : unit =
(* how to deal with an event *)
let handle_ev (ev : event) : unit =
match ev with
| E_tick -> Writer.flush writer
| E_message { tid; msg; time_us; data } ->
Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer
| E_define_span { tid; name; id; time_us; data } ->
@ -243,6 +247,14 @@ let bg_thread ~out (events : event B_queue.t) : unit =
(Span_tbl.length spans);
()
let tick_thread events : unit =
try
while true do
Thread.delay 0.5;
B_queue.push events E_tick
done
with B_queue.Closed -> ()
type output =
[ `Stdout
| `Stderr
@ -262,6 +274,9 @@ let collector ~out () : collector =
(* writer thread. It receives events and writes them to [oc]. *)
let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) ()
(* ticker thread, regularly sends a message to the writer thread *)
let t_tick : Thread.t = Thread.create (fun () -> tick_thread events) ()
let shutdown () =
if A.exchange active false then (
B_queue.close events;