refactor tef: move event in a side file

This commit is contained in:
Simon Cruanes 2024-08-16 15:45:24 -04:00
parent 6383fcfff9
commit f8c1d2d972
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 102 additions and 67 deletions

54
src/tef/event.ml Normal file
View file

@ -0,0 +1,54 @@
open Trace_core
type t =
| E_tick
| E_message of {
tid: int;
msg: string;
time_us: float;
data: (string * user_data) list;
}
| E_define_span of {
tid: int;
name: string;
time_us: float;
id: span;
fun_name: string option;
data: (string * user_data) list;
}
| E_exit_span of {
id: span;
time_us: float;
}
| E_add_data of {
id: span;
data: (string * user_data) list;
}
| E_enter_manual_span of {
tid: int;
name: string;
time_us: float;
id: int;
flavor: [ `Sync | `Async ] option;
fun_name: string option;
data: (string * user_data) list;
}
| E_exit_manual_span of {
tid: int;
name: string;
time_us: float;
flavor: [ `Sync | `Async ] option;
data: (string * user_data) list;
id: int;
}
| E_counter of {
name: string;
tid: int;
time_us: float;
n: float;
}
| E_name_process of { name: string }
| E_name_thread of {
tid: int;
name: string;
}

View file

@ -1,7 +1,10 @@
open Trace_core
open Trace_private_util
open Event
module A = Trace_core.Internal_.Atomic_
let ( let@ ) = ( @@ )
module Mock_ = struct
let enabled = ref false
let now = ref 0
@ -25,59 +28,6 @@ let[@inline] now_us () : float =
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
type event =
| E_tick
| E_message of {
tid: int;
msg: string;
time_us: float;
data: (string * user_data) list;
}
| E_define_span of {
tid: int;
name: string;
time_us: float;
id: span;
fun_name: string option;
data: (string * user_data) list;
}
| E_exit_span of {
id: span;
time_us: float;
}
| E_add_data of {
id: span;
data: (string * user_data) list;
}
| E_enter_manual_span of {
tid: int;
name: string;
time_us: float;
id: int;
flavor: [ `Sync | `Async ] option;
fun_name: string option;
data: (string * user_data) list;
}
| E_exit_manual_span of {
tid: int;
name: string;
time_us: float;
flavor: [ `Sync | `Async ] option;
data: (string * user_data) list;
id: int;
}
| E_counter of {
name: string;
tid: int;
time_us: float;
n: float;
}
| E_name_process of { name: string }
| E_name_thread of {
tid: int;
name: string;
}
module Span_tbl = Hashtbl.Make (struct
include Int64
@ -104,7 +54,8 @@ let key_data : (string * user_data) list ref Meta_map.key =
module Writer = struct
type t = {
oc: out_channel;
mutable first: bool; (** first event? *)
jsonl: bool; (** JSONL mode, one json event per line *)
mutable first: bool; (** first event? useful in json mode *)
buf: Buffer.t; (** Buffer to write into *)
must_close: bool; (** Do we have to close the underlying channel [oc]? *)
pid: int;
@ -112,12 +63,15 @@ module Writer = struct
(** A writer to a [out_channel]. It writes JSON entries in an array
and closes the array at the end. *)
let create ~out () : t =
let create ~(mode : [ `Single | `Jsonl ]) ~out () : t =
let jsonl = mode = `Jsonl in
let oc, must_close =
match out with
| `Stdout -> stdout, false
| `Stderr -> stderr, false
| `File path -> open_out path, true
| `File_append path ->
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
in
let pid =
if !Mock_.enabled then
@ -125,16 +79,19 @@ module Writer = struct
else
Unix.getpid ()
in
output_char oc '[';
{ oc; first = true; pid; must_close; buf = Buffer.create 2_048 }
if not jsonl then output_char oc '[';
{ oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 }
let close (self : t) : unit =
output_char self.oc ']';
if self.jsonl then
output_char self.oc '\n'
else
output_char self.oc ']';
flush self.oc;
if self.must_close then close_out self.oc
let with_ ~out f =
let writer = create ~out () in
let with_ ~mode ~out f =
let writer = create ~mode ~out () in
Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer)
let[@inline] flush (self : t) : unit = flush self.oc
@ -142,7 +99,9 @@ module Writer = struct
(** Emit "," if we need, and get the buffer ready *)
let emit_sep_and_start_ (self : t) =
Buffer.reset self.buf;
if self.first then
if self.jsonl then
Buffer.add_char self.buf '\n'
else if self.first then
self.first <- false
else
Buffer.add_string self.buf ",\n"
@ -263,9 +222,9 @@ end
(** Background thread, takes events from the queue, puts them
in context using local state, and writes fully resolved
TEF events to [out]. *)
let bg_thread ~out (events : event B_queue.t) : unit =
let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
(* open a writer to [out] *)
Writer.with_ ~out @@ fun writer ->
Writer.with_ ~mode ~out @@ fun writer ->
(* local state, to keep track of span information and implicit stack context *)
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
@ -277,7 +236,7 @@ let bg_thread ~out (events : event B_queue.t) : unit =
in
(* how to deal with an event *)
let handle_ev (ev : event) : unit =
let handle_ev (ev : Event.t) : unit =
match ev with
| E_tick -> Writer.flush writer
| E_message { tid; msg; time_us; data } ->
@ -346,7 +305,8 @@ type output =
| `File of string
]
let collector ~out () : collector =
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
: collector =
let module M = struct
let active = A.make true
@ -354,10 +314,15 @@ let collector ~out () : collector =
let span_id_gen_ = A.make 0
(* queue of messages to write *)
let events : event B_queue.t = B_queue.create ()
let events : Event.t B_queue.t = B_queue.create ()
(** writer thread. It receives events and writes them to [oc]. *)
let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) ()
let t_write : Thread.t =
Thread.create
(fun () ->
let@ () = Fun.protect ~finally in
bg_thread ~mode ~out events)
()
(** ticker thread, regularly sends a message to the writer thread.
no need to join it. *)
@ -473,6 +438,9 @@ let collector ~out () : collector =
end in
(module M)
let[@inline] collector ~out () : collector =
collector_ ~finally:ignore ~mode:`Single ~out ()
let setup ?(out = `Env) () =
match out with
| `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr ()
@ -498,4 +466,9 @@ let with_setup ?out () f =
module Internal_ = struct
let mock_all_ () = Mock_.enabled := true
let on_tracing_error = on_tracing_error
let collector_jsonl ~finally ~out () : collector =
collector_ ~finally ~mode:`Jsonl ~out ()
module Event = Event
end

View file

@ -44,6 +44,14 @@ module Internal_ : sig
(** use fake, deterministic timestamps, TID, PID *)
val on_tracing_error : (string -> unit) ref
val collector_jsonl :
finally:(unit -> unit) ->
out:[ `File_append of string ] ->
unit ->
Trace_core.collector
module Event = Event
end
(**/**)