diff --git a/src/tef/common_.ml b/src/tef/common_.ml
new file mode 100644
index 0000000..114d81c
--- /dev/null
+++ b/src/tef/common_.ml
@@ -0,0 +1,4 @@
+module Sub = Trace_subscriber
+module A = Trace_core.Internal_.Atomic_
+
+let ( let@ ) = ( @@ )
diff --git a/src/tef/emit_tef.ml b/src/tef/emit_tef.ml
new file mode 100644
index 0000000..e69de29
diff --git a/src/tef/event.ml b/src/tef/event.ml
deleted file mode 100644
index 7e5c0fc..0000000
--- a/src/tef/event.ml
+++ /dev/null
@@ -1,56 +0,0 @@
-open Trace_core
-module Sub = Trace_subscriber
-
-(** An event, specialized for TEF *)
-type t =
-  | E_tick
-  | E_message of {
-      tid: int;
-      msg: string;
-      time_us: float;
-      data: (string * Sub.user_data) list;
-    }
-  | E_define_span of {
-      tid: int;
-      name: string;
-      time_us: float;
-      id: span;
-      fun_name: string option;
-      data: (string * Sub.user_data) list;
-    }
-  | E_exit_span of {
-      id: span;
-      time_us: float;
-    }
-  | E_add_data of {
-      id: span;
-      data: (string * Sub.user_data) list;
-    }
-  | E_enter_manual_span of {
-      tid: int;
-      name: string;
-      time_us: float;
-      id: trace_id;
-      flavor: Sub.flavor option;
-      fun_name: string option;
-      data: (string * Sub.user_data) list;
-    }
-  | E_exit_manual_span of {
-      tid: int;
-      name: string;
-      time_us: float;
-      flavor: Sub.flavor option;
-      data: (string * Sub.user_data) list;
-      id: trace_id;
-    }
-  | E_counter of {
-      name: string;
-      tid: int;
-      time_us: float;
-      n: float;
-    }
-  | E_name_process of { name: string }
-  | E_name_thread of {
-      tid: int;
-      name: string;
-    }
diff --git a/src/tef/exporter.ml b/src/tef/exporter.ml
new file mode 100644
index 0000000..561d80d
--- /dev/null
+++ b/src/tef/exporter.ml
@@ -0,0 +1,99 @@
+(** An exporter, takes JSON objects and writes them somewhere *)
+
+open Common_
+
+type t = {
+  on_json: Buffer.t -> unit;
+      (** Takes a buffer and writes it somewhere. The buffer is only valid
+          during this call and must not be stored. *)
+  flush: unit -> unit;  (** Force write *)
+  close: unit -> unit;  (** Close underlying resources *)
+}
+(** An exporter, takes JSON objects and writes them somewhere.
+
+    This should be thread-safe if used in a threaded environment. *)
+
+open struct
+  (** [with_lock lock f] runs [f ()] with [lock] held. The lock is released
+      whether [f] returns or raises. *)
+  let with_lock lock f =
+    Mutex.lock lock;
+    try
+      let res = f () in
+      Mutex.unlock lock;
+      res
+    with e ->
+      let bt = Printexc.get_raw_backtrace () in
+      Mutex.unlock lock;
+      Printexc.raise_with_backtrace e bt
+end
+
+(** Export to the channel. Closing the exporter terminates the JSON array (if
+    any), then closes the channel or, if it is not ours to close, flushes it.
+    @param jsonl
+      if true, export as a JSON object per line, otherwise export as a single
+      big JSON array.
+    @param close_channel if true, closing the exporter will close the channel *)
+let of_out_channel ~close_channel ~jsonl oc : t =
+  let lock = Mutex.create () in
+  let first = ref true in
+  let closed = ref false in
+  let flush () =
+    let@ () = with_lock lock in
+    flush oc
+  in
+  let close () =
+    let@ () = with_lock lock in
+    if not !closed then (
+      closed := true;
+      (* if no event was written, the opening bracket is still missing: emit
+         an empty array so that the output remains valid JSON *)
+      if not jsonl then output_string oc (if !first then "[]" else "]");
+      (* a channel we do not own stays open; flush it, best effort *)
+      if close_channel then
+        close_out_noerr oc
+      else
+        try Stdlib.flush oc with Sys_error _ -> ()
+    )
+  in
+  let on_json buf =
+    let@ () = with_lock lock in
+    if not jsonl then
+      if !first then (
+        if not jsonl then output_char oc '[';
+        first := false
+      ) else
+        output_string oc ",\n";
+    Buffer.output_buffer oc buf;
+    if jsonl then output_char oc '\n'
+  in
+  { flush; close; on_json }
+
+(** Export into [buf]. Flushing is a no-op.
+    @param jsonl
+      if true, export as a JSON object per line, otherwise export as a single
+      big JSON array. *)
+let of_buffer ~jsonl (buf : Buffer.t) : t =
+  let lock = Mutex.create () in
+  let first = ref true in
+  let closed = ref false in
+  let close () =
+    let@ () = with_lock lock in
+    if not !closed then (
+      closed := true;
+      (* an empty trace must still be a valid (empty) JSON array *)
+      if not jsonl then Buffer.add_string buf (if !first then "[]" else "]")
+    )
+  in
+  let on_json json =
+    let@ () = with_lock lock in
+    if not jsonl then
+      if !first then (
+        if not jsonl then Buffer.add_char buf '[';
+        first := false
+      ) else
+        Buffer.add_string buf ",\n";
+    Buffer.add_buffer buf json;
+    if jsonl then Buffer.add_char buf '\n'
+  in
+  { flush = ignore; close; on_json }
diff --git a/src/tef/subscriber.ml b/src/tef/subscriber.ml
new file mode 100644
index 0000000..d3e4b54
--- /dev/null
+++ b/src/tef/subscriber.ml
@@ -0,0 +1,173 @@
+open Common_
+open Trace_core
+open Trace_private_util
+module Span_tbl = Sub.Span_tbl
+
+module Buf_pool = struct
+  type t = Buffer.t Rpool.t
+
+  let create ?(max_size = 32) ?(buf_size = 256) () : t =
+    Rpool.create ~max_size ~clear:Buffer.reset
+      ~create:(fun () -> Buffer.create buf_size)
+      ()
+end
+
+open struct
+  let[@inline] time_us_of_time_ns (t : int64) : float =
+    Int64.div t 1_000L |> Int64.to_float
+
+  let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
+    if id == Trace_core.Collector.dummy_trace_id then
+      0L
+    else
+      Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
+end
+
+let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
+
+type span_info = {
+  tid: int;
+  name: string;
+  start_us: float;
+  mutable data: (string * Sub.user_data) list;
+      (* NOTE: thread safety: this is supposed to only be modified by the thread
+that's running this (synchronous, stack-abiding) span. *)
+}
+(** Information we store about a span begin event, to emit a complete event when
+    we meet the corresponding span end event *)
+
+type t = {
+  active: bool A.t;
+  pid: int;
+  spans: span_info Span_tbl.t;
+  buf_pool: Buf_pool.t;
+  exporter: Exporter.t;
+}
+(** Subscriber state *)
+
+open struct
+  let print_non_closed_spans_warning spans =
+    let module Str_set = Set.Make (String) in
+    let spans = Span_tbl.to_list spans in
+    if spans <> [] then (
+      !on_tracing_error
+      @@ Printf.sprintf "trace-tef: warning: %d spans were not closed\n"
+           (List.length spans);
+      let names =
+        List.fold_left
+          (fun set (_, span) -> Str_set.add span.name set)
+          Str_set.empty spans
+      in
+      Str_set.iter
+        (fun name ->
+          !on_tracing_error @@ Printf.sprintf " span %S was not closed\n" name)
+        names;
+      flush stderr
+    )
+end
+
+let close (self : t) : unit =
+  if A.exchange self.active false then (
+    print_non_closed_spans_warning self.spans;
+    self.exporter.close ()
+  )
+
+let[@inline] active self = A.get self.active
+let[@inline] flush (self : t) : unit = self.exporter.flush ()
+
+let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
+  { active = A.make true; exporter; buf_pool; pid; spans = Span_tbl.create () }
+
+module Callbacks = struct
+  type st = t
+
+  let on_init _ ~time_ns:_ = ()
+  let on_shutdown (self : st) ~time_ns:_ = close self
+
+  let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_name_process ~pid:self.pid ~name buf;
+    self.exporter.on_json buf
+
+  let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
+    self.exporter.on_json buf
+
+  (* add function name, if provided, to the metadata *)
+  let add_fun_name_ fun_name data : _ list =
+    match fun_name with
+    | None -> data
+    | Some f -> ("function", Sub.U_string f) :: data
+
+  let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
+      ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+    let data = add_fun_name_ fun_name data in
+    let info = { tid; name; start_us = time_us; data } in
+    (* save the span so we find it at exit *)
+    Span_tbl.add self.spans span info
+
+  let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+
+    match Span_tbl.find_exn self.spans span with
+    | exception Not_found ->
+      !on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
+    | { tid; name; start_us; data } ->
+      Span_tbl.remove self.spans span;
+      let@ buf = Rpool.with_ self.buf_pool in
+      Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us
+        ~end_:time_us ~args:data;
+
+      self.exporter.on_json buf
+
+  let on_add_data (self : st) ~data span =
+    if data <> [] then (
+      try
+        let info = Span_tbl.find_exn self.spans span in
+        info.data <- List.rev_append data info.data
+      with Not_found ->
+        !on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
+    )
+
+  let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
+      ~args:data;
+    self.exporter.on_json buf
+
+  let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
+    self.exporter.on_json buf
+
+  let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
+      ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span :
+      unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+
+    let data = add_fun_name_ fun_name data in
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name
+      ~id:(int64_of_trace_id_ trace_id)
+      ~ts:time_us ~args:data ~flavor;
+    self.exporter.on_json buf
+
+  let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
+      ~trace_id (_ : span) : unit =
+    let time_us = time_us_of_time_ns @@ time_ns in
+
+    let@ buf = Rpool.with_ self.buf_pool in
+    Writer.emit_manual_end buf ~pid:self.pid ~tid ~name
+      ~id:(int64_of_trace_id_ trace_id)
+      ~ts:time_us ~flavor ~args:data;
+    self.exporter.on_json buf
+
+  let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
+end
+
+let subscriber (self : t) : Sub.t =
+  Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
diff --git a/src/tef/subscriber.mli b/src/tef/subscriber.mli
new file mode 100644
index 0000000..9f2f235
--- /dev/null
+++ b/src/tef/subscriber.mli
@@ -0,0 +1,28 @@
+open Common_
+
+module Buf_pool : sig
+  type t
+
+  val create : ?max_size:int -> ?buf_size:int -> unit -> t
+end
+
+type t
+(** Main subscriber state. *)
+
+val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
+(** Create a subscriber state. *)
+
+val flush : t -> unit
+val close : t -> unit
+val active : t -> bool
+
+module Callbacks : Sub.Callbacks.S with type st = t
+
+val subscriber : t -> Sub.t
+(** Subscriber that writes JSON events to the exporter held by this state *)
+
+(**/**)
+
+val on_tracing_error : (string -> unit) ref
+
+(**/**)
diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml
index d9dd4f8..e29488f 100644
--- a/src/tef/trace_tef.ml
+++ b/src/tef/trace_tef.ml
@@ -1,216 +1,7 @@
 open Trace_core
-open Trace_private_util
-open Event
-module Sub = Trace_subscriber
-module A = Trace_core.Internal_.Atomic_
-
-let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
-
-let[@inline] time_us_of_time_ns (t : int64) : float =
-  Int64.div t 1_000L |> Int64.to_float
-
-let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
-  if id == Trace_core.Collector.dummy_trace_id then
-    0L
-  else
-    Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
-
-module Mock_ = struct
-  let now = ref 0
-
-  (* used to mock timing *)
-  let get_now_ns () : int64 =
-    let x = !now in
-    incr now;
-    Int64.(mul (of_int x) 1000L)
-
-  let get_tid_ () : int = 3
-end
-
-module Span_tbl = Hashtbl.Make (struct
-  include Int64
-
-  let hash : t -> int = Hashtbl.hash
-end)
-
-type span_info = {
-  tid: int;
-  name: string;
-  start_us: float;
-  mutable data: (string * Sub.user_data) list;
-}
-
-(** Writer: knows how to write entries to a file in TEF format *)
-module Writer = struct
-  type t = {
-    oc: out_channel;
-    jsonl: bool;  (** JSONL mode, one json event per line *)
-    mutable first: bool;  (** first event? useful in json mode *)
-    buf: Buffer.t;  (** Buffer to write into *)
-    must_close: bool;  (** Do we have to close the underlying channel [oc]? *)
-    pid: int;
-  }
-  (** A writer to a [out_channel]. It writes JSON entries in an array and closes
-      the array at the end. *)
-
-  let create ~(mode : [ `Single | `Jsonl ]) ~out () : t =
-    let jsonl = mode = `Jsonl in
-    let oc, must_close =
-      match out with
-      | `Stdout -> stdout, false
-      | `Stderr -> stderr, false
-      | `File path -> open_out path, true
-      | `File_append path ->
-        open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
-      | `Output oc -> oc, false
-    in
-    let pid =
-      if !Sub.Private_.mock then
-        2
-      else
-        Unix.getpid ()
-    in
-    if not jsonl then output_char oc '[';
-    { oc; jsonl; first = true; pid; must_close; buf = Buffer.create 2_048 }
-
-  let close (self : t) : unit =
-    if self.jsonl then
-      output_char self.oc '\n'
-    else
-      output_char self.oc ']';
-    flush self.oc;
-    if self.must_close then close_out self.oc
-
-  let with_ ~mode ~out f =
-    let writer = create ~mode ~out () in
-    Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer)
-
-  let[@inline] flush (self : t) : unit = flush self.oc
-
-  (** Emit "," if we need, and get the buffer ready *)
-  let emit_sep_and_start_ (self : t) =
-    Buffer.reset self.buf;
-    if self.jsonl then
-      Buffer.add_char self.buf '\n'
-    else if self.first then
-      self.first <- false
-    else
-      Buffer.add_string self.buf ",\n"
-
-  let char = Buffer.add_char
-  let raw_string = Buffer.add_string
-
-  let str_val (buf : Buffer.t) (s : string) =
-    char buf '"';
-    let encode_char c =
-      match c with
-      | '"' -> raw_string buf {|\"|}
-      | '\\' -> raw_string buf {|\\|}
-      | '\n' -> raw_string buf {|\n|}
-      | '\b' -> raw_string buf {|\b|}
-      | '\r' -> raw_string buf {|\r|}
-      | '\t' -> raw_string buf {|\t|}
-      | _ when Char.code c <= 0x1f ->
-        raw_string buf {|\u00|};
-        Printf.bprintf buf "%02x" (Char.code c)
-      | c -> char buf c
-    in
-    String.iter encode_char s;
-    char buf '"'
-
-  let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function
-    | U_none -> raw_string out "null"
-    | U_int i -> Printf.bprintf out "%d" i
-    | U_bool b -> Printf.bprintf out "%b" b
-    | U_string s -> str_val out s
-    | U_float f -> Printf.bprintf out "%g" f
-
-  (* emit args, if not empty. [ppv] is used to print values. *)
-  let emit_args_o_ ppv (out : Buffer.t) args : unit =
-    if args <> [] then (
-      Printf.bprintf out {json|,"args": {|json};
-      List.iteri
-        (fun i (n, value) ->
-          if i > 0 then raw_string out ",";
-          Printf.bprintf out {json|"%s":%a|json} n ppv value)
-        args;
-      char out '}'
-    )
-
-  let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit =
-    let dur = end_ -. start in
-    let ts = start in
-
-    emit_sep_and_start_ self;
-
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
-      self.pid tid dur ts str_val name
-      (emit_args_o_ pp_user_data_)
-      args;
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_manual_begin ~tid ~name ~(id : trace_id) ~ts ~args
-      ~(flavor : Sub.flavor option) (self : t) : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
-      self.pid (int64_of_trace_id_ id) tid ts str_val name
-      (match flavor with
-      | None | Some Async -> 'b'
-      | Some Sync -> 'B')
-      (emit_args_o_ pp_user_data_)
-      args;
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_manual_end ~tid ~name ~(id : trace_id) ~ts
-      ~(flavor : Sub.flavor option) ~args (self : t) : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
-      self.pid (int64_of_trace_id_ id) tid ts str_val name
-      (match flavor with
-      | None | Some Async -> 'e'
-      | Some Sync -> 'E')
-      (emit_args_o_ pp_user_data_)
-      args;
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
-      self.pid tid ts str_val name
-      (emit_args_o_ pp_user_data_)
-      args;
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_name_thread ~tid ~name (self : t) : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
-      tid
-      (emit_args_o_ pp_user_data_)
-      [ "name", U_string name ];
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_name_process ~name (self : t) : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid
-      (emit_args_o_ pp_user_data_)
-      [ "name", U_string name ];
-    Buffer.output_buffer self.oc self.buf
-
-  let emit_counter ~name ~tid ~ts (self : t) f : unit =
-    emit_sep_and_start_ self;
-    Printf.bprintf self.buf
-      {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid
-      tid ts
-      (emit_args_o_ pp_user_data_)
-      [ name, U_float f ];
-    Buffer.output_buffer self.oc self.buf
-end
+module Subscriber = Subscriber
+module Exporter = Exporter
+module Writer = Writer
 
 let block_signals () =
   try
@@ -228,97 +19,14 @@ let block_signals () =
         : _ list)
   with _ -> ()
 
-let print_non_closed_spans_warning spans =
-  let module Str_set = Set.Make (String) in
-  Printf.eprintf "trace-tef: warning: %d spans were not closed\n"
-    (Span_tbl.length spans);
-  let names = ref Str_set.empty in
-  Span_tbl.iter (fun _ span -> names := Str_set.add span.name !names) spans;
-  Str_set.iter
-    (fun name -> Printf.eprintf " span %S was not closed\n" name)
-    !names;
-  flush stderr
-
-(** Background thread, takes events from the queue, puts them in context using
-    local state, and writes fully resolved TEF events to [out]. *)
-let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
-  block_signals ();
-
-  (* open a writer to [out] *)
-  Writer.with_ ~mode ~out @@ fun writer ->
-  (* local state, to keep track of span information and implicit stack context *)
-  let spans : span_info Span_tbl.t = Span_tbl.create 32 in
-
-  (* add function name, if provided, to the metadata *)
-  let add_fun_name_ fun_name data : _ list =
-    match fun_name with
-    | None -> data
-    | Some f -> ("function", Sub.U_string f) :: data
-  in
-
-  (* how to deal with an event *)
-  let handle_ev (ev : Event.t) : unit =
-    match ev with
-    | E_tick -> Writer.flush writer
-    | E_message { tid; msg; time_us; data } ->
-      Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer
-    | E_define_span { tid; name; id; time_us; fun_name; data } ->
-      let data = add_fun_name_ fun_name data in
-      let info = { tid; name; start_us = time_us; data } in
-      (* save the span so we find it at exit *)
-      Span_tbl.add spans id info
-    | E_exit_span { id; time_us = stop_us } ->
-      (match Span_tbl.find_opt spans id with
-      | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
-      | Some { tid; name; start_us; data } ->
-        Span_tbl.remove spans id;
-        Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us
-          ~args:data writer)
-    | E_add_data { id; data } ->
-      (match Span_tbl.find_opt spans id with
-      | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
-      | Some info -> info.data <- List.rev_append data info.data)
-    | E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } ->
-      let data = add_fun_name_ fun_name data in
-      Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor
-        writer
-    | E_exit_manual_span { tid; time_us; name; id; flavor; data } ->
-      Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data
-        writer
-    | E_counter { tid; name; time_us; n } ->
-      Writer.emit_counter ~name ~tid ~ts:time_us writer n
-    | E_name_process { name } -> Writer.emit_name_process ~name writer
-    | E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer
-  in
-
-  try
-    while true do
-      (* get all the events in the incoming blocking queue, in
-         one single critical section. *)
-      let local = B_queue.pop_all events in
-      List.iter handle_ev local
-    done
-  with B_queue.Closed ->
-    (* write a message about us closing *)
-    Writer.emit_instant_event ~name:"tef-worker.exit"
-      ~tid:(Thread.id @@ Thread.self ())
-      ~ts:(time_us_of_time_ns @@ Sub.Private_.now_ns ())
-      ~args:[] writer;
-
-    (* warn if app didn't close all spans *)
-    if Span_tbl.length spans > 0 then print_non_closed_spans_warning spans;
-    ()
-
 (** Thread that simply regularly "ticks", sending events to the background
     thread so it has a chance to write to the file *)
-let tick_thread events : unit =
+let tick_thread (sub : Subscriber.t) : unit =
   block_signals ();
-  try
-    while true do
-      Thread.delay 0.5;
-      B_queue.push events E_tick
-    done
-  with B_queue.Closed -> ()
+  while Subscriber.active sub do
+    Thread.delay 0.5;
+    Subscriber.flush sub
+  done
 
 type output =
   [ `Stdout
@@ -326,91 +34,45 @@ type output =
   | `File of string
   ]
 
-module Internal_st = struct
-  type t = {
-    active: bool A.t;
-    events: Event.t B_queue.t;
-    t_write: Thread.t;
-  }
-end
-
-let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
-  let module M : Sub.Callbacks.S with type st = Internal_st.t = struct
-    type st = Internal_st.t
-
-    let on_init _ ~time_ns:_ = ()
-
-    let on_shutdown (self : st) ~time_ns:_ =
-      if A.exchange self.active false then (
-        B_queue.close self.events;
-        (* wait for writer thread to be done. The writer thread will exit
-           after processing remaining events because the queue is now closed *)
-        Thread.join self.t_write
-      )
-
-    let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
-      B_queue.push self.events @@ E_name_process { name }
-
-    let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
-      B_queue.push self.events @@ E_name_thread { tid; name }
-
-    let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
-        ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events
-      @@ E_define_span { tid; name; time_us; id = span; fun_name; data }
-
-    let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events @@ E_exit_span { id = span; time_us }
-
-    let on_add_data (self : st) ~data span =
-      if data <> [] then
-        B_queue.push self.events @@ E_add_data { id = span; data }
-
-    let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events @@ E_message { tid; time_us; msg; data }
-
-    let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events @@ E_counter { name; n = f; time_us; tid }
-
-    let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
-        ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span
-        : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events
-      @@ E_enter_manual_span
-           { id = trace_id; time_us; tid; data; name; fun_name; flavor }
-
-    let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
-        ~trace_id (_ : span) : unit =
-      let time_us = time_us_of_time_ns @@ time_ns in
-      B_queue.push self.events
-      @@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor }
-
-    let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
-  end in
-  let events = B_queue.create () in
-  let t_write =
-    Thread.create
-      (fun () -> Fun.protect ~finally @@ fun () -> bg_thread ~mode ~out events)
-      ()
+let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
+    Trace_subscriber.t =
+  let jsonl = mode = `Jsonl in
+  let oc, must_close =
+    match out with
+    | `Stdout -> stdout, false
+    | `Stderr -> stderr, false
+    | `File path -> open_out path, true
+    | `File_append path ->
+      open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
+    | `Output oc -> oc, false
+  in
+  let pid =
+    if !Trace_subscriber.Private_.mock then
+      2
+    else
+      Unix.getpid ()
   in
 
-  (* ticker thread, regularly sends a message to the writer thread.
-     no need to join it. *)
-  let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () in
-  let st : Internal_st.t = { active = A.make true; events; t_write } in
-  Sub.Subscriber.Sub { st; callbacks = (module M) }
+  let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in
+  let exporter =
+    {
+      exporter with
+      close =
+        (fun () ->
+          exporter.close ();
+          finally ());
+    }
+  in
+  let sub = Subscriber.create ~pid ~exporter () in
+  let _t_tick : Thread.t = Thread.create tick_thread sub in
+  Subscriber.subscriber sub
 
 let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
     : collector =
   let sub = subscriber_ ~finally ~mode ~out () in
-  Sub.collector sub
+  Trace_subscriber.collector sub
 
-let[@inline] subscriber ~out () : Sub.t =
+let[@inline] subscriber ~out () : Trace_subscriber.t =
   subscriber_ ~finally:ignore ~mode:`Single ~out ()
 
 let[@inline] collector ~out () : collector =
@@ -438,14 +100,26 @@ let with_setup ?out () f =
   setup ?out ();
   Fun.protect ~finally:Trace_core.shutdown f
 
+module Mock_ = struct
+  let now = ref 0
+
+  (* used to mock timing *)
+  let get_now_ns () : int64 =
+    let x = !now in
+    incr now;
+    Int64.(mul (of_int x) 1000L)
+
+  let get_tid_ () : int = 3
+end
+
 module Private_ = struct
   let mock_all_ () =
-    Sub.Private_.mock := true;
-    Sub.Private_.get_now_ns_ := Mock_.get_now_ns;
-    Sub.Private_.get_tid_ := Mock_.get_tid_;
+    Trace_subscriber.Private_.mock := true;
+    Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns;
+    Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_;
     ()
 
-  let on_tracing_error = on_tracing_error
+  let on_tracing_error = Subscriber.on_tracing_error
 
   let subscriber_jsonl ~finally ~out () =
     subscriber_ ~finally ~mode:`Jsonl ~out ()
diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli
index d5741fd..9cf8dd1 100644
--- a/src/tef/trace_tef.mli
+++ b/src/tef/trace_tef.mli
@@ -1,3 +1,7 @@
+module Subscriber = Subscriber
+module Exporter = Exporter
+module Writer = Writer
+
 type output =
   [ `Stdout
   | `Stderr
diff --git a/src/tef/writer.ml b/src/tef/writer.ml
new file mode 100644
index 0000000..9865988
--- /dev/null
+++ b/src/tef/writer.ml
@@ -0,0 +1,98 @@
+open Common_
+
+let char = Buffer.add_char
+let raw_string = Buffer.add_string
+
+let str_val (buf : Buffer.t) (s : string) =
+  char buf '"';
+  let encode_char c =
+    match c with
+    | '"' -> raw_string buf {|\"|}
+    | '\\' -> raw_string buf {|\\|}
+    | '\n' -> raw_string buf {|\n|}
+    | '\b' -> raw_string buf {|\b|}
+    | '\r' -> raw_string buf {|\r|}
+    | '\t' -> raw_string buf {|\t|}
+    | _ when Char.code c <= 0x1f ->
+      raw_string buf {|\u00|};
+      Printf.bprintf buf "%02x" (Char.code c)
+    | c -> char buf c
+  in
+  String.iter encode_char s;
+  char buf '"'
+
+let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function
+  | U_none -> raw_string out "null"
+  | U_int i -> Printf.bprintf out "%d" i
+  | U_bool b -> Printf.bprintf out "%b" b
+  | U_string s -> str_val out s
+  | U_float f -> Printf.bprintf out "%g" f
+
+(* emit args, if not empty. [ppv] is used to print values. Keys are escaped
+   with [str_val], so that any key is emitted as a valid JSON string. *)
+let emit_args_o_ ppv (out : Buffer.t) args : unit =
+  if args <> [] then (
+    Printf.bprintf out {json|,"args": {|json};
+    List.iteri
+      (fun i (n, value) ->
+        if i > 0 then raw_string out ",";
+        Printf.bprintf out "%a:%a" str_val n ppv value)
+      args;
+    char out '}'
+  )
+
+let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit =
+  let dur = end_ -. start in
+  let ts = start in
+
+  Printf.bprintf buf
+    {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
+    pid tid dur ts str_val name
+    (emit_args_o_ pp_user_data_)
+    args
+
+let emit_manual_begin ~pid ~tid ~name ~(id : int64) ~ts ~args
+    ~(flavor : Sub.flavor option) buf : unit =
+  Printf.bprintf buf
+    {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
+    pid id tid ts str_val name
+    (match flavor with
+    | None | Some Async -> 'b'
+    | Some Sync -> 'B')
+    (emit_args_o_ pp_user_data_)
+    args
+
+let emit_manual_end ~pid ~tid ~name ~(id : int64) ~ts
+    ~(flavor : Sub.flavor option) ~args buf : unit =
+  Printf.bprintf buf
+    {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
+    pid id tid ts str_val name
+    (match flavor with
+    | None | Some Async -> 'e'
+    | Some Sync -> 'E')
+    (emit_args_o_ pp_user_data_)
+    args
+
+let emit_instant_event ~pid ~tid ~name ~ts ~args buf : unit =
+  Printf.bprintf buf
+    {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
+    pid tid ts str_val name
+    (emit_args_o_ pp_user_data_)
+    args
+
+let emit_name_thread ~pid ~tid ~name buf : unit =
+  Printf.bprintf buf
+    {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} pid tid
+    (emit_args_o_ pp_user_data_)
+    [ "name", U_string name ]
+
+let emit_name_process ~pid ~name buf : unit =
+  Printf.bprintf buf {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} pid
+    (emit_args_o_ pp_user_data_)
+    [ "name", U_string name ]
+
+let emit_counter ~pid ~tid ~name ~ts buf f : unit =
+  Printf.bprintf buf
+    {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} pid tid ts
+    (emit_args_o_ pp_user_data_)
+    [ name, U_float f ]
diff --git a/src/tef/writer.mli b/src/tef/writer.mli
new file mode 100644
index 0000000..d1563a7
--- /dev/null
+++ b/src/tef/writer.mli
@@ -0,0 +1,54 @@
+(** Write JSON events to a buffer.
+
+    This is the part of the code that knows how to emit TEF-compliant JSON from
+    raw event data. *)
+
+open Common_
+open Trace_core
+
+val emit_duration_event :
+  pid:int ->
+  tid:int ->
+  name:string ->
+  start:float ->
+  end_:float ->
+  args:(string * Sub.user_data) list ->
+  Buffer.t ->
+  unit
+
+val emit_manual_begin :
+  pid:int ->
+  tid:int ->
+  name:string ->
+  id:span ->
+  ts:float ->
+  args:(string * Sub.user_data) list ->
+  flavor:Sub.flavor option ->
+  Buffer.t ->
+  unit
+
+val emit_manual_end :
+  pid:int ->
+  tid:int ->
+  name:string ->
+  id:span ->
+  ts:float ->
+  flavor:Sub.flavor option ->
+  args:(string * Sub.user_data) list ->
+  Buffer.t ->
+  unit
+
+val emit_instant_event :
+  pid:int ->
+  tid:int ->
+  name:string ->
+  ts:float ->
+  args:(string * Sub.user_data) list ->
+  Buffer.t ->
+  unit
+
+val emit_name_thread : pid:int -> tid:int -> name:string -> Buffer.t -> unit
+val emit_name_process : pid:int -> name:string -> Buffer.t -> unit
+
+val emit_counter :
+  pid:int -> tid:int -> name:string -> ts:float -> Buffer.t -> float -> unit