diff --git a/src/tef/collector_tef.ml b/src/tef/collector_tef.ml new file mode 100644 index 0000000..fd390b0 --- /dev/null +++ b/src/tef/collector_tef.ml @@ -0,0 +1,195 @@ +open Common_ +open Types +open Trace_core + +module Buf_pool = struct + type t = Buffer.t Trace_util.Rpool.t + + let create ?(max_size = 32) ?(buf_size = 256) () : t = + Trace_util.Rpool.create ~max_size ~clear:Buffer.reset + ~create:(fun () -> Buffer.create buf_size) + () +end + +open struct + let[@inline] time_us_of_time_ns (t : int64) : float = + Int64.div t 1_000L |> Int64.to_float +end + +let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s) + +(* +type span_info = { + tid: int; + name: string; + start_us: float; + mutable data: (string * user_data) list; + (* NOTE: thread safety: this is supposed to only be modified by the thread +that's running this (synchronous, stack-abiding) span. *) +} +(** Information we store about a span begin event, to emit a complete event when + we meet the corresponding span end event *) +*) + +type t = { + active: bool A.t; + pid: int; + buf_pool: Buf_pool.t; + exporter: Exporter.t; + span_id_gen: Trace_util.Span_id64.Gen.t; + trace_id_gen: Trace_util.Trace_id64.Gen.t; +} +(** Subscriber state *) + +(* TODO: this is nice to have, can we make it optional? +open struct + let print_non_closed_spans_warning spans = + let module Str_set = Set.Make (String) in + let spans = Span_tbl.to_list spans in + if spans <> [] then ( + !on_tracing_error + @@ Printf.sprintf "trace-tef: warning: %d spans were not closed" + (List.length spans); + let names = + List.fold_left + (fun set (_, span) -> Str_set.add span.name set) + Str_set.empty spans + in + Str_set.iter + (fun name -> + !on_tracing_error @@ Printf.sprintf " span %S was not closed" name) + names; + flush stderr + ) +end +*) + +let close (self : t) : unit = + if A.exchange self.active false then + (* FIXME: print_non_closed_spans_warning self.spans; *) + self.exporter.close () + +let[@inline] active self = A.get self.active +let[@inline] flush (self : t) : unit = self.exporter.flush () + +let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = + { + active = A.make true; + exporter; + buf_pool; + pid; + span_id_gen = Trace_util.Span_id64.Gen.create (); + trace_id_gen = Trace_util.Trace_id64.Gen.create (); + } + +open struct + type st = t + + let rec flavor_of_params = function + | [] -> `Sync + | Core_ext.Extension_span_flavor f :: _ -> f + | _ :: tl -> flavor_of_params tl + + let new_span_id (self : st) = Trace_util.Span_id64.Gen.gen self.span_id_gen + let new_trace_id (self : st) = Trace_util.Trace_id64.Gen.gen self.trace_id_gen + let init _ = () + let shutdown (self : st) = close self + + (* add function name, if provided, to the metadata *) + let add_fun_name_ fun_name data : _ list = + match fun_name with + | None -> data + | Some f -> ("function", `String f) :: data + + let enter_span (self : st) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~params ~data + ~parent name : span = + let start_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in + let flavor = flavor_of_params params in + + let pid = self.pid in + let tid = Trace_util.Mock_.get_tid () in + match flavor with + | `Sync -> Span_tef_sync { name; pid; tid; args = data; start_us } + | `Async -> + let trace_id = + match parent with + | P_some (Span_tef_async sp) -> sp.trace_id + | _ -> new_trace_id self + in + + (let@ buf = Trace_util.Rpool.with_ self.buf_pool in + Writer.emit_begin_async buf ~name ~pid ~tid ~trace_id ~ts:start_us + ~args:data; + self.exporter.on_json buf); + + Span_tef_async { pid; tid; trace_id; name; args = data } + + let exit_span (self : st) sp = + let end_time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in + + let@ buf = Trace_util.Rpool.with_ self.buf_pool in + let did_write = + match sp with + | Span_tef_sync { name; pid; tid; args; start_us } -> + (* emit full event *) + Writer.emit_duration_event buf ~pid ~tid ~name ~start:start_us + ~end_:end_time_us ~args; + true + | Span_tef_async { name; trace_id; pid; tid; args } -> + Writer.emit_end_async buf ~pid ~tid ~name ~trace_id ~ts:end_time_us + ~args; + true + | _ -> false + in + + if did_write then self.exporter.on_json buf + + let message (self : st) ~params:_ ~data ~span:_ msg : unit = + let tid = Trace_util.Mock_.get_tid () in + let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in + let@ buf = Trace_util.Rpool.with_ self.buf_pool in + Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us + ~args:data; + self.exporter.on_json buf + + let counter_float (self : st) ~params:_ ~data:_ name n : unit = + let tid = Trace_util.Mock_.get_tid () in + let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in + let@ buf = Trace_util.Rpool.with_ self.buf_pool in + Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n; + self.exporter.on_json buf + + let counter_int (self : st) ~params ~data name n : unit = + counter_float self ~params ~data name (float_of_int n) + + let add_data_to_span _st sp data = + match sp with + | Span_tef_sync sp -> sp.args <- List.rev_append data sp.args + | Span_tef_async sp -> sp.args <- List.rev_append data sp.args + | _ -> () + + let on_name_thread_ (self : st) ~tid name : unit = + let@ buf = Trace_util.Rpool.with_ self.buf_pool in + Writer.emit_name_thread buf ~pid:self.pid ~tid ~name; + self.exporter.on_json buf + + let on_name_process_ (self : st) name : unit = + let@ buf = Trace_util.Rpool.with_ self.buf_pool in + Writer.emit_name_process ~pid:self.pid ~name buf; + self.exporter.on_json buf + + let extension (self : st) ev = + match ev with + | Core_ext.Extension_set_thread_name name -> + let tid = Trace_util.Mock_.get_tid () in + on_name_thread_ self ~tid name + | Core_ext.Extension_set_process_name name -> on_name_process_ self name + | _ -> () +end + +let callbacks_collector : _ Collector.Callbacks.t = + Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span ~message + ~add_data_to_span ~counter_int ~counter_float ~extension () + +let collector (self : t) : Collector.t = + Collector.C_some (self, callbacks_collector) diff --git a/src/tef/subscriber.mli b/src/tef/collector_tef.mli similarity index 74% rename from src/tef/subscriber.mli rename to src/tef/collector_tef.mli index 1cc1af3..1d94a5e 100644 --- a/src/tef/subscriber.mli +++ b/src/tef/collector_tef.mli @@ -1,3 +1,4 @@ +open Trace_core open Common_ module Buf_pool : sig @@ -7,19 +8,19 @@ module Buf_pool : sig end type t -(** Main subscriber state. *) +(** Main state. *) val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t -(** Create a subscriber state. *) +(** Create a fresh state. *) val flush : t -> unit val close : t -> unit val active : t -> bool -val sub_callbacks : t Sub.Callbacks.t +val callbacks_collector : t Collector.Callbacks.t (** Callbacks used for the subscriber *) -val subscriber : t -> Sub.t +val collector : t -> Collector.t (** Subscriber that writes json into this writer *) (**/**) diff --git a/src/tef/common_.ml b/src/tef/common_.ml index 2148902..169190b 100644 --- a/src/tef/common_.ml +++ b/src/tef/common_.ml @@ -1,6 +1,3 @@ -module Sub = Trace_subscriber module A = Trace_core.Internal_.Atomic_ let ( let@ ) = ( @@ ) - -type span_id = Sub.Span_sub.span_id diff --git a/src/tef/dune b/src/tef/dune index 2e036f9..2f3b5a8 100644 --- a/src/tef/dune +++ b/src/tef/dune @@ -3,11 +3,4 @@ (public_name trace-tef) (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") - (libraries - trace.core - trace.private.util - trace.subscriber - mtime - mtime.clock.os - unix - threads)) + (libraries trace.core trace.util mtime mtime.clock.os unix threads)) diff --git a/src/tef/emit_tef.ml b/src/tef/emit_tef.ml deleted file mode 100644 index e69de29..0000000 diff --git a/src/tef/exporter.ml b/src/tef/exporter.ml index 561d80d..8507c0e 100644 --- a/src/tef/exporter.ml +++ b/src/tef/exporter.ml @@ -1,17 +1,10 @@ -(** An exporter, takes JSON objects and writes them somewhere *) - open Common_ type t = { on_json: Buffer.t -> unit; - (** Takes a buffer and writes it somewhere. The buffer is only valid - during this call and must not be stored. *) - flush: unit -> unit; (** Force write *) - close: unit -> unit; (** Close underlying resources *) + flush: unit -> unit; + close: unit -> unit; } -(** An exporter, takes JSON objects and writes them somewhere. - - This should be thread-safe if used in a threaded environment. *) open struct let with_lock lock f = @@ -26,11 +19,6 @@ open struct Printexc.raise_with_backtrace e bt end -(** Export to the channel - @param jsonl - if true, export as a JSON object per line, otherwise export as a single - big JSON array. - @param close_channel if true, closing the exporter will close the channel *) let of_out_channel ~close_channel ~jsonl oc : t = let lock = Mutex.create () in let first = ref true in diff --git a/src/tef/exporter.mli b/src/tef/exporter.mli new file mode 100644 index 0000000..6414f9c --- /dev/null +++ b/src/tef/exporter.mli @@ -0,0 +1,22 @@ +(** An exporter, takes JSON objects and writes them somewhere *) + +type t = { + on_json: Buffer.t -> unit; + (** Takes a buffer and writes it somewhere. The buffer is only valid + during this call and must not be stored. *) + flush: unit -> unit; (** Force write *) + close: unit -> unit; (** Close underlying resources *) +} +(** An exporter, takes JSON objects and writes them somewhere. + + This should be thread-safe if used in a threaded environment. *) + +val of_out_channel : close_channel:bool -> jsonl:bool -> out_channel -> t +(** Export to the channel + @param jsonl + if true, export as a JSON object per line, otherwise export as a single + big JSON array. + @param close_channel if true, closing the exporter will close the channel *) + +val of_buffer : jsonl:bool -> Buffer.t -> t +(** Emit into the buffer *) diff --git a/src/tef/subscriber.ml b/src/tef/subscriber.ml deleted file mode 100644 index 886a255..0000000 --- a/src/tef/subscriber.ml +++ /dev/null @@ -1,164 +0,0 @@ -open Common_ -open Trace_core -open Trace_private_util -module Span_sub = Sub.Span_sub - -module Buf_pool = struct - type t = Buffer.t Rpool.t - - let create ?(max_size = 32) ?(buf_size = 256) () : t = - Rpool.create ~max_size ~clear:Buffer.reset - ~create:(fun () -> Buffer.create buf_size) - () -end - -open struct - let[@inline] time_us_of_time_ns (t : int64) : float = - Int64.div t 1_000L |> Int64.to_float -end - -let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s) - -(* -type span_info = { - tid: int; - name: string; - start_us: float; - mutable data: (string * user_data) list; - (* NOTE: thread safety: this is supposed to only be modified by the thread -that's running this (synchronous, stack-abiding) span. *) -} -(** Information we store about a span begin event, to emit a complete event when - we meet the corresponding span end event *) -*) - -type t = { - active: bool A.t; - pid: int; - buf_pool: Buf_pool.t; - exporter: Exporter.t; - span_id_gen: Sub.Span_id_generator.t; - trace_id_gen: Sub.Trace_id_generator.t; -} -(** Subscriber state *) - -(* TODO: this is nice to have, can we make it optional? -open struct - let print_non_closed_spans_warning spans = - let module Str_set = Set.Make (String) in - let spans = Span_tbl.to_list spans in - if spans <> [] then ( - !on_tracing_error - @@ Printf.sprintf "trace-tef: warning: %d spans were not closed" - (List.length spans); - let names = - List.fold_left - (fun set (_, span) -> Str_set.add span.name set) - Str_set.empty spans - in - Str_set.iter - (fun name -> - !on_tracing_error @@ Printf.sprintf " span %S was not closed" name) - names; - flush stderr - ) -end -*) - -let close (self : t) : unit = - if A.exchange self.active false then - (* FIXME: print_non_closed_spans_warning self.spans; *) - self.exporter.close () - -let[@inline] active self = A.get self.active -let[@inline] flush (self : t) : unit = self.exporter.flush () - -let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = - { - active = A.make true; - exporter; - buf_pool; - pid; - span_id_gen = Sub.Span_id_generator.create (); - trace_id_gen = Sub.Trace_id_generator.create (); - } - -open struct - type st = t - - let new_span_id (self : st) = Sub.Span_id_generator.gen self.span_id_gen - let new_trace_id (self : st) = Sub.Trace_id_generator.gen self.trace_id_gen - let on_init _ ~time_ns:_ = () - let on_shutdown (self : st) ~time_ns:_ = close self - - (* add function name, if provided, to the metadata *) - let add_fun_name_ fun_name data : _ list = - match fun_name with - | None -> data - | Some f -> ("function", `String f) :: data - - let on_enter_span (self : st) (span : Span_sub.t) : unit = - match span.flavor with - | `Async -> - let { Span_sub.time_ns; data; tid; name; _ } = span in - let time_us = time_us_of_time_ns time_ns in - let data = add_fun_name_ span.__FUNCTION__ data in - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_begin buf ~pid:self.pid ~tid ~name ~id:span.trace_id - ~ts:time_us ~args:data ~flavor:span.flavor; - self.exporter.on_json buf - | `Sync -> () (* done at exit *) - - let on_exit_span (self : st) ~time_ns:exit_time_ns ~tid:_ span : unit = - let { Span_sub.data; tid; flavor; name; _ } = span in - let exit_time_us = time_us_of_time_ns exit_time_ns in - - let@ buf = Rpool.with_ self.buf_pool in - (match flavor with - | `Sync -> - (* emit full event *) - let start_time_us = time_us_of_time_ns span.time_ns in - Writer.emit_duration_event buf ~pid:self.pid ~tid ~name - ~start:start_time_us ~end_:exit_time_us ~args:data - | `Async -> - Writer.emit_end buf ~pid:self.pid ~tid ~name ~id:span.trace_id - ~ts:exit_time_us ~flavor ~args:data); - - self.exporter.on_json buf - - let on_message (self : st) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit = - let time_us = time_us_of_time_ns @@ time_ns in - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us - ~args:data; - self.exporter.on_json buf - - let on_counter (self : st) ~time_ns ~tid ~params:_ ~data:_ ~name n : unit = - let time_us = time_us_of_time_ns @@ time_ns in - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n; - self.exporter.on_json buf - - let on_name_thread_ (self : st) ~tid name : unit = - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_name_thread buf ~pid:self.pid ~tid ~name; - self.exporter.on_json buf - - let on_name_process_ (self : st) name : unit = - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_name_process ~pid:self.pid ~name buf; - self.exporter.on_json buf - - let on_extension_event (self : st) ~time_ns:_ ~tid ev = - match ev with - | Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name - | Core_ext.Extension_set_process_name name -> on_name_process_ self name - | _ -> () -end - -let sub_callbacks : _ Sub.Callbacks.t = - Sub.Callbacks.make ~new_span_id ~new_trace_id ~on_init ~on_shutdown - ~on_enter_span ~on_exit_span ~on_message ~on_counter ~on_extension_event () - -let subscriber (self : t) : Sub.t = - Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks } diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 39b9c06..a3935df 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -1,7 +1,8 @@ open Trace_core -module Subscriber = Subscriber +module Collector_tef = Collector_tef module Exporter = Exporter module Writer = Writer +module Types = Types let block_signals () = try @@ -21,11 +22,11 @@ let block_signals () = (** Thread that simply regularly "ticks", sending events to the background thread so it has a chance to write to the file *) -let tick_thread (sub : Subscriber.t) : unit = +let tick_thread (c : Collector_tef.t) : unit = block_signals (); - while Subscriber.active sub do + while Collector_tef.active c do Thread.delay 0.5; - Subscriber.flush sub + Collector_tef.flush c done type output = @@ -34,8 +35,8 @@ type output = | `File of string ] -let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : - Trace_subscriber.t = +let collector_ ~(finally : unit -> unit) ~out ~(mode : [ `Single | `Jsonl ]) () + : Collector.t = let jsonl = mode = `Jsonl in let oc, must_close = match out with @@ -46,12 +47,7 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true | `Output oc -> oc, false in - let pid = - if !Trace_subscriber.Private_.mock then - 2 - else - Unix.getpid () - in + let pid = Trace_util.Mock_.get_pid () in let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in let exporter = @@ -63,17 +59,9 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : finally ()); } in - let sub = Subscriber.create ~pid ~exporter () in - let _t_tick : Thread.t = Thread.create tick_thread sub in - Subscriber.subscriber sub - -let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out () - : collector = - let sub = subscriber_ ~finally ~mode ~out () in - Trace_subscriber.collector sub - -let[@inline] subscriber ~out () : Trace_subscriber.t = - subscriber_ ~finally:ignore ~mode:`Single ~out () + let coll_st = Collector_tef.create ~pid ~exporter () in + let _t_tick : Thread.t = Thread.create tick_thread coll_st in + Collector_tef.collector coll_st let[@inline] collector ~out () : collector = collector_ ~finally:ignore ~mode:`Single ~out () @@ -111,29 +99,12 @@ let with_setup ?out () f = setup ?out (); Fun.protect ~finally:Trace_core.shutdown f -module Mock_ = struct - let now = ref 0 - - (* used to mock timing *) - let get_now_ns () : int64 = - let x = !now in - incr now; - Int64.(mul (of_int x) 1000L) - - let get_tid_ () : int = 3 -end - module Private_ = struct let mock_all_ () = - Trace_subscriber.Private_.mock := true; - Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns; - Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_; + Trace_util.Mock_.mock_all (); () - let on_tracing_error = Subscriber.on_tracing_error - - let subscriber_jsonl ~finally ~out () = - subscriber_ ~finally ~mode:`Jsonl ~out () + let on_tracing_error = Collector_tef.on_tracing_error let collector_jsonl ~finally ~out () : collector = collector_ ~finally ~mode:`Jsonl ~out () diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli index 9cf8dd1..d9cfb16 100644 --- a/src/tef/trace_tef.mli +++ b/src/tef/trace_tef.mli @@ -1,6 +1,7 @@ -module Subscriber = Subscriber +module Collector_tef = Collector_tef module Exporter = Exporter module Writer = Writer +module Types = Types type output = [ `Stdout @@ -14,10 +15,6 @@ type output = - [`File "foo"] will enable tracing and print events into file named "foo" *) -val subscriber : out:[< output ] -> unit -> Trace_subscriber.t -(** A subscriber emitting TEF traces into [out]. - @since 0.8 *) - val collector : out:[< output ] -> unit -> Trace_core.collector (** Make a collector that writes into the given output. See {!setup} for more details. *) @@ -48,12 +45,6 @@ module Private_ : sig val on_tracing_error : (string -> unit) ref - val subscriber_jsonl : - finally:(unit -> unit) -> - out:[ `File_append of string | `Output of out_channel ] -> - unit -> - Trace_subscriber.t - val collector_jsonl : finally:(unit -> unit) -> out:[ `File_append of string | `Output of out_channel ] -> diff --git a/src/tef/types.ml b/src/tef/types.ml new file mode 100644 index 0000000..535e6d4 --- /dev/null +++ b/src/tef/types.ml @@ -0,0 +1,20 @@ +open Trace_core +module Trace_id = Trace_util.Trace_id64 + +type trace_id = Trace_id.t + +type Trace_core.span += + | Span_tef_sync of { + pid: int; + tid: int; + name: string; + start_us: float; + mutable args: (string * Trace_core.user_data) list; + } + | Span_tef_async of { + pid: int; + tid: int; + name: string; + trace_id: trace_id; + mutable args: (string * Trace_core.user_data) list; + } diff --git a/src/tef/writer.ml b/src/tef/writer.ml index 85d9bbe..ea2b75b 100644 --- a/src/tef/writer.ml +++ b/src/tef/writer.ml @@ -48,23 +48,17 @@ let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit = (emit_args_o_ pp_user_data_) args -let emit_begin ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit = +let emit_begin_async ~pid ~tid ~name ~trace_id ~ts ~args buf = Printf.bprintf buf {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - pid id tid ts str_val name - (match flavor with - | `Async -> 'b' - | `Sync -> 'B') + pid trace_id tid ts str_val name 'b' (emit_args_o_ pp_user_data_) args -let emit_end ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit = +let emit_end_async ~pid ~tid ~name ~(trace_id : int64) ~ts ~args buf : unit = Printf.bprintf buf {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - pid id tid ts str_val name - (match flavor with - | `Async -> 'e' - | `Sync -> 'E') + pid trace_id tid ts str_val name 'e' (emit_args_o_ pp_user_data_) args diff --git a/src/tef/writer.mli b/src/tef/writer.mli index 465545c..bb53767 100644 --- a/src/tef/writer.mli +++ b/src/tef/writer.mli @@ -4,6 +4,7 @@ raw event data. *) open Common_ +open Types val emit_duration_event : pid:int -> @@ -15,25 +16,23 @@ val emit_duration_event : Buffer.t -> unit -val emit_begin : +val emit_begin_async : pid:int -> tid:int -> name:string -> - id:span_id -> + trace_id:trace_id -> ts:float -> args:(string * Trace_core.user_data) list -> - flavor:[ `Sync | `Async ] -> Buffer.t -> unit -val emit_end : +val emit_end_async : pid:int -> tid:int -> name:string -> - id:span_id -> + trace_id:trace_id -> ts:float -> args:(string * Trace_core.user_data) list -> - flavor:[ `Sync | `Async ] -> Buffer.t -> unit