diff --git a/src/tef/event.ml b/src/tef/event.ml new file mode 100644 index 0000000..17d8fff --- /dev/null +++ b/src/tef/event.ml @@ -0,0 +1,54 @@ +open Trace_core + +type t = + | E_tick + | E_message of { + tid: int; + msg: string; + time_us: float; + data: (string * user_data) list; + } + | E_define_span of { + tid: int; + name: string; + time_us: float; + id: span; + fun_name: string option; + data: (string * user_data) list; + } + | E_exit_span of { + id: span; + time_us: float; + } + | E_add_data of { + id: span; + data: (string * user_data) list; + } + | E_enter_manual_span of { + tid: int; + name: string; + time_us: float; + id: int; + flavor: [ `Sync | `Async ] option; + fun_name: string option; + data: (string * user_data) list; + } + | E_exit_manual_span of { + tid: int; + name: string; + time_us: float; + flavor: [ `Sync | `Async ] option; + data: (string * user_data) list; + id: int; + } + | E_counter of { + name: string; + tid: int; + time_us: float; + n: float; + } + | E_name_process of { name: string } + | E_name_thread of { + tid: int; + name: string; + } diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index f4ffddd..03824ba 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -1,7 +1,10 @@ open Trace_core open Trace_private_util +open Event module A = Trace_core.Internal_.Atomic_ +let ( let@ ) = ( @@ ) + module Mock_ = struct let enabled = ref false let now = ref 0 @@ -25,59 +28,6 @@ let[@inline] now_us () : float = let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) -type event = - | E_tick - | E_message of { - tid: int; - msg: string; - time_us: float; - data: (string * user_data) list; - } - | E_define_span of { - tid: int; - name: string; - time_us: float; - id: span; - fun_name: string option; - data: (string * user_data) list; - } - | E_exit_span of { - id: span; - time_us: float; - } - | E_add_data of { - id: span; - data: (string * user_data) list; - } - | E_enter_manual_span of { - tid: int; - name: string; - time_us: float; - id: int; - flavor: [ `Sync | `Async ] option; - fun_name: string option; - data: (string * user_data) list; - } - | E_exit_manual_span of { - tid: int; - name: string; - time_us: float; - flavor: [ `Sync | `Async ] option; - data: (string * user_data) list; - id: int; - } - | E_counter of { - name: string; - tid: int; - time_us: float; - n: float; - } - | E_name_process of { name: string } - | E_name_thread of { - tid: int; - name: string; - } - module Span_tbl = Hashtbl.Make (struct include Int64 @@ -104,7 +54,8 @@ let key_data : (string * user_data) list ref Meta_map.key = module Writer = struct type t = { oc: out_channel; - mutable first: bool; (** first event? *) + jsonl: bool; (** JSONL mode, one json event per line *) + mutable first: bool; (** first event? useful in json mode *) buf: Buffer.t; (** Buffer to write into *) must_close: bool; (** Do we have to close the underlying channel [oc]? *) pid: int; @@ -112,12 +63,15 @@ module Writer = struct (** A writer to a [out_channel]. It writes JSON entries in an array and closes the array at the end. *) - let create ~out () : t = + let create ~(mode : [ `Single | `Jsonl ]) ~out () : t = + let jsonl = mode = `Jsonl in let oc, must_close = match out with | `Stdout -> stdout, false | `Stderr -> stderr, false | `File path -> open_out path, true + | `File_append path -> + open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true in let pid = if !Mock_.enabled then @@ -125,16 +79,19 @@ module Writer = struct else Unix.getpid () in - output_char oc '['; - { oc; first = true; pid; must_close; buf = Buffer.create 2_048 } + if not jsonl then output_char oc '['; + { oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 } let close (self : t) : unit = - output_char self.oc ']'; + if self.jsonl then + output_char self.oc '\n' + else + output_char self.oc ']'; flush self.oc; if self.must_close then close_out self.oc - let with_ ~out f = - let writer = create ~out () in + let with_ ~mode ~out f = + let writer = create ~mode ~out () in Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) let[@inline] flush (self : t) : unit = flush self.oc @@ -142,7 +99,9 @@ module Writer = struct (** Emit "," if we need, and get the buffer ready *) let emit_sep_and_start_ (self : t) = Buffer.reset self.buf; - if self.first then + if self.jsonl then + Buffer.add_char self.buf '\n' + else if self.first then self.first <- false else Buffer.add_string self.buf ",\n" @@ -263,9 +222,9 @@ end (** Background thread, takes events from the queue, puts them in context using local state, and writes fully resolved TEF events to [out]. *) -let bg_thread ~out (events : event B_queue.t) : unit = +let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit = (* open a writer to [out] *) - Writer.with_ ~out @@ fun writer -> + Writer.with_ ~mode ~out @@ fun writer -> (* local state, to keep track of span information and implicit stack context *) let spans : span_info Span_tbl.t = Span_tbl.create 32 in @@ -277,7 +236,7 @@ let bg_thread ~out (events : event B_queue.t) : unit = in (* how to deal with an event *) - let handle_ev (ev : event) : unit = + let handle_ev (ev : Event.t) : unit = match ev with | E_tick -> Writer.flush writer | E_message { tid; msg; time_us; data } -> @@ -346,7 +305,8 @@ type output = | `File of string ] -let collector ~out () : collector = +let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out () + : collector = let module M = struct let active = A.make true @@ -354,10 +314,15 @@ let collector ~out () : collector = let span_id_gen_ = A.make 0 (* queue of messages to write *) - let events : event B_queue.t = B_queue.create () + let events : Event.t B_queue.t = B_queue.create () (** writer thread. It receives events and writes them to [oc]. *) - let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) () + let t_write : Thread.t = + Thread.create + (fun () -> + let@ () = Fun.protect ~finally in + bg_thread ~mode ~out events) + () (** ticker thread, regularly sends a message to the writer thread. no need to join it. *) @@ -473,6 +438,9 @@ let collector ~out () : collector = end in (module M) +let[@inline] collector ~out () : collector = + collector_ ~finally:ignore ~mode:`Single ~out () + let setup ?(out = `Env) () = match out with | `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr () @@ -498,4 +466,9 @@ let with_setup ?out () f = module Internal_ = struct let mock_all_ () = Mock_.enabled := true let on_tracing_error = on_tracing_error + + let collector_jsonl ~finally ~out () : collector = + collector_ ~finally ~mode:`Jsonl ~out () + + module Event = Event end diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli index 3aaf060..03866f0 100644 --- a/src/tef/trace_tef.mli +++ b/src/tef/trace_tef.mli @@ -44,6 +44,14 @@ module Internal_ : sig (** use fake, deterministic timestamps, TID, PID *) val on_tracing_error : (string -> unit) ref + + val collector_jsonl : + finally:(unit -> unit) -> + out:[ `File_append of string ] -> + unit -> + Trace_core.collector + + module Event = Event end (**/**)