mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-08 03:47:57 -04:00
refactor TEF: split into exporter,writer,subscriber
code is a lot cleaner now.
This commit is contained in:
parent
4454975a61
commit
81096e0d3c
10 changed files with 501 additions and 438 deletions
4
src/tef/common_.ml
Normal file
4
src/tef/common_.ml
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
module Sub = Trace_subscriber
|
||||
module A = Trace_core.Internal_.Atomic_
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
0
src/tef/emit_tef.ml
Normal file
0
src/tef/emit_tef.ml
Normal file
|
|
@ -1,56 +0,0 @@
|
|||
open Trace_core
|
||||
module Sub = Trace_subscriber
|
||||
|
||||
(** An event, specialized for TEF *)
|
||||
type t =
|
||||
| E_tick
|
||||
| E_message of {
|
||||
tid: int;
|
||||
msg: string;
|
||||
time_us: float;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_define_span of {
|
||||
tid: int;
|
||||
name: string;
|
||||
time_us: float;
|
||||
id: span;
|
||||
fun_name: string option;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_exit_span of {
|
||||
id: span;
|
||||
time_us: float;
|
||||
}
|
||||
| E_add_data of {
|
||||
id: span;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_enter_manual_span of {
|
||||
tid: int;
|
||||
name: string;
|
||||
time_us: float;
|
||||
id: trace_id;
|
||||
flavor: Sub.flavor option;
|
||||
fun_name: string option;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_exit_manual_span of {
|
||||
tid: int;
|
||||
name: string;
|
||||
time_us: float;
|
||||
flavor: Sub.flavor option;
|
||||
data: (string * Sub.user_data) list;
|
||||
id: trace_id;
|
||||
}
|
||||
| E_counter of {
|
||||
name: string;
|
||||
tid: int;
|
||||
time_us: float;
|
||||
n: float;
|
||||
}
|
||||
| E_name_process of { name: string }
|
||||
| E_name_thread of {
|
||||
tid: int;
|
||||
name: string;
|
||||
}
|
||||
85
src/tef/exporter.ml
Normal file
85
src/tef/exporter.ml
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
(** An exporter, takes JSON objects and writes them somewhere *)
|
||||
|
||||
open Common_
|
||||
|
||||
type t = {
|
||||
on_json: Buffer.t -> unit;
|
||||
(** Takes a buffer and writes it somewhere. The buffer is only valid
|
||||
during this call and must not be stored. *)
|
||||
flush: unit -> unit; (** Force write *)
|
||||
close: unit -> unit; (** Close underlying resources *)
|
||||
}
|
||||
(** An exporter, takes JSON objects and writes them somewhere.
|
||||
|
||||
This should be thread-safe if used in a threaded environment. *)
|
||||
|
||||
open struct
|
||||
let with_lock lock f =
|
||||
Mutex.lock lock;
|
||||
try
|
||||
let res = f () in
|
||||
Mutex.unlock lock;
|
||||
res
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Mutex.unlock lock;
|
||||
Printexc.raise_with_backtrace e bt
|
||||
end
|
||||
|
||||
(** Export to the channel
|
||||
@param jsonl
|
||||
if true, export as a JSON object per line, otherwise export as a single
|
||||
big JSON array.
|
||||
@param close_channel if true, closing the exporter will close the channel *)
|
||||
let of_out_channel ~close_channel ~jsonl oc : t =
|
||||
let lock = Mutex.create () in
|
||||
let first = ref true in
|
||||
let closed = ref false in
|
||||
let flush () =
|
||||
let@ () = with_lock lock in
|
||||
flush oc
|
||||
in
|
||||
let close () =
|
||||
let@ () = with_lock lock in
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
if not jsonl then output_char oc ']';
|
||||
if close_channel then close_out_noerr oc
|
||||
)
|
||||
in
|
||||
let on_json buf =
|
||||
let@ () = with_lock lock in
|
||||
if not jsonl then
|
||||
if !first then (
|
||||
if not jsonl then output_char oc '[';
|
||||
first := false
|
||||
) else
|
||||
output_string oc ",\n";
|
||||
Buffer.output_buffer oc buf;
|
||||
if jsonl then output_char oc '\n'
|
||||
in
|
||||
{ flush; close; on_json }
|
||||
|
||||
let of_buffer ~jsonl (buf : Buffer.t) : t =
|
||||
let lock = Mutex.create () in
|
||||
let first = ref true in
|
||||
let closed = ref false in
|
||||
let close () =
|
||||
let@ () = with_lock lock in
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
if not jsonl then Buffer.add_char buf ']'
|
||||
)
|
||||
in
|
||||
let on_json json =
|
||||
let@ () = with_lock lock in
|
||||
if not jsonl then
|
||||
if !first then (
|
||||
if not jsonl then Buffer.add_char buf '[';
|
||||
first := false
|
||||
) else
|
||||
Buffer.add_string buf ",\n";
|
||||
Buffer.add_buffer buf json;
|
||||
if jsonl then Buffer.add_char buf '\n'
|
||||
in
|
||||
{ flush = ignore; close; on_json }
|
||||
173
src/tef/subscriber.ml
Normal file
173
src/tef/subscriber.ml
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
open Common_
|
||||
open Trace_core
|
||||
open Trace_private_util
|
||||
module Span_tbl = Sub.Span_tbl
|
||||
|
||||
module Buf_pool = struct
|
||||
type t = Buffer.t Rpool.t
|
||||
|
||||
let create ?(max_size = 32) ?(buf_size = 256) () : t =
|
||||
Rpool.create ~max_size ~clear:Buffer.reset
|
||||
~create:(fun () -> Buffer.create buf_size)
|
||||
()
|
||||
end
|
||||
|
||||
open struct
|
||||
let[@inline] time_us_of_time_ns (t : int64) : float =
|
||||
Int64.div t 1_000L |> Int64.to_float
|
||||
|
||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
||||
if id == Trace_core.Collector.dummy_trace_id then
|
||||
0L
|
||||
else
|
||||
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
|
||||
end
|
||||
|
||||
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
||||
|
||||
type span_info = {
|
||||
tid: int;
|
||||
name: string;
|
||||
start_us: float;
|
||||
mutable data: (string * Sub.user_data) list;
|
||||
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
||||
that's running this (synchronous, stack-abiding) span. *)
|
||||
}
|
||||
(** Information we store about a span begin event, to emit a complete event when
|
||||
we meet the corresponding span end event *)
|
||||
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
pid: int;
|
||||
spans: span_info Span_tbl.t;
|
||||
buf_pool: Buf_pool.t;
|
||||
exporter: Exporter.t;
|
||||
}
|
||||
(** Subscriber state *)
|
||||
|
||||
open struct
|
||||
let print_non_closed_spans_warning spans =
|
||||
let module Str_set = Set.Make (String) in
|
||||
let spans = Span_tbl.to_list spans in
|
||||
if spans <> [] then (
|
||||
!on_tracing_error
|
||||
@@ Printf.sprintf "trace-tef: warning: %d spans were not closed\n"
|
||||
(List.length spans);
|
||||
let names =
|
||||
List.fold_left
|
||||
(fun set (_, span) -> Str_set.add span.name set)
|
||||
Str_set.empty spans
|
||||
in
|
||||
Str_set.iter
|
||||
(fun name ->
|
||||
!on_tracing_error @@ Printf.sprintf " span %S was not closed\n" name)
|
||||
names;
|
||||
flush stderr
|
||||
)
|
||||
end
|
||||
|
||||
let close (self : t) : unit =
|
||||
if A.exchange self.active false then (
|
||||
print_non_closed_spans_warning self.spans;
|
||||
self.exporter.close ()
|
||||
)
|
||||
|
||||
let[@inline] active self = A.get self.active
|
||||
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
||||
|
||||
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||
{ active = A.make true; exporter; buf_pool; pid; spans = Span_tbl.create () }
|
||||
|
||||
module Callbacks = struct
|
||||
type st = t
|
||||
|
||||
let on_init _ ~time_ns:_ = ()
|
||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
||||
|
||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_name_process ~pid:self.pid ~name buf;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
||||
self.exporter.on_json buf
|
||||
|
||||
(* add function name, if provided, to the metadata *)
|
||||
let add_fun_name_ fun_name data : _ list =
|
||||
match fun_name with
|
||||
| None -> data
|
||||
| Some f -> ("function", Sub.U_string f) :: data
|
||||
|
||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let info = { tid; name; start_us = time_us; data } in
|
||||
(* save the span so we find it at exit *)
|
||||
Span_tbl.add self.spans span info
|
||||
|
||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
match Span_tbl.find_exn self.spans span with
|
||||
| exception Not_found ->
|
||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
||||
| { tid; name; start_us; data } ->
|
||||
Span_tbl.remove self.spans span;
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us
|
||||
~end_:time_us ~args:data;
|
||||
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_add_data (self : st) ~data span =
|
||||
if data <> [] then (
|
||||
try
|
||||
let info = Span_tbl.find_exn self.spans span in
|
||||
info.data <- List.rev_append data info.data
|
||||
with Not_found ->
|
||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
||||
)
|
||||
|
||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
||||
~args:data;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span :
|
||||
unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name
|
||||
~id:(int64_of_trace_id_ trace_id)
|
||||
~ts:time_us ~args:data ~flavor;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
||||
~trace_id (_ : span) : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_manual_end buf ~pid:self.pid ~tid ~name
|
||||
~id:(int64_of_trace_id_ trace_id)
|
||||
~ts:time_us ~flavor ~args:data;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
||||
end
|
||||
|
||||
let subscriber (self : t) : Sub.t =
|
||||
Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
|
||||
28
src/tef/subscriber.mli
Normal file
28
src/tef/subscriber.mli
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
open Common_
|
||||
|
||||
module Buf_pool : sig
|
||||
type t
|
||||
|
||||
val create : ?max_size:int -> ?buf_size:int -> unit -> t
|
||||
end
|
||||
|
||||
type t
|
||||
(** Main subscriber state. *)
|
||||
|
||||
val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
|
||||
(** Create a subscriber state. *)
|
||||
|
||||
val flush : t -> unit
|
||||
val close : t -> unit
|
||||
val active : t -> bool
|
||||
|
||||
module Callbacks : Sub.Callbacks.S with type st = t
|
||||
|
||||
val subscriber : t -> Sub.t
|
||||
(** Subscriber that writes json into this writer *)
|
||||
|
||||
(**/**)
|
||||
|
||||
val on_tracing_error : (string -> unit) ref
|
||||
|
||||
(**/**)
|
||||
|
|
@ -1,216 +1,7 @@
|
|||
open Trace_core
|
||||
open Trace_private_util
|
||||
open Event
|
||||
module Sub = Trace_subscriber
|
||||
module A = Trace_core.Internal_.Atomic_
|
||||
|
||||
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
||||
|
||||
let[@inline] time_us_of_time_ns (t : int64) : float =
|
||||
Int64.div t 1_000L |> Int64.to_float
|
||||
|
||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
||||
if id == Trace_core.Collector.dummy_trace_id then
|
||||
0L
|
||||
else
|
||||
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
|
||||
|
||||
module Mock_ = struct
|
||||
let now = ref 0
|
||||
|
||||
(* used to mock timing *)
|
||||
let get_now_ns () : int64 =
|
||||
let x = !now in
|
||||
incr now;
|
||||
Int64.(mul (of_int x) 1000L)
|
||||
|
||||
let get_tid_ () : int = 3
|
||||
end
|
||||
|
||||
module Span_tbl = Hashtbl.Make (struct
|
||||
include Int64
|
||||
|
||||
let hash : t -> int = Hashtbl.hash
|
||||
end)
|
||||
|
||||
type span_info = {
|
||||
tid: int;
|
||||
name: string;
|
||||
start_us: float;
|
||||
mutable data: (string * Sub.user_data) list;
|
||||
}
|
||||
|
||||
(** Writer: knows how to write entries to a file in TEF format *)
|
||||
module Writer = struct
|
||||
type t = {
|
||||
oc: out_channel;
|
||||
jsonl: bool; (** JSONL mode, one json event per line *)
|
||||
mutable first: bool; (** first event? useful in json mode *)
|
||||
buf: Buffer.t; (** Buffer to write into *)
|
||||
must_close: bool; (** Do we have to close the underlying channel [oc]? *)
|
||||
pid: int;
|
||||
}
|
||||
(** A writer to a [out_channel]. It writes JSON entries in an array and closes
|
||||
the array at the end. *)
|
||||
|
||||
let create ~(mode : [ `Single | `Jsonl ]) ~out () : t =
|
||||
let jsonl = mode = `Jsonl in
|
||||
let oc, must_close =
|
||||
match out with
|
||||
| `Stdout -> stdout, false
|
||||
| `Stderr -> stderr, false
|
||||
| `File path -> open_out path, true
|
||||
| `File_append path ->
|
||||
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
|
||||
| `Output oc -> oc, false
|
||||
in
|
||||
let pid =
|
||||
if !Sub.Private_.mock then
|
||||
2
|
||||
else
|
||||
Unix.getpid ()
|
||||
in
|
||||
if not jsonl then output_char oc '[';
|
||||
{ oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 }
|
||||
|
||||
let close (self : t) : unit =
|
||||
if self.jsonl then
|
||||
output_char self.oc '\n'
|
||||
else
|
||||
output_char self.oc ']';
|
||||
flush self.oc;
|
||||
if self.must_close then close_out self.oc
|
||||
|
||||
let with_ ~mode ~out f =
|
||||
let writer = create ~mode ~out () in
|
||||
Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer)
|
||||
|
||||
let[@inline] flush (self : t) : unit = flush self.oc
|
||||
|
||||
(** Emit "," if we need, and get the buffer ready *)
|
||||
let emit_sep_and_start_ (self : t) =
|
||||
Buffer.reset self.buf;
|
||||
if self.jsonl then
|
||||
Buffer.add_char self.buf '\n'
|
||||
else if self.first then
|
||||
self.first <- false
|
||||
else
|
||||
Buffer.add_string self.buf ",\n"
|
||||
|
||||
let char = Buffer.add_char
|
||||
let raw_string = Buffer.add_string
|
||||
|
||||
let str_val (buf : Buffer.t) (s : string) =
|
||||
char buf '"';
|
||||
let encode_char c =
|
||||
match c with
|
||||
| '"' -> raw_string buf {|\"|}
|
||||
| '\\' -> raw_string buf {|\\|}
|
||||
| '\n' -> raw_string buf {|\n|}
|
||||
| '\b' -> raw_string buf {|\b|}
|
||||
| '\r' -> raw_string buf {|\r|}
|
||||
| '\t' -> raw_string buf {|\t|}
|
||||
| _ when Char.code c <= 0x1f ->
|
||||
raw_string buf {|\u00|};
|
||||
Printf.bprintf buf "%02x" (Char.code c)
|
||||
| c -> char buf c
|
||||
in
|
||||
String.iter encode_char s;
|
||||
char buf '"'
|
||||
|
||||
let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function
|
||||
| U_none -> raw_string out "null"
|
||||
| U_int i -> Printf.bprintf out "%d" i
|
||||
| U_bool b -> Printf.bprintf out "%b" b
|
||||
| U_string s -> str_val out s
|
||||
| U_float f -> Printf.bprintf out "%g" f
|
||||
|
||||
(* emit args, if not empty. [ppv] is used to print values. *)
|
||||
let emit_args_o_ ppv (out : Buffer.t) args : unit =
|
||||
if args <> [] then (
|
||||
Printf.bprintf out {json|,"args": {|json};
|
||||
List.iteri
|
||||
(fun i (n, value) ->
|
||||
if i > 0 then raw_string out ",";
|
||||
Printf.bprintf out {json|"%s":%a|json} n ppv value)
|
||||
args;
|
||||
char out '}'
|
||||
)
|
||||
|
||||
let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit =
|
||||
let dur = end_ -. start in
|
||||
let ts = start in
|
||||
|
||||
emit_sep_and_start_ self;
|
||||
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
|
||||
self.pid tid dur ts str_val name
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_manual_begin ~tid ~name ~(id : trace_id) ~ts ~args
|
||||
~(flavor : Sub.flavor option) (self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
self.pid (int64_of_trace_id_ id) tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some Async -> 'b'
|
||||
| Some Sync -> 'B')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_manual_end ~tid ~name ~(id : trace_id) ~ts
|
||||
~(flavor : Sub.flavor option) ~args (self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
self.pid (int64_of_trace_id_ id) tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some Async -> 'e'
|
||||
| Some Sync -> 'E')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
|
||||
self.pid tid ts str_val name
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_name_thread ~tid ~name (self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
|
||||
tid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", U_string name ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_name_process ~name (self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", U_string name ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_counter ~name ~tid ~ts (self : t) f : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid
|
||||
tid ts
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ name, U_float f ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
end
|
||||
module Subscriber = Subscriber
|
||||
module Exporter = Exporter
|
||||
module Writer = Writer
|
||||
|
||||
let block_signals () =
|
||||
try
|
||||
|
|
@ -228,97 +19,14 @@ let block_signals () =
|
|||
: _ list)
|
||||
with _ -> ()
|
||||
|
||||
let print_non_closed_spans_warning spans =
|
||||
let module Str_set = Set.Make (String) in
|
||||
Printf.eprintf "trace-tef: warning: %d spans were not closed\n"
|
||||
(Span_tbl.length spans);
|
||||
let names = ref Str_set.empty in
|
||||
Span_tbl.iter (fun _ span -> names := Str_set.add span.name !names) spans;
|
||||
Str_set.iter
|
||||
(fun name -> Printf.eprintf " span %S was not closed\n" name)
|
||||
!names;
|
||||
flush stderr
|
||||
|
||||
(** Background thread, takes events from the queue, puts them in context using
|
||||
local state, and writes fully resolved TEF events to [out]. *)
|
||||
let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
|
||||
block_signals ();
|
||||
|
||||
(* open a writer to [out] *)
|
||||
Writer.with_ ~mode ~out @@ fun writer ->
|
||||
(* local state, to keep track of span information and implicit stack context *)
|
||||
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
|
||||
|
||||
(* add function name, if provided, to the metadata *)
|
||||
let add_fun_name_ fun_name data : _ list =
|
||||
match fun_name with
|
||||
| None -> data
|
||||
| Some f -> ("function", Sub.U_string f) :: data
|
||||
in
|
||||
|
||||
(* how to deal with an event *)
|
||||
let handle_ev (ev : Event.t) : unit =
|
||||
match ev with
|
||||
| E_tick -> Writer.flush writer
|
||||
| E_message { tid; msg; time_us; data } ->
|
||||
Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer
|
||||
| E_define_span { tid; name; id; time_us; fun_name; data } ->
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let info = { tid; name; start_us = time_us; data } in
|
||||
(* save the span so we find it at exit *)
|
||||
Span_tbl.add spans id info
|
||||
| E_exit_span { id; time_us = stop_us } ->
|
||||
(match Span_tbl.find_opt spans id with
|
||||
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
|
||||
| Some { tid; name; start_us; data } ->
|
||||
Span_tbl.remove spans id;
|
||||
Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us
|
||||
~args:data writer)
|
||||
| E_add_data { id; data } ->
|
||||
(match Span_tbl.find_opt spans id with
|
||||
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
|
||||
| Some info -> info.data <- List.rev_append data info.data)
|
||||
| E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } ->
|
||||
let data = add_fun_name_ fun_name data in
|
||||
Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor
|
||||
writer
|
||||
| E_exit_manual_span { tid; time_us; name; id; flavor; data } ->
|
||||
Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data
|
||||
writer
|
||||
| E_counter { tid; name; time_us; n } ->
|
||||
Writer.emit_counter ~name ~tid ~ts:time_us writer n
|
||||
| E_name_process { name } -> Writer.emit_name_process ~name writer
|
||||
| E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer
|
||||
in
|
||||
|
||||
try
|
||||
while true do
|
||||
(* get all the events in the incoming blocking queue, in
|
||||
one single critical section. *)
|
||||
let local = B_queue.pop_all events in
|
||||
List.iter handle_ev local
|
||||
done
|
||||
with B_queue.Closed ->
|
||||
(* write a message about us closing *)
|
||||
Writer.emit_instant_event ~name:"tef-worker.exit"
|
||||
~tid:(Thread.id @@ Thread.self ())
|
||||
~ts:(time_us_of_time_ns @@ Sub.Private_.now_ns ())
|
||||
~args:[] writer;
|
||||
|
||||
(* warn if app didn't close all spans *)
|
||||
if Span_tbl.length spans > 0 then print_non_closed_spans_warning spans;
|
||||
()
|
||||
|
||||
(** Thread that simply regularly "ticks", sending events to the background
|
||||
thread so it has a chance to write to the file *)
|
||||
let tick_thread events : unit =
|
||||
let tick_thread (sub : Subscriber.t) : unit =
|
||||
block_signals ();
|
||||
try
|
||||
while true do
|
||||
while Subscriber.active sub do
|
||||
Thread.delay 0.5;
|
||||
B_queue.push events E_tick
|
||||
Subscriber.flush sub
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
|
||||
type output =
|
||||
[ `Stdout
|
||||
|
|
@ -326,91 +34,45 @@ type output =
|
|||
| `File of string
|
||||
]
|
||||
|
||||
module Internal_st = struct
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
events: Event.t B_queue.t;
|
||||
t_write: Thread.t;
|
||||
}
|
||||
end
|
||||
|
||||
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
|
||||
let module M : Sub.Callbacks.S with type st = Internal_st.t = struct
|
||||
type st = Internal_st.t
|
||||
|
||||
let on_init _ ~time_ns:_ = ()
|
||||
|
||||
let on_shutdown (self : st) ~time_ns:_ =
|
||||
if A.exchange self.active false then (
|
||||
B_queue.close self.events;
|
||||
(* wait for writer thread to be done. The writer thread will exit
|
||||
after processing remaining events because the queue is now closed *)
|
||||
Thread.join self.t_write
|
||||
)
|
||||
|
||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||
B_queue.push self.events @@ E_name_process { name }
|
||||
|
||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||
B_queue.push self.events @@ E_name_thread { tid; name }
|
||||
|
||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events
|
||||
@@ E_define_span { tid; name; time_us; id = span; fun_name; data }
|
||||
|
||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events @@ E_exit_span { id = span; time_us }
|
||||
|
||||
let on_add_data (self : st) ~data span =
|
||||
if data <> [] then
|
||||
B_queue.push self.events @@ E_add_data { id = span; data }
|
||||
|
||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events @@ E_message { tid; time_us; msg; data }
|
||||
|
||||
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events @@ E_counter { name; n = f; time_us; tid }
|
||||
|
||||
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span
|
||||
: unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events
|
||||
@@ E_enter_manual_span
|
||||
{ id = trace_id; time_us; tid; data; name; fun_name; flavor }
|
||||
|
||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
||||
~trace_id (_ : span) : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
B_queue.push self.events
|
||||
@@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor }
|
||||
|
||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
||||
end in
|
||||
let events = B_queue.create () in
|
||||
let t_write =
|
||||
Thread.create
|
||||
(fun () -> Fun.protect ~finally @@ fun () -> bg_thread ~mode ~out events)
|
||||
()
|
||||
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
|
||||
Trace_subscriber.t =
|
||||
let jsonl = mode = `Jsonl in
|
||||
let oc, must_close =
|
||||
match out with
|
||||
| `Stdout -> stdout, false
|
||||
| `Stderr -> stderr, false
|
||||
| `File path -> open_out path, true
|
||||
| `File_append path ->
|
||||
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
|
||||
| `Output oc -> oc, false
|
||||
in
|
||||
let pid =
|
||||
if !Trace_subscriber.Private_.mock then
|
||||
2
|
||||
else
|
||||
Unix.getpid ()
|
||||
in
|
||||
|
||||
(* ticker thread, regularly sends a message to the writer thread.
|
||||
no need to join it. *)
|
||||
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () in
|
||||
let st : Internal_st.t = { active = A.make true; events; t_write } in
|
||||
Sub.Subscriber.Sub { st; callbacks = (module M) }
|
||||
let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in
|
||||
let exporter =
|
||||
{
|
||||
exporter with
|
||||
close =
|
||||
(fun () ->
|
||||
exporter.close ();
|
||||
finally ());
|
||||
}
|
||||
in
|
||||
let sub = Subscriber.create ~pid ~exporter () in
|
||||
let _t_tick : Thread.t = Thread.create tick_thread sub in
|
||||
Subscriber.subscriber sub
|
||||
|
||||
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
||||
: collector =
|
||||
let sub = subscriber_ ~finally ~mode ~out () in
|
||||
Sub.collector sub
|
||||
Trace_subscriber.collector sub
|
||||
|
||||
let[@inline] subscriber ~out () : Sub.t =
|
||||
let[@inline] subscriber ~out () : Trace_subscriber.t =
|
||||
subscriber_ ~finally:ignore ~mode:`Single ~out ()
|
||||
|
||||
let[@inline] collector ~out () : collector =
|
||||
|
|
@ -438,14 +100,26 @@ let with_setup ?out () f =
|
|||
setup ?out ();
|
||||
Fun.protect ~finally:Trace_core.shutdown f
|
||||
|
||||
module Mock_ = struct
|
||||
let now = ref 0
|
||||
|
||||
(* used to mock timing *)
|
||||
let get_now_ns () : int64 =
|
||||
let x = !now in
|
||||
incr now;
|
||||
Int64.(mul (of_int x) 1000L)
|
||||
|
||||
let get_tid_ () : int = 3
|
||||
end
|
||||
|
||||
module Private_ = struct
|
||||
let mock_all_ () =
|
||||
Sub.Private_.mock := true;
|
||||
Sub.Private_.get_now_ns_ := Mock_.get_now_ns;
|
||||
Sub.Private_.get_tid_ := Mock_.get_tid_;
|
||||
Trace_subscriber.Private_.mock := true;
|
||||
Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns;
|
||||
Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_;
|
||||
()
|
||||
|
||||
let on_tracing_error = on_tracing_error
|
||||
let on_tracing_error = Subscriber.on_tracing_error
|
||||
|
||||
let subscriber_jsonl ~finally ~out () =
|
||||
subscriber_ ~finally ~mode:`Jsonl ~out ()
|
||||
|
|
|
|||
|
|
@ -1,3 +1,7 @@
|
|||
module Subscriber = Subscriber
|
||||
module Exporter = Exporter
|
||||
module Writer = Writer
|
||||
|
||||
type output =
|
||||
[ `Stdout
|
||||
| `Stderr
|
||||
|
|
|
|||
97
src/tef/writer.ml
Normal file
97
src/tef/writer.ml
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
open Common_
|
||||
|
||||
let char = Buffer.add_char
|
||||
let raw_string = Buffer.add_string
|
||||
|
||||
let str_val (buf : Buffer.t) (s : string) =
|
||||
char buf '"';
|
||||
let encode_char c =
|
||||
match c with
|
||||
| '"' -> raw_string buf {|\"|}
|
||||
| '\\' -> raw_string buf {|\\|}
|
||||
| '\n' -> raw_string buf {|\n|}
|
||||
| '\b' -> raw_string buf {|\b|}
|
||||
| '\r' -> raw_string buf {|\r|}
|
||||
| '\t' -> raw_string buf {|\t|}
|
||||
| _ when Char.code c <= 0x1f ->
|
||||
raw_string buf {|\u00|};
|
||||
Printf.bprintf buf "%02x" (Char.code c)
|
||||
| c -> char buf c
|
||||
in
|
||||
String.iter encode_char s;
|
||||
char buf '"'
|
||||
|
||||
let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function
|
||||
| U_none -> raw_string out "null"
|
||||
| U_int i -> Printf.bprintf out "%d" i
|
||||
| U_bool b -> Printf.bprintf out "%b" b
|
||||
| U_string s -> str_val out s
|
||||
| U_float f -> Printf.bprintf out "%g" f
|
||||
|
||||
(* emit args, if not empty. [ppv] is used to print values. *)
|
||||
let emit_args_o_ ppv (out : Buffer.t) args : unit =
|
||||
if args <> [] then (
|
||||
Printf.bprintf out {json|,"args": {|json};
|
||||
List.iteri
|
||||
(fun i (n, value) ->
|
||||
if i > 0 then raw_string out ",";
|
||||
Printf.bprintf out {json|"%s":%a|json} n ppv value)
|
||||
args;
|
||||
char out '}'
|
||||
)
|
||||
|
||||
let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit =
|
||||
let dur = end_ -. start in
|
||||
let ts = start in
|
||||
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
|
||||
pid tid dur ts str_val name
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args
|
||||
|
||||
let emit_manual_begin ~pid ~tid ~name ~(id : int64) ~ts ~args
|
||||
~(flavor : Sub.flavor option) buf : unit =
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
pid id tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some Async -> 'b'
|
||||
| Some Sync -> 'B')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args
|
||||
|
||||
let emit_manual_end ~pid ~tid ~name ~(id : int64) ~ts
|
||||
~(flavor : Sub.flavor option) ~args buf : unit =
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
pid id tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some Async -> 'e'
|
||||
| Some Sync -> 'E')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args
|
||||
|
||||
let emit_instant_event ~pid ~tid ~name ~ts ~args buf : unit =
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
|
||||
pid tid ts str_val name
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args
|
||||
|
||||
let emit_name_thread ~pid ~tid ~name buf : unit =
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} pid tid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", U_string name ]
|
||||
|
||||
let emit_name_process ~pid ~name buf : unit =
|
||||
Printf.bprintf buf {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} pid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", U_string name ]
|
||||
|
||||
let emit_counter ~pid ~tid ~name ~ts buf f : unit =
|
||||
Printf.bprintf buf
|
||||
{json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} pid tid ts
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ name, U_float f ]
|
||||
54
src/tef/writer.mli
Normal file
54
src/tef/writer.mli
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
(** Write JSON events to a buffer.
|
||||
|
||||
This is the part of the code that knows how to emit TEF-compliant JSON from
|
||||
raw event data. *)
|
||||
|
||||
open Common_
|
||||
open Trace_core
|
||||
|
||||
val emit_duration_event :
|
||||
pid:int ->
|
||||
tid:int ->
|
||||
name:string ->
|
||||
start:float ->
|
||||
end_:float ->
|
||||
args:(string * Sub.user_data) list ->
|
||||
Buffer.t ->
|
||||
unit
|
||||
|
||||
val emit_manual_begin :
|
||||
pid:int ->
|
||||
tid:int ->
|
||||
name:string ->
|
||||
id:span ->
|
||||
ts:float ->
|
||||
args:(string * Sub.user_data) list ->
|
||||
flavor:Sub.flavor option ->
|
||||
Buffer.t ->
|
||||
unit
|
||||
|
||||
val emit_manual_end :
|
||||
pid:int ->
|
||||
tid:int ->
|
||||
name:string ->
|
||||
id:span ->
|
||||
ts:float ->
|
||||
flavor:Sub.flavor option ->
|
||||
args:(string * Sub.user_data) list ->
|
||||
Buffer.t ->
|
||||
unit
|
||||
|
||||
val emit_instant_event :
|
||||
pid:int ->
|
||||
tid:int ->
|
||||
name:string ->
|
||||
ts:float ->
|
||||
args:(string * Sub.user_data) list ->
|
||||
Buffer.t ->
|
||||
unit
|
||||
|
||||
val emit_name_thread : pid:int -> tid:int -> name:string -> Buffer.t -> unit
|
||||
val emit_name_process : pid:int -> name:string -> Buffer.t -> unit
|
||||
|
||||
val emit_counter :
|
||||
pid:int -> tid:int -> name:string -> ts:float -> Buffer.t -> float -> unit
|
||||
Loading…
Add table
Reference in a new issue