mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-08 03:47:57 -04:00
port fuchsia to new subscriber
This commit is contained in:
parent
7389ca5b45
commit
2cfb3c67fa
3 changed files with 55 additions and 89 deletions
|
|
@ -1,28 +1,14 @@
|
||||||
open Common_
|
open Common_
|
||||||
open Trace_core
|
open Trace_core
|
||||||
module Span_tbl = Trace_subscriber.Span_tbl
|
|
||||||
|
|
||||||
let on_tracing_error = on_tracing_error
|
let on_tracing_error = on_tracing_error
|
||||||
|
|
||||||
type span_info = {
|
|
||||||
tid: int;
|
|
||||||
name: string;
|
|
||||||
start_ns: int64;
|
|
||||||
mutable data: (string * user_data) list;
|
|
||||||
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
|
||||||
that's running this (synchronous, stack-abiding) span. *)
|
|
||||||
}
|
|
||||||
(** Information we store about a span begin event, to emit a complete event when
|
|
||||||
we meet the corresponding span end event *)
|
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
active: bool A.t;
|
active: bool A.t;
|
||||||
pid: int;
|
pid: int;
|
||||||
spans: span_info Span_tbl.t;
|
|
||||||
buf_chain: Buf_chain.t;
|
buf_chain: Buf_chain.t;
|
||||||
exporter: Exporter.t;
|
exporter: Exporter.t;
|
||||||
span_gen: Sub.Span_generator.t;
|
span_gen: Sub.Span_generator.t;
|
||||||
trace_id_gen: Sub.Trace_id_8B_generator.t;
|
|
||||||
}
|
}
|
||||||
(** Subscriber state *)
|
(** Subscriber state *)
|
||||||
|
|
||||||
|
|
@ -32,6 +18,7 @@ open struct
|
||||||
if Buf_chain.has_ready self.buf_chain then
|
if Buf_chain.has_ready self.buf_chain then
|
||||||
Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs
|
Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs
|
||||||
|
|
||||||
|
(* TODO: nice to have, can we make it optional?
|
||||||
let print_non_closed_spans_warning spans =
|
let print_non_closed_spans_warning spans =
|
||||||
let module Str_set = Set.Make (String) in
|
let module Str_set = Set.Make (String) in
|
||||||
let spans = Span_tbl.to_list spans in
|
let spans = Span_tbl.to_list spans in
|
||||||
|
|
@ -49,15 +36,14 @@ open struct
|
||||||
names;
|
names;
|
||||||
flush stderr
|
flush stderr
|
||||||
)
|
)
|
||||||
|
*)
|
||||||
end
|
end
|
||||||
|
|
||||||
let close (self : t) : unit =
|
let close (self : t) : unit =
|
||||||
if A.exchange self.active false then (
|
if A.exchange self.active false then (
|
||||||
Buf_chain.ready_all_non_empty self.buf_chain;
|
Buf_chain.ready_all_non_empty self.buf_chain;
|
||||||
write_ready_ self;
|
write_ready_ self;
|
||||||
self.exporter.close ();
|
self.exporter.close () (* TODO: print_non_closed_spans_warning self.spans *)
|
||||||
|
|
||||||
print_non_closed_spans_warning self.spans
|
|
||||||
)
|
)
|
||||||
|
|
||||||
let[@inline] active self = A.get self.active
|
let[@inline] active self = A.get self.active
|
||||||
|
|
@ -74,20 +60,13 @@ let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||||
buf_chain;
|
buf_chain;
|
||||||
exporter;
|
exporter;
|
||||||
pid;
|
pid;
|
||||||
spans = Span_tbl.create ();
|
|
||||||
span_gen = Sub.Span_generator.create ();
|
span_gen = Sub.Span_generator.create ();
|
||||||
trace_id_gen = Sub.Trace_id_8B_generator.create ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module Callbacks = struct
|
open struct
|
||||||
type st = t
|
let new_span_id (self : t) = Sub.Span_generator.mk_span self.span_gen
|
||||||
|
|
||||||
let new_span (self : st) = Sub.Span_generator.mk_span self.span_gen
|
let on_init (self : t) ~time_ns:_ =
|
||||||
|
|
||||||
let new_trace_id self =
|
|
||||||
Sub.Trace_id_8B_generator.mk_trace_id self.trace_id_gen
|
|
||||||
|
|
||||||
let on_init (self : st) ~time_ns:_ =
|
|
||||||
Writer.Metadata.Magic_record.encode self.buf_chain;
|
Writer.Metadata.Magic_record.encode self.buf_chain;
|
||||||
Writer.Metadata.Initialization_record.(
|
Writer.Metadata.Initialization_record.(
|
||||||
encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
|
encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
|
||||||
|
|
@ -98,14 +77,14 @@ module Callbacks = struct
|
||||||
|
|
||||||
write_ready_ self
|
write_ready_ self
|
||||||
|
|
||||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
let on_shutdown (self : t) ~time_ns:_ = close self
|
||||||
|
|
||||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
let on_name_process_ (self : t) name : unit =
|
||||||
Writer.Kernel_object.(
|
Writer.Kernel_object.(
|
||||||
encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
|
encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
|
||||||
write_ready_ self
|
write_ready_ self
|
||||||
|
|
||||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
let on_name_thread_ (self : t) ~tid name : unit =
|
||||||
Writer.Kernel_object.(
|
Writer.Kernel_object.(
|
||||||
encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
|
encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
|
||||||
~args:[ "process", A_kid (Int64.of_int self.pid) ]
|
~args:[ "process", A_kid (Int64.of_int self.pid) ]
|
||||||
|
|
@ -118,43 +97,49 @@ module Callbacks = struct
|
||||||
| None -> data
|
| None -> data
|
||||||
| Some f -> ("function", `String f) :: data
|
| Some f -> ("function", `String f) :: data
|
||||||
|
|
||||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
let on_enter_span (self : t) (span : Sub.Span_sub.t) : unit =
|
||||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
let { Sub.Span_sub.data; name; tid; time_ns; flavor; _ } = span in
|
||||||
let data = add_fun_name_ fun_name data in
|
match flavor with
|
||||||
let info = { tid; name; start_ns = time_ns; data } in
|
| `Sync -> ()
|
||||||
(* save the span so we find it at exit *)
|
| `Async ->
|
||||||
Span_tbl.add self.spans span info
|
let data = add_fun_name_ span.__FUNCTION__ data in
|
||||||
|
Writer.(
|
||||||
|
Event.Async_begin.encode self.buf_chain ~name
|
||||||
|
~args:(args_of_user_data data)
|
||||||
|
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||||
|
~time_ns ~async_id:0L ());
|
||||||
|
write_ready_ self
|
||||||
|
|
||||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
let on_exit_span (self : t) ~time_ns:end_time_ns ~tid:_
|
||||||
match Span_tbl.find_exn self.spans span with
|
(span : Sub.Span_sub.t) : unit =
|
||||||
| exception Not_found ->
|
let { Sub.Span_sub.tid; name; flavor; time_ns = start_ns; data; _ } =
|
||||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
span
|
||||||
| { tid; name; start_ns; data } ->
|
in
|
||||||
Span_tbl.remove self.spans span;
|
let data = add_fun_name_ span.__FUNCTION__ data in
|
||||||
|
|
||||||
|
match flavor with
|
||||||
|
| `Sync ->
|
||||||
Writer.(
|
Writer.(
|
||||||
Event.Duration_complete.encode self.buf_chain ~name
|
Event.Duration_complete.encode self.buf_chain ~name
|
||||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||||
~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data)
|
~time_ns:start_ns ~end_time_ns ~args:(args_of_user_data data) ());
|
||||||
());
|
write_ready_ self
|
||||||
|
| `Async ->
|
||||||
|
Writer.(
|
||||||
|
Event.Async_end.encode self.buf_chain ~name
|
||||||
|
~args:(args_of_user_data data)
|
||||||
|
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||||
|
~time_ns:end_time_ns ~async_id:0L ());
|
||||||
write_ready_ self
|
write_ready_ self
|
||||||
|
|
||||||
let on_add_data (self : st) ~data span =
|
let on_message (self : t) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit =
|
||||||
if data <> [] then (
|
|
||||||
try
|
|
||||||
let info = Span_tbl.find_exn self.spans span in
|
|
||||||
info.data <- List.rev_append data info.data
|
|
||||||
with Not_found ->
|
|
||||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
|
||||||
)
|
|
||||||
|
|
||||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
|
||||||
Writer.(
|
Writer.(
|
||||||
Event.Instant.encode self.buf_chain
|
Event.Instant.encode self.buf_chain
|
||||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||||
~name:msg ~time_ns ~args:(args_of_user_data data) ());
|
~name:msg ~time_ns ~args:(args_of_user_data data) ());
|
||||||
write_ready_ self
|
write_ready_ self
|
||||||
|
|
||||||
let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit =
|
let on_counter (self : t) ~time_ns ~tid ~params:_ ~data ~name n : unit =
|
||||||
Writer.(
|
Writer.(
|
||||||
Event.Counter.encode self.buf_chain
|
Event.Counter.encode self.buf_chain
|
||||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||||
|
|
@ -163,25 +148,16 @@ module Callbacks = struct
|
||||||
());
|
());
|
||||||
write_ready_ self
|
write_ready_ self
|
||||||
|
|
||||||
let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
|
let on_extension_event (self : t) ~time_ns:_ ~tid ev =
|
||||||
~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit =
|
match ev with
|
||||||
Writer.(
|
| Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name
|
||||||
Event.Async_begin.encode self.buf_chain ~name
|
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
|
||||||
~args:(args_of_user_data data)
|
| _ -> ()
|
||||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
|
||||||
~time_ns ~async_id:trace_id ());
|
|
||||||
write_ready_ self
|
|
||||||
|
|
||||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_
|
|
||||||
~trace_id (_ : span) : unit =
|
|
||||||
Writer.(
|
|
||||||
Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data)
|
|
||||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
|
||||||
~time_ns ~async_id:trace_id ());
|
|
||||||
write_ready_ self
|
|
||||||
|
|
||||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
let sub_callbacks : _ Sub.Callbacks.t =
|
||||||
|
Sub.Callbacks.make ~new_span_id ~on_init ~on_shutdown ~on_enter_span
|
||||||
|
~on_exit_span ~on_message ~on_counter ~on_extension_event ()
|
||||||
|
|
||||||
let subscriber (self : t) : Sub.t =
|
let subscriber (self : t) : Sub.t =
|
||||||
Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
|
Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks }
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,7 @@ val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
|
||||||
val flush : t -> unit
|
val flush : t -> unit
|
||||||
val close : t -> unit
|
val close : t -> unit
|
||||||
val active : t -> bool
|
val active : t -> bool
|
||||||
|
val sub_callbacks : t Trace_subscriber.Callbacks.t
|
||||||
module Callbacks : Trace_subscriber.Callbacks.S with type st = t
|
|
||||||
|
|
||||||
val subscriber : t -> Trace_subscriber.t
|
val subscriber : t -> Trace_subscriber.t
|
||||||
(** Subscriber that writes json into this writer *)
|
(** Subscriber that writes json into this writer *)
|
||||||
|
|
|
||||||
|
|
@ -4,15 +4,6 @@
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
module Util = Util
|
module Util = Util
|
||||||
|
|
||||||
open struct
|
|
||||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
|
||||||
if id == Trace_core.Collector.dummy_trace_id then
|
|
||||||
0L
|
|
||||||
else
|
|
||||||
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
|
|
||||||
end
|
|
||||||
|
|
||||||
open Util
|
open Util
|
||||||
|
|
||||||
type user_data = Trace_core.user_data
|
type user_data = Trace_core.user_data
|
||||||
|
|
@ -491,7 +482,7 @@ module Event = struct
|
||||||
+ Arguments.size_word args + 1 (* async id *)
|
+ Arguments.size_word args + 1 (* async id *)
|
||||||
|
|
||||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||||
~(async_id : Trace_core.trace_id) ~args () : unit =
|
~(async_id : int64) ~args () : unit =
|
||||||
let name = truncate_string name in
|
let name = truncate_string name in
|
||||||
let size = size_word ~name ~t_ref ~args () in
|
let size = size_word ~name ~t_ref ~args () in
|
||||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||||
|
|
@ -516,7 +507,7 @@ module Event = struct
|
||||||
|
|
||||||
Buf.add_string buf name;
|
Buf.add_string buf name;
|
||||||
Arguments.encode buf args;
|
Arguments.encode buf args;
|
||||||
Buf.add_i64 buf (int64_of_trace_id_ async_id);
|
Buf.add_i64 buf async_id;
|
||||||
()
|
()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -527,7 +518,7 @@ module Event = struct
|
||||||
+ Arguments.size_word args + 1 (* async id *)
|
+ Arguments.size_word args + 1 (* async id *)
|
||||||
|
|
||||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||||
~(async_id : Trace_core.trace_id) ~args () : unit =
|
~(async_id : int64) ~args () : unit =
|
||||||
let name = truncate_string name in
|
let name = truncate_string name in
|
||||||
let size = size_word ~name ~t_ref ~args () in
|
let size = size_word ~name ~t_ref ~args () in
|
||||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||||
|
|
@ -552,7 +543,7 @@ module Event = struct
|
||||||
|
|
||||||
Buf.add_string buf name;
|
Buf.add_string buf name;
|
||||||
Arguments.encode buf args;
|
Arguments.encode buf args;
|
||||||
Buf.add_i64 buf (int64_of_trace_id_ async_id);
|
Buf.add_i64 buf async_id;
|
||||||
()
|
()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue