mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
trace-tef: use new subscriber, no more global state
This commit is contained in:
parent
a1837e402e
commit
7a392e54d1
5 changed files with 66 additions and 95 deletions
|
|
@ -2,3 +2,5 @@ module Sub = Trace_subscriber
|
||||||
module A = Trace_core.Internal_.Atomic_
|
module A = Trace_core.Internal_.Atomic_
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
|
type span_id = Sub.Span_sub.span_id
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
open Common_
|
open Common_
|
||||||
open Trace_core
|
open Trace_core
|
||||||
open Trace_private_util
|
open Trace_private_util
|
||||||
module Span_tbl = Sub.Span_tbl
|
module Span_sub = Sub.Span_sub
|
||||||
|
|
||||||
module Buf_pool = struct
|
module Buf_pool = struct
|
||||||
type t = Buffer.t Rpool.t
|
type t = Buffer.t Rpool.t
|
||||||
|
|
@ -15,16 +15,11 @@ end
|
||||||
open struct
|
open struct
|
||||||
let[@inline] time_us_of_time_ns (t : int64) : float =
|
let[@inline] time_us_of_time_ns (t : int64) : float =
|
||||||
Int64.div t 1_000L |> Int64.to_float
|
Int64.div t 1_000L |> Int64.to_float
|
||||||
|
|
||||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
|
||||||
if id == Trace_core.Collector.dummy_trace_id then
|
|
||||||
0L
|
|
||||||
else
|
|
||||||
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
|
|
||||||
end
|
end
|
||||||
|
|
||||||
let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s)
|
let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s)
|
||||||
|
|
||||||
|
(*
|
||||||
type span_info = {
|
type span_info = {
|
||||||
tid: int;
|
tid: int;
|
||||||
name: string;
|
name: string;
|
||||||
|
|
@ -35,18 +30,18 @@ that's running this (synchronous, stack-abiding) span. *)
|
||||||
}
|
}
|
||||||
(** Information we store about a span begin event, to emit a complete event when
|
(** Information we store about a span begin event, to emit a complete event when
|
||||||
we meet the corresponding span end event *)
|
we meet the corresponding span end event *)
|
||||||
|
*)
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
active: bool A.t;
|
active: bool A.t;
|
||||||
pid: int;
|
pid: int;
|
||||||
spans: span_info Span_tbl.t;
|
|
||||||
buf_pool: Buf_pool.t;
|
buf_pool: Buf_pool.t;
|
||||||
exporter: Exporter.t;
|
exporter: Exporter.t;
|
||||||
span_gen: Sub.Span_generator.t;
|
span_gen: Sub.Span_generator.t;
|
||||||
trace_id_gen: Sub.Trace_id_8B_generator.t;
|
|
||||||
}
|
}
|
||||||
(** Subscriber state *)
|
(** Subscriber state *)
|
||||||
|
|
||||||
|
(* TODO: this is nice to have, can we make it optional?
|
||||||
open struct
|
open struct
|
||||||
let print_non_closed_spans_warning spans =
|
let print_non_closed_spans_warning spans =
|
||||||
let module Str_set = Set.Make (String) in
|
let module Str_set = Set.Make (String) in
|
||||||
|
|
@ -67,12 +62,12 @@ open struct
|
||||||
flush stderr
|
flush stderr
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
*)
|
||||||
|
|
||||||
let close (self : t) : unit =
|
let close (self : t) : unit =
|
||||||
if A.exchange self.active false then (
|
if A.exchange self.active false then
|
||||||
print_non_closed_spans_warning self.spans;
|
(* FIXME: print_non_closed_spans_warning self.spans; *)
|
||||||
self.exporter.close ()
|
self.exporter.close ()
|
||||||
)
|
|
||||||
|
|
||||||
let[@inline] active self = A.get self.active
|
let[@inline] active self = A.get self.active
|
||||||
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
||||||
|
|
@ -83,108 +78,84 @@ let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||||
exporter;
|
exporter;
|
||||||
buf_pool;
|
buf_pool;
|
||||||
pid;
|
pid;
|
||||||
spans = Span_tbl.create ();
|
|
||||||
span_gen = Sub.Span_generator.create ();
|
span_gen = Sub.Span_generator.create ();
|
||||||
trace_id_gen = Sub.Trace_id_8B_generator.create ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module Callbacks = struct
|
open struct
|
||||||
type st = t
|
type st = t
|
||||||
|
|
||||||
let new_span (self : st) = Sub.Span_generator.mk_span self.span_gen
|
let new_span_id (self : st) = Sub.Span_generator.mk_span self.span_gen
|
||||||
|
|
||||||
let new_trace_id self =
|
|
||||||
Sub.Trace_id_8B_generator.mk_trace_id self.trace_id_gen
|
|
||||||
|
|
||||||
let on_init _ ~time_ns:_ = ()
|
let on_init _ ~time_ns:_ = ()
|
||||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
let on_shutdown (self : st) ~time_ns:_ = close self
|
||||||
|
|
||||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_name_process ~pid:self.pid ~name buf;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
(* add function name, if provided, to the metadata *)
|
(* add function name, if provided, to the metadata *)
|
||||||
let add_fun_name_ fun_name data : _ list =
|
let add_fun_name_ fun_name data : _ list =
|
||||||
match fun_name with
|
match fun_name with
|
||||||
| None -> data
|
| None -> data
|
||||||
| Some f -> ("function", `String f) :: data
|
| Some f -> ("function", `String f) :: data
|
||||||
|
|
||||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
let on_enter_span (self : st) (span : Span_sub.t) : unit =
|
||||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
match span.flavor with
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
| `Async ->
|
||||||
let data = add_fun_name_ fun_name data in
|
let { Span_sub.time_ns; data; tid; name; _ } = span in
|
||||||
let info = { tid; name; start_us = time_us; data } in
|
let time_us = time_us_of_time_ns time_ns in
|
||||||
(* save the span so we find it at exit *)
|
let data = add_fun_name_ span.__FUNCTION__ data in
|
||||||
Span_tbl.add self.spans span info
|
|
||||||
|
|
||||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
|
||||||
|
|
||||||
match Span_tbl.find_exn self.spans span with
|
|
||||||
| exception Not_found ->
|
|
||||||
!on_tracing_error
|
|
||||||
(Printf.sprintf "trace-tef: error: cannot find span %Ld" span)
|
|
||||||
| { tid; name; start_us; data } ->
|
|
||||||
Span_tbl.remove self.spans span;
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us
|
Writer.emit_begin buf ~pid:self.pid ~tid ~name ~id:42L ~ts:time_us
|
||||||
~end_:time_us ~args:data;
|
~args:data ~flavor:span.flavor;
|
||||||
|
|
||||||
self.exporter.on_json buf
|
self.exporter.on_json buf
|
||||||
|
| `Sync -> () (* done at exit *)
|
||||||
|
|
||||||
let on_add_data (self : st) ~data span =
|
let on_exit_span (self : st) ~time_ns:exit_time_ns ~tid:_ span : unit =
|
||||||
if data <> [] then (
|
let { Span_sub.data; tid; flavor; name; _ } = span in
|
||||||
try
|
let exit_time_us = time_us_of_time_ns exit_time_ns in
|
||||||
let info = Span_tbl.find_exn self.spans span in
|
|
||||||
info.data <- List.rev_append data info.data
|
|
||||||
with Not_found ->
|
|
||||||
!on_tracing_error
|
|
||||||
(Printf.sprintf "trace-tef: error: cannot find span %Ld" span)
|
|
||||||
)
|
|
||||||
|
|
||||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
|
(match flavor with
|
||||||
|
| `Sync ->
|
||||||
|
(* emit full event *)
|
||||||
|
let start_time_us = time_us_of_time_ns span.time_ns in
|
||||||
|
Writer.emit_duration_event buf ~pid:self.pid ~tid ~name
|
||||||
|
~start:start_time_us ~end_:exit_time_us ~args:data
|
||||||
|
| `Async ->
|
||||||
|
Writer.emit_end buf ~pid:self.pid ~tid ~name ~id:42L ~ts:exit_time_us
|
||||||
|
~flavor ~args:data);
|
||||||
|
|
||||||
|
self.exporter.on_json buf
|
||||||
|
|
||||||
|
let on_message (self : st) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit =
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
let time_us = time_us_of_time_ns @@ time_ns in
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
||||||
~args:data;
|
~args:data;
|
||||||
self.exporter.on_json buf
|
self.exporter.on_json buf
|
||||||
|
|
||||||
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit =
|
let on_counter (self : st) ~time_ns ~tid ~params:_ ~data:_ ~name n : unit =
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
let time_us = time_us_of_time_ns @@ time_ns in
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
||||||
self.exporter.on_json buf
|
self.exporter.on_json buf
|
||||||
|
|
||||||
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
let on_name_thread_ (self : st) ~tid name : unit =
|
||||||
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span :
|
|
||||||
unit =
|
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
|
||||||
|
|
||||||
let data = add_fun_name_ fun_name data in
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name
|
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
||||||
~id:(int64_of_trace_id_ trace_id)
|
|
||||||
~ts:time_us ~args:data ~flavor;
|
|
||||||
self.exporter.on_json buf
|
self.exporter.on_json buf
|
||||||
|
|
||||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
let on_name_process_ (self : st) name : unit =
|
||||||
~trace_id (_ : span) : unit =
|
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
|
||||||
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
let@ buf = Rpool.with_ self.buf_pool in
|
||||||
Writer.emit_manual_end buf ~pid:self.pid ~tid ~name
|
Writer.emit_name_process ~pid:self.pid ~name buf;
|
||||||
~id:(int64_of_trace_id_ trace_id)
|
|
||||||
~ts:time_us ~flavor ~args:data;
|
|
||||||
self.exporter.on_json buf
|
self.exporter.on_json buf
|
||||||
|
|
||||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
let on_extension_event (self : st) ~time_ns:_ ~tid ev =
|
||||||
|
match ev with
|
||||||
|
| Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name
|
||||||
|
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
|
||||||
|
| _ -> ()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
let sub_callbacks : _ Sub.Callbacks.t =
|
||||||
|
Sub.Callbacks.make ~new_span_id ~on_init ~on_shutdown ~on_enter_span
|
||||||
|
~on_exit_span ~on_message ~on_counter ~on_extension_event ()
|
||||||
|
|
||||||
let subscriber (self : t) : Sub.t =
|
let subscriber (self : t) : Sub.t =
|
||||||
Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
|
Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks }
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,8 @@ val flush : t -> unit
|
||||||
val close : t -> unit
|
val close : t -> unit
|
||||||
val active : t -> bool
|
val active : t -> bool
|
||||||
|
|
||||||
module Callbacks : Sub.Callbacks.S with type st = t
|
val sub_callbacks : t Sub.Callbacks.t
|
||||||
|
(** Callbacks used for the subscriber *)
|
||||||
|
|
||||||
val subscriber : t -> Sub.t
|
val subscriber : t -> Sub.t
|
||||||
(** Subscriber that writes json into this writer *)
|
(** Subscriber that writes json into this writer *)
|
||||||
|
|
|
||||||
|
|
@ -50,25 +50,23 @@ let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit =
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
let emit_manual_begin ~pid ~tid ~name ~(id : int64) ~ts ~args
|
let emit_begin ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit =
|
||||||
~(flavor : Trace_core.span_flavor option) buf : unit =
|
|
||||||
Printf.bprintf buf
|
Printf.bprintf buf
|
||||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||||
pid id tid ts str_val name
|
pid id tid ts str_val name
|
||||||
(match flavor with
|
(match flavor with
|
||||||
| None | Some `Async -> 'b'
|
| `Async -> 'b'
|
||||||
| Some `Sync -> 'B')
|
| `Sync -> 'B')
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
let emit_manual_end ~pid ~tid ~name ~(id : int64) ~ts
|
let emit_end ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit =
|
||||||
~(flavor : Trace_core.span_flavor option) ~args buf : unit =
|
|
||||||
Printf.bprintf buf
|
Printf.bprintf buf
|
||||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||||
pid id tid ts str_val name
|
pid id tid ts str_val name
|
||||||
(match flavor with
|
(match flavor with
|
||||||
| None | Some `Async -> 'e'
|
| `Async -> 'e'
|
||||||
| Some `Sync -> 'E')
|
| `Sync -> 'E')
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
raw event data. *)
|
raw event data. *)
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
open Trace_core
|
|
||||||
|
|
||||||
val emit_duration_event :
|
val emit_duration_event :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
|
|
@ -16,25 +15,25 @@ val emit_duration_event :
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
val emit_manual_begin :
|
val emit_begin :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
tid:int ->
|
tid:int ->
|
||||||
name:string ->
|
name:string ->
|
||||||
id:span ->
|
id:span_id ->
|
||||||
ts:float ->
|
ts:float ->
|
||||||
args:(string * Trace_core.user_data) list ->
|
args:(string * Trace_core.user_data) list ->
|
||||||
flavor:Trace_core.span_flavor option ->
|
flavor:[ `Sync | `Async ] ->
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
val emit_manual_end :
|
val emit_end :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
tid:int ->
|
tid:int ->
|
||||||
name:string ->
|
name:string ->
|
||||||
id:span ->
|
id:span_id ->
|
||||||
ts:float ->
|
ts:float ->
|
||||||
flavor:Trace_core.span_flavor option ->
|
|
||||||
args:(string * Trace_core.user_data) list ->
|
args:(string * Trace_core.user_data) list ->
|
||||||
|
flavor:[ `Sync | `Async ] ->
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue