diff --git a/src/tef/common_.ml b/src/tef/common_.ml new file mode 100644 index 0000000..d09157b --- /dev/null +++ b/src/tef/common_.ml @@ -0,0 +1,18 @@ +let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = + if id == Trace_core.Collector.dummy_trace_id then + 0L + else + Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 + +module Mock_ = struct + let enabled = ref false + let now = ref 0 + + (* used to mock timing *) + let get_now_ns () : float = + let x = !now in + incr now; + float_of_int x *. 1000. + + let get_tid_ () : int = 3 +end diff --git a/src/tef/dune b/src/tef/dune index 2e036f9..38a585d 100644 --- a/src/tef/dune +++ b/src/tef/dune @@ -7,6 +7,7 @@ trace.core trace.private.util trace.subscriber + trace.stdext mtime mtime.clock.os unix diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index a24f5a1..776e4c8 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -1,30 +1,13 @@ open Trace_core open Trace_private_util open Event +open Common_ module Sub = Trace_subscriber module A = Trace_core.Internal_.Atomic_ +module Writer = Writer let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) -let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = - if id == Trace_core.Collector.dummy_trace_id then - 0L - else - Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 - -module Mock_ = struct - let enabled = ref false - let now = ref 0 - - (* used to mock timing *) - let get_now_ns () : float = - let x = !now in - incr now; - float_of_int x *. 1000. - - let get_tid_ () : int = 3 -end - module Span_tbl = Hashtbl.Make (struct include Int64 @@ -38,178 +21,6 @@ type span_info = { mutable data: (string * Sub.user_data) list; } -(** Writer: knows how to write entries to a file in TEF format *) -module Writer = struct - type t = { - oc: out_channel; - jsonl: bool; (** JSONL mode, one json event per line *) - mutable first: bool; (** first event? useful in json mode *) - buf: Buffer.t; (** Buffer to write into *) - must_close: bool; (** Do we have to close the underlying channel [oc]? *) - pid: int; - } - (** A writer to a [out_channel]. It writes JSON entries in an array and closes - the array at the end. *) - - let create ~(mode : [ `Single | `Jsonl ]) ~out () : t = - let jsonl = mode = `Jsonl in - let oc, must_close = - match out with - | `Stdout -> stdout, false - | `Stderr -> stderr, false - | `File path -> open_out path, true - | `File_append path -> - open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true - | `Output oc -> oc, false - in - let pid = - if !Mock_.enabled then - 2 - else - Unix.getpid () - in - if not jsonl then output_char oc '['; - { oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 } - - let close (self : t) : unit = - if self.jsonl then - output_char self.oc '\n' - else - output_char self.oc ']'; - flush self.oc; - if self.must_close then close_out self.oc - - let with_ ~mode ~out f = - let writer = create ~mode ~out () in - Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) - - let[@inline] flush (self : t) : unit = flush self.oc - - (** Emit "," if we need, and get the buffer ready *) - let emit_sep_and_start_ (self : t) = - Buffer.reset self.buf; - if self.jsonl then - Buffer.add_char self.buf '\n' - else if self.first then - self.first <- false - else - Buffer.add_string self.buf ",\n" - - let char = Buffer.add_char - let raw_string = Buffer.add_string - - let str_val (buf : Buffer.t) (s : string) = - char buf '"'; - let encode_char c = - match c with - | '"' -> raw_string buf {|\"|} - | '\\' -> raw_string buf {|\\|} - | '\n' -> raw_string buf {|\n|} - | '\b' -> raw_string buf {|\b|} - | '\r' -> raw_string buf {|\r|} - | '\t' -> raw_string buf {|\t|} - | _ when Char.code c <= 0x1f -> - raw_string buf {|\u00|}; - Printf.bprintf buf "%02x" (Char.code c) - | c -> char buf c - in - String.iter encode_char s; - char buf '"' - - let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function - | U_none -> raw_string out "null" - | U_int i -> Printf.bprintf out "%d" i - | U_bool b -> Printf.bprintf out "%b" b - | U_string s -> str_val out s - | U_float f -> Printf.bprintf out "%g" f - - (* emit args, if not empty. [ppv] is used to print values. *) - let emit_args_o_ ppv (out : Buffer.t) args : unit = - if args <> [] then ( - Printf.bprintf out {json|,"args": {|json}; - List.iteri - (fun i (n, value) -> - if i > 0 then raw_string out ","; - Printf.bprintf out {json|"%s":%a|json} n ppv value) - args; - char out '}' - ) - - let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit = - let dur = end_ -. start in - let ts = start in - - emit_sep_and_start_ self; - - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} - self.pid tid dur ts str_val name - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_manual_begin ~tid ~name ~(id : trace_id) ~ts ~args - ~(flavor : Sub.flavor option) (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - self.pid (int64_of_trace_id_ id) tid ts str_val name - (match flavor with - | None | Some Async -> 'b' - | Some Sync -> 'B') - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_manual_end ~tid ~name ~(id : trace_id) ~ts - ~(flavor : Sub.flavor option) ~args (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - self.pid (int64_of_trace_id_ id) tid ts str_val name - (match flavor with - | None | Some Async -> 'e' - | Some Sync -> 'E') - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} - self.pid tid ts str_val name - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_name_thread ~tid ~name (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid - tid - (emit_args_o_ pp_user_data_) - [ "name", U_string name ]; - Buffer.output_buffer self.oc self.buf - - let emit_name_process ~name (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid - (emit_args_o_ pp_user_data_) - [ "name", U_string name ]; - Buffer.output_buffer self.oc self.buf - - let emit_counter ~name ~tid ~ts (self : t) f : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid - tid ts - (emit_args_o_ pp_user_data_) - [ name, U_float f ]; - Buffer.output_buffer self.oc self.buf -end - let block_signals () = try ignore diff --git a/src/tef/writer.ml b/src/tef/writer.ml new file mode 100644 index 0000000..1b7e082 --- /dev/null +++ b/src/tef/writer.ml @@ -0,0 +1,170 @@ +open Trace_core +open Common_ +module Sub = Trace_subscriber + +type t = { + oc: out_channel; + jsonl: bool; (** JSONL mode, one json event per line *) + mutable first: bool; (** first event? useful in json mode *) + buf: Buffer.t; (** Buffer to write into *) + must_close: bool; (** Do we have to close the underlying channel [oc]? *) + pid: int; +} + +let create ~(mode : [ `Single | `Jsonl ]) ~out () : t = + let jsonl = mode = `Jsonl in + let oc, must_close = + match out with + | `Stdout -> stdout, false + | `Stderr -> stderr, false + | `File path -> open_out path, true + | `File_append path -> + open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true + | `Output oc -> oc, false + in + let pid = + if !Mock_.enabled then + 2 + else + Unix.getpid () + in + if not jsonl then output_char oc '['; + { oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 } + +let close (self : t) : unit = + if self.jsonl then + output_char self.oc '\n' + else + output_char self.oc ']'; + flush self.oc; + if self.must_close then close_out self.oc + +let with_ ~mode ~out f = + let writer = create ~mode ~out () in + Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) + +let[@inline] flush (self : t) : unit = flush self.oc + +(** Emit "," if we need, and get the buffer ready *) +let emit_sep_and_start_ (self : t) = + Buffer.reset self.buf; + if self.jsonl then + Buffer.add_char self.buf '\n' + else if self.first then + self.first <- false + else + Buffer.add_string self.buf ",\n" + +let char = Buffer.add_char +let raw_string = Buffer.add_string + +let str_val (buf : Buffer.t) (s : string) = + char buf '"'; + let encode_char c = + match c with + | '"' -> raw_string buf {|\"|} + | '\\' -> raw_string buf {|\\|} + | '\n' -> raw_string buf {|\n|} + | '\b' -> raw_string buf {|\b|} + | '\r' -> raw_string buf {|\r|} + | '\t' -> raw_string buf {|\t|} + | _ when Char.code c <= 0x1f -> + raw_string buf {|\u00|}; + Printf.bprintf buf "%02x" (Char.code c) + | c -> char buf c + in + String.iter encode_char s; + char buf '"' + +let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function + | U_none -> raw_string out "null" + | U_int i -> Printf.bprintf out "%d" i + | U_bool b -> Printf.bprintf out "%b" b + | U_string s -> str_val out s + | U_float f -> Printf.bprintf out "%g" f + +(* emit args, if not empty. [ppv] is used to print values. *) +let emit_args_o_ ppv (out : Buffer.t) args : unit = + if args <> [] then ( + Printf.bprintf out {json|,"args": {|json}; + List.iteri + (fun i (n, value) -> + if i > 0 then raw_string out ","; + Printf.bprintf out {json|"%s":%a|json} n ppv value) + args; + char out '}' + ) + +let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit = + let dur = end_ -. start in + let ts = start in + + emit_sep_and_start_ self; + + Printf.bprintf self.buf + {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} + self.pid tid dur ts str_val name + (emit_args_o_ pp_user_data_) + args; + Buffer.output_buffer self.oc self.buf + +let emit_manual_begin ~tid ~name ~(id : trace_id) ~ts ~args + ~(flavor : Sub.flavor option) (self : t) : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} + self.pid (int64_of_trace_id_ id) tid ts str_val name + (match flavor with + | None | Some Async -> 'b' + | Some Sync -> 'B') + (emit_args_o_ pp_user_data_) + args; + Buffer.output_buffer self.oc self.buf + +let emit_manual_end ~tid ~name ~(id : trace_id) ~ts + ~(flavor : Sub.flavor option) ~args (self : t) : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} + self.pid (int64_of_trace_id_ id) tid ts str_val name + (match flavor with + | None | Some Async -> 'e' + | Some Sync -> 'E') + (emit_args_o_ pp_user_data_) + args; + Buffer.output_buffer self.oc self.buf + +let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} + self.pid tid ts str_val name + (emit_args_o_ pp_user_data_) + args; + Buffer.output_buffer self.oc self.buf + +let emit_name_thread ~tid ~name (self : t) : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid + tid + (emit_args_o_ pp_user_data_) + [ "name", U_string name ]; + Buffer.output_buffer self.oc self.buf + +let emit_name_process ~name (self : t) : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid + (emit_args_o_ pp_user_data_) + [ "name", U_string name ]; + Buffer.output_buffer self.oc self.buf + +let emit_counter ~name ~tid ~ts (self : t) f : unit = + emit_sep_and_start_ self; + Printf.bprintf self.buf + {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid tid + ts + (emit_args_o_ pp_user_data_) + [ name, U_float f ]; + Buffer.output_buffer self.oc self.buf diff --git a/src/tef/writer.mli b/src/tef/writer.mli new file mode 100644 index 0000000..11d27e2 --- /dev/null +++ b/src/tef/writer.mli @@ -0,0 +1,80 @@ +(** Writer: knows how to write entries to a file in TEF format *) + +open Trace_core +module Sub = Trace_subscriber + +type t +(** A writer to a [out_channel]. It writes JSON entries in an array and closes + the array at the end. *) + +val create : + mode:[ `Jsonl | `Single ] -> + out: + [< `File of trace_id + | `File_append of trace_id + | `Output of out_channel + | `Stderr + | `Stdout + ] -> + unit -> + t + +val flush : t -> unit +val close : t -> unit + +val with_ : + mode:[ `Jsonl | `Single ] -> + out: + [< `File of trace_id + | `File_append of trace_id + | `Output of out_channel + | `Stderr + | `Stdout + ] -> + (t -> 'a) -> + 'a +(** [with_ ~mode ~out f] creates a writer and calls [f] with it. + @param mode + choose between jsonl (easier to read and write) and single (single json + object, directly usable in perfetto) *) + +val emit_duration_event : + tid:int -> + name:trace_id -> + start:float -> + end_:float -> + args:(trace_id * Sub.user_data) list -> + t -> + unit + +val emit_manual_begin : + tid:int -> + name:trace_id -> + id:trace_id -> + ts:float -> + args:(trace_id * Sub.user_data) list -> + flavor:Sub.flavor option -> + t -> + unit + +val emit_manual_end : + tid:int -> + name:trace_id -> + id:trace_id -> + ts:float -> + flavor:Sub.flavor option -> + args:(trace_id * Sub.user_data) list -> + t -> + unit + +val emit_instant_event : + tid:int -> + name:trace_id -> + ts:float -> + args:(trace_id * Sub.user_data) list -> + t -> + unit + +val emit_name_thread : tid:int -> name:trace_id -> t -> unit +val emit_name_process : name:trace_id -> t -> unit +val emit_counter : name:trace_id -> tid:int -> ts:float -> t -> float -> unit