tef: docs and small refactor

This commit is contained in:
Simon Cruanes 2023-08-30 08:31:28 -04:00
parent b467678040
commit 20e2bf2f87
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -99,13 +99,16 @@ let key_async_id : int Meta_map.Key.t = Meta_map.Key.create ()
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t =
Meta_map.Key.create () Meta_map.Key.create ()
(** Writer: knows how to write entries to a file in TEF format *)
module Writer = struct module Writer = struct
type t = { type t = {
oc: out_channel; oc: out_channel;
mutable first: bool; (** first event? *) mutable first: bool; (** first event? *)
must_close: bool; must_close: bool; (** Do we have to close the underlying channel [oc]? *)
pid: int; pid: int;
} }
(** A writer to a [out_channel]. It writes JSON entries in an array
and closes the array at the end. *)
let create ~out () : t = let create ~out () : t =
let oc, must_close = let oc, must_close =
@ -128,6 +131,10 @@ module Writer = struct
flush self.oc; flush self.oc;
if self.must_close then close_out self.oc if self.must_close then close_out self.oc
let with_ ~out f =
let writer = create ~out () in
protect ~finally:(fun () -> close writer) (fun () -> f writer)
let[@inline] flush (self : t) : unit = flush self.oc let[@inline] flush (self : t) : unit = flush self.oc
let emit_sep_ (self : t) = let emit_sep_ (self : t) =
@ -246,9 +253,13 @@ module Writer = struct
() ()
end end
(** Background thread, takes events from the queue, puts them
in context using local state, and writes fully resolved
TEF events to [out]. *)
let bg_thread ~out (events : event B_queue.t) : unit = let bg_thread ~out (events : event B_queue.t) : unit =
let writer = Writer.create ~out () in (* open a writer to [out] *)
protect ~finally:(fun () -> Writer.close writer) @@ fun () -> Writer.with_ ~out @@ fun writer ->
(* local state, to keep track of span information and implicit stack context *)
let spans : span_info Span_tbl.t = Span_tbl.create 32 in let spans : span_info Span_tbl.t = Span_tbl.create 32 in
let local_q = Queue.create () in let local_q = Queue.create () in
@ -307,6 +318,8 @@ let bg_thread ~out (events : event B_queue.t) : unit =
(Span_tbl.length spans); (Span_tbl.length spans);
() ()
(** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file *)
let tick_thread events : unit = let tick_thread events : unit =
try try
while true do while true do
@ -341,6 +354,8 @@ let collector ~out () : collector =
let shutdown () = let shutdown () =
if A.exchange active false then ( if A.exchange active false then (
B_queue.close events; B_queue.close events;
(* wait for writer thread to be done. The writer thread will exit
after processing remaining events because the queue is now closed *)
Thread.join t_write Thread.join t_write
) )