diff --git a/src/fuchsia/subscriber.ml b/src/fuchsia/subscriber.ml index 768ae46..94b64e9 100644 --- a/src/fuchsia/subscriber.ml +++ b/src/fuchsia/subscriber.ml @@ -1,28 +1,14 @@ open Common_ open Trace_core -module Span_tbl = Trace_subscriber.Span_tbl let on_tracing_error = on_tracing_error -type span_info = { - tid: int; - name: string; - start_ns: int64; - mutable data: (string * user_data) list; - (* NOTE: thread safety: this is supposed to only be modified by the thread - that's running this (synchronous, stack-abiding) span. *) -} -(** Information we store about a span begin event, to emit a complete event when - we meet the corresponding span end event *) - type t = { active: bool A.t; pid: int; - spans: span_info Span_tbl.t; buf_chain: Buf_chain.t; exporter: Exporter.t; span_gen: Sub.Span_generator.t; - trace_id_gen: Sub.Trace_id_8B_generator.t; } (** Subscriber state *) @@ -32,6 +18,7 @@ open struct if Buf_chain.has_ready self.buf_chain then Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs + (* TODO: nice to have, can we make it optional? let print_non_closed_spans_warning spans = let module Str_set = Set.Make (String) in let spans = Span_tbl.to_list spans in @@ -49,15 +36,14 @@ open struct names; flush stderr ) + *) end let close (self : t) : unit = if A.exchange self.active false then ( Buf_chain.ready_all_non_empty self.buf_chain; write_ready_ self; - self.exporter.close (); - - print_non_closed_spans_warning self.spans + self.exporter.close () (* TODO: print_non_closed_spans_warning self.spans *) ) let[@inline] active self = A.get self.active @@ -74,20 +60,13 @@ let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = buf_chain; exporter; pid; - spans = Span_tbl.create (); span_gen = Sub.Span_generator.create (); - trace_id_gen = Sub.Trace_id_8B_generator.create (); } -module Callbacks = struct - type st = t +open struct + let new_span_id (self : t) = Sub.Span_generator.mk_span self.span_gen - let new_span (self : st) = Sub.Span_generator.mk_span self.span_gen - - let new_trace_id self = - Sub.Trace_id_8B_generator.mk_trace_id self.trace_id_gen - - let on_init (self : st) ~time_ns:_ = + let on_init (self : t) ~time_ns:_ = Writer.Metadata.Magic_record.encode self.buf_chain; Writer.Metadata.Initialization_record.( encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ()); @@ -98,14 +77,14 @@ module Callbacks = struct write_ready_ self - let on_shutdown (self : st) ~time_ns:_ = close self + let on_shutdown (self : t) ~time_ns:_ = close self - let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit = + let on_name_process_ (self : t) name : unit = Writer.Kernel_object.( encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ()); write_ready_ self - let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit = + let on_name_thread_ (self : t) ~tid name : unit = Writer.Kernel_object.( encode self.buf_chain ~name ~ty:ty_thread ~kid:tid ~args:[ "process", A_kid (Int64.of_int self.pid) ] @@ -118,43 +97,49 @@ module Callbacks = struct | None -> data | Some f -> ("function", `String f) :: data - let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ - ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = - let data = add_fun_name_ fun_name data in - let info = { tid; name; start_ns = time_ns; data } in - (* save the span so we find it at exit *) - Span_tbl.add self.spans span info + let on_enter_span (self : t) (span : Sub.Span_sub.t) : unit = + let { Sub.Span_sub.data; name; tid; time_ns; flavor; _ } = span in + match flavor with + | `Sync -> () + | `Async -> + let data = add_fun_name_ span.__FUNCTION__ data in + Writer.( + Event.Async_begin.encode self.buf_chain ~name + ~args:(args_of_user_data data) + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~time_ns ~async_id:0L ()); + write_ready_ self - let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = - match Span_tbl.find_exn self.spans span with - | exception Not_found -> - !on_tracing_error (Printf.sprintf "cannot find span %Ld" span) - | { tid; name; start_ns; data } -> - Span_tbl.remove self.spans span; + let on_exit_span (self : t) ~time_ns:end_time_ns ~tid:_ + (span : Sub.Span_sub.t) : unit = + let { Sub.Span_sub.tid; name; flavor; time_ns = start_ns; data; _ } = + span + in + let data = add_fun_name_ span.__FUNCTION__ data in + + match flavor with + | `Sync -> Writer.( Event.Duration_complete.encode self.buf_chain ~name ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) - ~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data) - ()); + ~time_ns:start_ns ~end_time_ns ~args:(args_of_user_data data) ()); + write_ready_ self + | `Async -> + Writer.( + Event.Async_end.encode self.buf_chain ~name + ~args:(args_of_user_data data) + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~time_ns:end_time_ns ~async_id:0L ()); write_ready_ self - let on_add_data (self : st) ~data span = - if data <> [] then ( - try - let info = Span_tbl.find_exn self.spans span in - info.data <- List.rev_append data info.data - with Not_found -> - !on_tracing_error (Printf.sprintf "cannot find span %Ld" span) - ) - - let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = + let on_message (self : t) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit = Writer.( Event.Instant.encode self.buf_chain ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) ~name:msg ~time_ns ~args:(args_of_user_data data) ()); write_ready_ self - let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit = + let on_counter (self : t) ~time_ns ~tid ~params:_ ~data ~name n : unit = Writer.( Event.Counter.encode self.buf_chain ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) @@ -163,25 +148,16 @@ module Callbacks = struct ()); write_ready_ self - let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ - ~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit = - Writer.( - Event.Async_begin.encode self.buf_chain ~name - ~args:(args_of_user_data data) - ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) - ~time_ns ~async_id:trace_id ()); - write_ready_ self - - let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_ - ~trace_id (_ : span) : unit = - Writer.( - Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data) - ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) - ~time_ns ~async_id:trace_id ()); - write_ready_ self - - let on_extension_event _ ~time_ns:_ ~tid:_ _ev = () + let on_extension_event (self : t) ~time_ns:_ ~tid ev = + match ev with + | Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name + | Core_ext.Extension_set_process_name name -> on_name_process_ self name + | _ -> () end +let sub_callbacks : _ Sub.Callbacks.t = + Sub.Callbacks.make ~new_span_id ~on_init ~on_shutdown ~on_enter_span + ~on_exit_span ~on_message ~on_counter ~on_extension_event () + let subscriber (self : t) : Sub.t = - Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) } + Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks } diff --git a/src/fuchsia/subscriber.mli b/src/fuchsia/subscriber.mli index 66318f5..8eaf791 100644 --- a/src/fuchsia/subscriber.mli +++ b/src/fuchsia/subscriber.mli @@ -7,8 +7,7 @@ val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t val flush : t -> unit val close : t -> unit val active : t -> bool - -module Callbacks : Trace_subscriber.Callbacks.S with type st = t +val sub_callbacks : t Trace_subscriber.Callbacks.t val subscriber : t -> Trace_subscriber.t (** Subscriber that writes json into this writer *) diff --git a/src/fuchsia/writer.ml b/src/fuchsia/writer.ml index 972d9eb..ffc363c 100644 --- a/src/fuchsia/writer.ml +++ b/src/fuchsia/writer.ml @@ -4,15 +4,6 @@ open Common_ module Util = Util - -open struct - let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = - if id == Trace_core.Collector.dummy_trace_id then - 0L - else - Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 -end - open Util type user_data = Trace_core.user_data @@ -491,7 +482,7 @@ module Event = struct + Arguments.size_word args + 1 (* async id *) let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns - ~(async_id : Trace_core.trace_id) ~args () : unit = + ~(async_id : int64) ~args () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in let@ buf = Buf_chain.with_buf bufs ~available_word:size in @@ -516,7 +507,7 @@ module Event = struct Buf.add_string buf name; Arguments.encode buf args; - Buf.add_i64 buf (int64_of_trace_id_ async_id); + Buf.add_i64 buf async_id; () end @@ -527,7 +518,7 @@ module Event = struct + Arguments.size_word args + 1 (* async id *) let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns - ~(async_id : Trace_core.trace_id) ~args () : unit = + ~(async_id : int64) ~args () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in let@ buf = Buf_chain.with_buf bufs ~available_word:size in @@ -552,7 +543,7 @@ module Event = struct Buf.add_string buf name; Arguments.encode buf args; - Buf.add_i64 buf (int64_of_trace_id_ async_id); + Buf.add_i64 buf async_id; () end end