diff --git a/src/tef/common_.ml b/src/tef/common_.ml index 114d81c..2148902 100644 --- a/src/tef/common_.ml +++ b/src/tef/common_.ml @@ -2,3 +2,5 @@ module Sub = Trace_subscriber module A = Trace_core.Internal_.Atomic_ let ( let@ ) = ( @@ ) + +type span_id = Sub.Span_sub.span_id diff --git a/src/tef/subscriber.ml b/src/tef/subscriber.ml index b90a6a3..d3d99c6 100644 --- a/src/tef/subscriber.ml +++ b/src/tef/subscriber.ml @@ -1,7 +1,7 @@ open Common_ open Trace_core open Trace_private_util -module Span_tbl = Sub.Span_tbl +module Span_sub = Sub.Span_sub module Buf_pool = struct type t = Buffer.t Rpool.t @@ -15,16 +15,11 @@ end open struct let[@inline] time_us_of_time_ns (t : int64) : float = Int64.div t 1_000L |> Int64.to_float - - let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = - if id == Trace_core.Collector.dummy_trace_id then - 0L - else - Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 end let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s) +(* type span_info = { tid: int; name: string; @@ -35,18 +30,18 @@ that's running this (synchronous, stack-abiding) span. *) } (** Information we store about a span begin event, to emit a complete event when we meet the corresponding span end event *) +*) type t = { active: bool A.t; pid: int; - spans: span_info Span_tbl.t; buf_pool: Buf_pool.t; exporter: Exporter.t; span_gen: Sub.Span_generator.t; - trace_id_gen: Sub.Trace_id_8B_generator.t; } (** Subscriber state *) +(* TODO: this is nice to have, can we make it optional? open struct let print_non_closed_spans_warning spans = let module Str_set = Set.Make (String) in @@ -67,12 +62,12 @@ open struct flush stderr ) end +*) let close (self : t) : unit = - if A.exchange self.active false then ( - print_non_closed_spans_warning self.spans; + if A.exchange self.active false then + (* FIXME: print_non_closed_spans_warning self.spans; *) self.exporter.close () - ) let[@inline] active self = A.get self.active let[@inline] flush (self : t) : unit = self.exporter.flush () @@ -83,108 +78,84 @@ let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = exporter; buf_pool; pid; - spans = Span_tbl.create (); span_gen = Sub.Span_generator.create (); - trace_id_gen = Sub.Trace_id_8B_generator.create (); } -module Callbacks = struct +open struct type st = t - let new_span (self : st) = Sub.Span_generator.mk_span self.span_gen - - let new_trace_id self = - Sub.Trace_id_8B_generator.mk_trace_id self.trace_id_gen - + let new_span_id (self : st) = Sub.Span_generator.mk_span self.span_gen let on_init _ ~time_ns:_ = () let on_shutdown (self : st) ~time_ns:_ = close self - let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit = - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_name_process ~pid:self.pid ~name buf; - self.exporter.on_json buf - - let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit = - let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_name_thread buf ~pid:self.pid ~tid ~name; - self.exporter.on_json buf - (* add function name, if provided, to the metadata *) let add_fun_name_ fun_name data : _ list = match fun_name with | None -> data | Some f -> ("function", `String f) :: data - let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ - ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = - let time_us = time_us_of_time_ns @@ time_ns in - let data = add_fun_name_ fun_name data in - let info = { tid; name; start_us = time_us; data } in - (* save the span so we find it at exit *) - Span_tbl.add self.spans span info - - let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = - let time_us = time_us_of_time_ns @@ time_ns in - - match Span_tbl.find_exn self.spans span with - | exception Not_found -> - !on_tracing_error - (Printf.sprintf "trace-tef: error: cannot find span %Ld" span) - | { tid; name; start_us; data } -> - Span_tbl.remove self.spans span; + let on_enter_span (self : st) (span : Span_sub.t) : unit = + match span.flavor with + | `Async -> + let { Span_sub.time_ns; data; tid; name; _ } = span in + let time_us = time_us_of_time_ns time_ns in + let data = add_fun_name_ span.__FUNCTION__ data in let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us - ~end_:time_us ~args:data; - + Writer.emit_begin buf ~pid:self.pid ~tid ~name ~id:42L ~ts:time_us + ~args:data ~flavor:span.flavor; self.exporter.on_json buf + | `Sync -> () (* done at exit *) - let on_add_data (self : st) ~data span = - if data <> [] then ( - try - let info = Span_tbl.find_exn self.spans span in - info.data <- List.rev_append data info.data - with Not_found -> - !on_tracing_error - (Printf.sprintf "trace-tef: error: cannot find span %Ld" span) - ) + let on_exit_span (self : st) ~time_ns:exit_time_ns ~tid:_ span : unit = + let { Span_sub.data; tid; flavor; name; _ } = span in + let exit_time_us = time_us_of_time_ns exit_time_ns in - let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = + let@ buf = Rpool.with_ self.buf_pool in + (match flavor with + | `Sync -> + (* emit full event *) + let start_time_us = time_us_of_time_ns span.time_ns in + Writer.emit_duration_event buf ~pid:self.pid ~tid ~name + ~start:start_time_us ~end_:exit_time_us ~args:data + | `Async -> + Writer.emit_end buf ~pid:self.pid ~tid ~name ~id:42L ~ts:exit_time_us + ~flavor ~args:data); + + self.exporter.on_json buf + + let on_message (self : st) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit = let time_us = time_us_of_time_ns @@ time_ns in let@ buf = Rpool.with_ self.buf_pool in Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us ~args:data; self.exporter.on_json buf - let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit = + let on_counter (self : st) ~time_ns ~tid ~params:_ ~data:_ ~name n : unit = let time_us = time_us_of_time_ns @@ time_ns in let@ buf = Rpool.with_ self.buf_pool in Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n; self.exporter.on_json buf - let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ - ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span : - unit = - let time_us = time_us_of_time_ns @@ time_ns in - - let data = add_fun_name_ fun_name data in + let on_name_thread_ (self : st) ~tid name : unit = let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name - ~id:(int64_of_trace_id_ trace_id) - ~ts:time_us ~args:data ~flavor; + Writer.emit_name_thread buf ~pid:self.pid ~tid ~name; self.exporter.on_json buf - let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor - ~trace_id (_ : span) : unit = - let time_us = time_us_of_time_ns @@ time_ns in - + let on_name_process_ (self : st) name : unit = let@ buf = Rpool.with_ self.buf_pool in - Writer.emit_manual_end buf ~pid:self.pid ~tid ~name - ~id:(int64_of_trace_id_ trace_id) - ~ts:time_us ~flavor ~args:data; + Writer.emit_name_process ~pid:self.pid ~name buf; self.exporter.on_json buf - let on_extension_event _ ~time_ns:_ ~tid:_ _ev = () + let on_extension_event (self : st) ~time_ns:_ ~tid ev = + match ev with + | Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name + | Core_ext.Extension_set_process_name name -> on_name_process_ self name + | _ -> () end +let sub_callbacks : _ Sub.Callbacks.t = + Sub.Callbacks.make ~new_span_id ~on_init ~on_shutdown ~on_enter_span + ~on_exit_span ~on_message ~on_counter ~on_extension_event () + let subscriber (self : t) : Sub.t = - Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) } + Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks } diff --git a/src/tef/subscriber.mli b/src/tef/subscriber.mli index 9f2f235..1cc1af3 100644 --- a/src/tef/subscriber.mli +++ b/src/tef/subscriber.mli @@ -16,7 +16,8 @@ val flush : t -> unit val close : t -> unit val active : t -> bool -module Callbacks : Sub.Callbacks.S with type st = t +val sub_callbacks : t Sub.Callbacks.t +(** Callbacks used for the subscriber *) val subscriber : t -> Sub.t (** Subscriber that writes json into this writer *) diff --git a/src/tef/writer.ml b/src/tef/writer.ml index 26286c3..e84b4d8 100644 --- a/src/tef/writer.ml +++ b/src/tef/writer.ml @@ -50,25 +50,23 @@ let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit = (emit_args_o_ pp_user_data_) args -let emit_manual_begin ~pid ~tid ~name ~(id : int64) ~ts ~args - ~(flavor : Trace_core.span_flavor option) buf : unit = +let emit_begin ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit = Printf.bprintf buf {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} pid id tid ts str_val name (match flavor with - | None | Some `Async -> 'b' - | Some `Sync -> 'B') + | `Async -> 'b' + | `Sync -> 'B') (emit_args_o_ pp_user_data_) args -let emit_manual_end ~pid ~tid ~name ~(id : int64) ~ts - ~(flavor : Trace_core.span_flavor option) ~args buf : unit = +let emit_end ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit = Printf.bprintf buf {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} pid id tid ts str_val name (match flavor with - | None | Some `Async -> 'e' - | Some `Sync -> 'E') + | `Async -> 'e' + | `Sync -> 'E') (emit_args_o_ pp_user_data_) args diff --git a/src/tef/writer.mli b/src/tef/writer.mli index 536b602..465545c 100644 --- a/src/tef/writer.mli +++ b/src/tef/writer.mli @@ -4,7 +4,6 @@ raw event data. *) open Common_ -open Trace_core val emit_duration_event : pid:int -> @@ -16,25 +15,25 @@ val emit_duration_event : Buffer.t -> unit -val emit_manual_begin : +val emit_begin : pid:int -> tid:int -> name:string -> - id:span -> + id:span_id -> ts:float -> args:(string * Trace_core.user_data) list -> - flavor:Trace_core.span_flavor option -> + flavor:[ `Sync | `Async ] -> Buffer.t -> unit -val emit_manual_end : +val emit_end : pid:int -> tid:int -> name:string -> - id:span -> + id:span_id -> ts:float -> - flavor:Trace_core.span_flavor option -> args:(string * Trace_core.user_data) list -> + flavor:[ `Sync | `Async ] -> Buffer.t -> unit