mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-08 03:47:57 -04:00
rewrite trace-tef so it returns the new collector
This commit is contained in:
parent
7b0197e6c2
commit
dc37f68993
13 changed files with 269 additions and 262 deletions
195
src/tef/collector_tef.ml
Normal file
195
src/tef/collector_tef.ml
Normal file
|
|
@ -0,0 +1,195 @@
|
||||||
|
open Common_
|
||||||
|
open Types
|
||||||
|
open Trace_core
|
||||||
|
|
||||||
|
module Buf_pool = struct
|
||||||
|
type t = Buffer.t Trace_util.Rpool.t
|
||||||
|
|
||||||
|
let create ?(max_size = 32) ?(buf_size = 256) () : t =
|
||||||
|
Trace_util.Rpool.create ~max_size ~clear:Buffer.reset
|
||||||
|
~create:(fun () -> Buffer.create buf_size)
|
||||||
|
()
|
||||||
|
end
|
||||||
|
|
||||||
|
open struct
|
||||||
|
let[@inline] time_us_of_time_ns (t : int64) : float =
|
||||||
|
Int64.div t 1_000L |> Int64.to_float
|
||||||
|
end
|
||||||
|
|
||||||
|
let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s)
|
||||||
|
|
||||||
|
(*
|
||||||
|
type span_info = {
|
||||||
|
tid: int;
|
||||||
|
name: string;
|
||||||
|
start_us: float;
|
||||||
|
mutable data: (string * user_data) list;
|
||||||
|
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
||||||
|
that's running this (synchronous, stack-abiding) span. *)
|
||||||
|
}
|
||||||
|
(** Information we store about a span begin event, to emit a complete event when
|
||||||
|
we meet the corresponding span end event *)
|
||||||
|
*)
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
active: bool A.t;
|
||||||
|
pid: int;
|
||||||
|
buf_pool: Buf_pool.t;
|
||||||
|
exporter: Exporter.t;
|
||||||
|
span_id_gen: Trace_util.Span_id64.Gen.t;
|
||||||
|
trace_id_gen: Trace_util.Trace_id64.Gen.t;
|
||||||
|
}
|
||||||
|
(** Subscriber state *)
|
||||||
|
|
||||||
|
(* TODO: this is nice to have, can we make it optional?
|
||||||
|
open struct
|
||||||
|
let print_non_closed_spans_warning spans =
|
||||||
|
let module Str_set = Set.Make (String) in
|
||||||
|
let spans = Span_tbl.to_list spans in
|
||||||
|
if spans <> [] then (
|
||||||
|
!on_tracing_error
|
||||||
|
@@ Printf.sprintf "trace-tef: warning: %d spans were not closed"
|
||||||
|
(List.length spans);
|
||||||
|
let names =
|
||||||
|
List.fold_left
|
||||||
|
(fun set (_, span) -> Str_set.add span.name set)
|
||||||
|
Str_set.empty spans
|
||||||
|
in
|
||||||
|
Str_set.iter
|
||||||
|
(fun name ->
|
||||||
|
!on_tracing_error @@ Printf.sprintf " span %S was not closed" name)
|
||||||
|
names;
|
||||||
|
flush stderr
|
||||||
|
)
|
||||||
|
end
|
||||||
|
*)
|
||||||
|
|
||||||
|
let close (self : t) : unit =
|
||||||
|
if A.exchange self.active false then
|
||||||
|
(* FIXME: print_non_closed_spans_warning self.spans; *)
|
||||||
|
self.exporter.close ()
|
||||||
|
|
||||||
|
let[@inline] active self = A.get self.active
|
||||||
|
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
||||||
|
|
||||||
|
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||||
|
{
|
||||||
|
active = A.make true;
|
||||||
|
exporter;
|
||||||
|
buf_pool;
|
||||||
|
pid;
|
||||||
|
span_id_gen = Trace_util.Span_id64.Gen.create ();
|
||||||
|
trace_id_gen = Trace_util.Trace_id64.Gen.create ();
|
||||||
|
}
|
||||||
|
|
||||||
|
open struct
|
||||||
|
type st = t
|
||||||
|
|
||||||
|
let rec flavor_of_params = function
|
||||||
|
| [] -> `Sync
|
||||||
|
| Core_ext.Extension_span_flavor f :: _ -> f
|
||||||
|
| _ :: tl -> flavor_of_params tl
|
||||||
|
|
||||||
|
let new_span_id (self : st) = Trace_util.Span_id64.Gen.gen self.span_id_gen
|
||||||
|
let new_trace_id (self : st) = Trace_util.Trace_id64.Gen.gen self.trace_id_gen
|
||||||
|
let init _ = ()
|
||||||
|
let shutdown (self : st) = close self
|
||||||
|
|
||||||
|
(* add function name, if provided, to the metadata *)
|
||||||
|
let add_fun_name_ fun_name data : _ list =
|
||||||
|
match fun_name with
|
||||||
|
| None -> data
|
||||||
|
| Some f -> ("function", `String f) :: data
|
||||||
|
|
||||||
|
let enter_span (self : st) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~params ~data
|
||||||
|
~parent name : span =
|
||||||
|
let start_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
|
||||||
|
let flavor = flavor_of_params params in
|
||||||
|
|
||||||
|
let pid = self.pid in
|
||||||
|
let tid = Trace_util.Mock_.get_tid () in
|
||||||
|
match flavor with
|
||||||
|
| `Sync -> Span_tef_sync { name; pid; tid; args = data; start_us }
|
||||||
|
| `Async ->
|
||||||
|
let trace_id =
|
||||||
|
match parent with
|
||||||
|
| P_some (Span_tef_async sp) -> sp.trace_id
|
||||||
|
| _ -> new_trace_id self
|
||||||
|
in
|
||||||
|
|
||||||
|
(let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
Writer.emit_begin_async buf ~name ~pid ~tid ~trace_id ~ts:start_us
|
||||||
|
~args:data;
|
||||||
|
self.exporter.on_json buf);
|
||||||
|
|
||||||
|
Span_tef_async { pid; tid; trace_id; name; args = data }
|
||||||
|
|
||||||
|
let exit_span (self : st) sp =
|
||||||
|
let end_time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
|
||||||
|
|
||||||
|
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
let did_write =
|
||||||
|
match sp with
|
||||||
|
| Span_tef_sync { name; pid; tid; args; start_us } ->
|
||||||
|
(* emit full event *)
|
||||||
|
Writer.emit_duration_event buf ~pid ~tid ~name ~start:start_us
|
||||||
|
~end_:end_time_us ~args;
|
||||||
|
true
|
||||||
|
| Span_tef_async { name; trace_id; pid; tid; args } ->
|
||||||
|
Writer.emit_end_async buf ~pid ~tid ~name ~trace_id ~ts:end_time_us
|
||||||
|
~args;
|
||||||
|
true
|
||||||
|
| _ -> false
|
||||||
|
in
|
||||||
|
|
||||||
|
if did_write then self.exporter.on_json buf
|
||||||
|
|
||||||
|
let message (self : st) ~params:_ ~data ~span:_ msg : unit =
|
||||||
|
let tid = Trace_util.Mock_.get_tid () in
|
||||||
|
let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
|
||||||
|
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
||||||
|
~args:data;
|
||||||
|
self.exporter.on_json buf
|
||||||
|
|
||||||
|
let counter_float (self : st) ~params:_ ~data:_ name n : unit =
|
||||||
|
let tid = Trace_util.Mock_.get_tid () in
|
||||||
|
let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
|
||||||
|
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
||||||
|
self.exporter.on_json buf
|
||||||
|
|
||||||
|
let counter_int (self : st) ~params ~data name n : unit =
|
||||||
|
counter_float self ~params ~data name (float_of_int n)
|
||||||
|
|
||||||
|
let add_data_to_span _st sp data =
|
||||||
|
match sp with
|
||||||
|
| Span_tef_sync sp -> sp.args <- List.rev_append data sp.args
|
||||||
|
| Span_tef_async sp -> sp.args <- List.rev_append data sp.args
|
||||||
|
| _ -> ()
|
||||||
|
|
||||||
|
let on_name_thread_ (self : st) ~tid name : unit =
|
||||||
|
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
||||||
|
self.exporter.on_json buf
|
||||||
|
|
||||||
|
let on_name_process_ (self : st) name : unit =
|
||||||
|
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
|
||||||
|
Writer.emit_name_process ~pid:self.pid ~name buf;
|
||||||
|
self.exporter.on_json buf
|
||||||
|
|
||||||
|
let extension (self : st) ev =
|
||||||
|
match ev with
|
||||||
|
| Core_ext.Extension_set_thread_name name ->
|
||||||
|
let tid = Trace_util.Mock_.get_tid () in
|
||||||
|
on_name_thread_ self ~tid name
|
||||||
|
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
|
||||||
|
| _ -> ()
|
||||||
|
end
|
||||||
|
|
||||||
|
let callbacks_collector : _ Collector.Callbacks.t =
|
||||||
|
Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span ~message
|
||||||
|
~add_data_to_span ~counter_int ~counter_float ~extension ()
|
||||||
|
|
||||||
|
let collector (self : t) : Collector.t =
|
||||||
|
Collector.C_some (self, callbacks_collector)
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
open Trace_core
|
||||||
open Common_
|
open Common_
|
||||||
|
|
||||||
module Buf_pool : sig
|
module Buf_pool : sig
|
||||||
|
|
@ -7,19 +8,19 @@ module Buf_pool : sig
|
||||||
end
|
end
|
||||||
|
|
||||||
type t
|
type t
|
||||||
(** Main subscriber state. *)
|
(** Main state. *)
|
||||||
|
|
||||||
val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
|
val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
|
||||||
(** Create a subscriber state. *)
|
(** Create a fresh state. *)
|
||||||
|
|
||||||
val flush : t -> unit
|
val flush : t -> unit
|
||||||
val close : t -> unit
|
val close : t -> unit
|
||||||
val active : t -> bool
|
val active : t -> bool
|
||||||
|
|
||||||
val sub_callbacks : t Sub.Callbacks.t
|
val callbacks_collector : t Collector.Callbacks.t
|
||||||
(** Callbacks used for the subscriber *)
|
(** Callbacks used for the subscriber *)
|
||||||
|
|
||||||
val subscriber : t -> Sub.t
|
val collector : t -> Collector.t
|
||||||
(** Subscriber that writes json into this writer *)
|
(** Subscriber that writes json into this writer *)
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
@ -1,6 +1,3 @@
|
||||||
module Sub = Trace_subscriber
|
|
||||||
module A = Trace_core.Internal_.Atomic_
|
module A = Trace_core.Internal_.Atomic_
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
type span_id = Sub.Span_sub.span_id
|
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,4 @@
|
||||||
(public_name trace-tef)
|
(public_name trace-tef)
|
||||||
(synopsis
|
(synopsis
|
||||||
"Simple and lightweight tracing using TEF/Catapult format, in-process")
|
"Simple and lightweight tracing using TEF/Catapult format, in-process")
|
||||||
(libraries
|
(libraries trace.core trace.util mtime mtime.clock.os unix threads))
|
||||||
trace.core
|
|
||||||
trace.private.util
|
|
||||||
trace.subscriber
|
|
||||||
mtime
|
|
||||||
mtime.clock.os
|
|
||||||
unix
|
|
||||||
threads))
|
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,10 @@
|
||||||
(** An exporter, takes JSON objects and writes them somewhere *)
|
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
on_json: Buffer.t -> unit;
|
on_json: Buffer.t -> unit;
|
||||||
(** Takes a buffer and writes it somewhere. The buffer is only valid
|
flush: unit -> unit;
|
||||||
during this call and must not be stored. *)
|
close: unit -> unit;
|
||||||
flush: unit -> unit; (** Force write *)
|
|
||||||
close: unit -> unit; (** Close underlying resources *)
|
|
||||||
}
|
}
|
||||||
(** An exporter, takes JSON objects and writes them somewhere.
|
|
||||||
|
|
||||||
This should be thread-safe if used in a threaded environment. *)
|
|
||||||
|
|
||||||
open struct
|
open struct
|
||||||
let with_lock lock f =
|
let with_lock lock f =
|
||||||
|
|
@ -26,11 +19,6 @@ open struct
|
||||||
Printexc.raise_with_backtrace e bt
|
Printexc.raise_with_backtrace e bt
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Export to the channel
|
|
||||||
@param jsonl
|
|
||||||
if true, export as a JSON object per line, otherwise export as a single
|
|
||||||
big JSON array.
|
|
||||||
@param close_channel if true, closing the exporter will close the channel *)
|
|
||||||
let of_out_channel ~close_channel ~jsonl oc : t =
|
let of_out_channel ~close_channel ~jsonl oc : t =
|
||||||
let lock = Mutex.create () in
|
let lock = Mutex.create () in
|
||||||
let first = ref true in
|
let first = ref true in
|
||||||
|
|
|
||||||
22
src/tef/exporter.mli
Normal file
22
src/tef/exporter.mli
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
(** An exporter, takes JSON objects and writes them somewhere *)
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
on_json: Buffer.t -> unit;
|
||||||
|
(** Takes a buffer and writes it somewhere. The buffer is only valid
|
||||||
|
during this call and must not be stored. *)
|
||||||
|
flush: unit -> unit; (** Force write *)
|
||||||
|
close: unit -> unit; (** Close underlying resources *)
|
||||||
|
}
|
||||||
|
(** An exporter, takes JSON objects and writes them somewhere.
|
||||||
|
|
||||||
|
This should be thread-safe if used in a threaded environment. *)
|
||||||
|
|
||||||
|
val of_out_channel : close_channel:bool -> jsonl:bool -> out_channel -> t
|
||||||
|
(** Export to the channel
|
||||||
|
@param jsonl
|
||||||
|
if true, export as a JSON object per line, otherwise export as a single
|
||||||
|
big JSON array.
|
||||||
|
@param close_channel if true, closing the exporter will close the channel *)
|
||||||
|
|
||||||
|
val of_buffer : jsonl:bool -> Buffer.t -> t
|
||||||
|
(** Emit into the buffer *)
|
||||||
|
|
@ -1,164 +0,0 @@
|
||||||
open Common_
|
|
||||||
open Trace_core
|
|
||||||
open Trace_private_util
|
|
||||||
module Span_sub = Sub.Span_sub
|
|
||||||
|
|
||||||
module Buf_pool = struct
|
|
||||||
type t = Buffer.t Rpool.t
|
|
||||||
|
|
||||||
let create ?(max_size = 32) ?(buf_size = 256) () : t =
|
|
||||||
Rpool.create ~max_size ~clear:Buffer.reset
|
|
||||||
~create:(fun () -> Buffer.create buf_size)
|
|
||||||
()
|
|
||||||
end
|
|
||||||
|
|
||||||
open struct
|
|
||||||
let[@inline] time_us_of_time_ns (t : int64) : float =
|
|
||||||
Int64.div t 1_000L |> Int64.to_float
|
|
||||||
end
|
|
||||||
|
|
||||||
let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s)
|
|
||||||
|
|
||||||
(*
|
|
||||||
type span_info = {
|
|
||||||
tid: int;
|
|
||||||
name: string;
|
|
||||||
start_us: float;
|
|
||||||
mutable data: (string * user_data) list;
|
|
||||||
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
|
||||||
that's running this (synchronous, stack-abiding) span. *)
|
|
||||||
}
|
|
||||||
(** Information we store about a span begin event, to emit a complete event when
|
|
||||||
we meet the corresponding span end event *)
|
|
||||||
*)
|
|
||||||
|
|
||||||
type t = {
|
|
||||||
active: bool A.t;
|
|
||||||
pid: int;
|
|
||||||
buf_pool: Buf_pool.t;
|
|
||||||
exporter: Exporter.t;
|
|
||||||
span_id_gen: Sub.Span_id_generator.t;
|
|
||||||
trace_id_gen: Sub.Trace_id_generator.t;
|
|
||||||
}
|
|
||||||
(** Subscriber state *)
|
|
||||||
|
|
||||||
(* TODO: this is nice to have, can we make it optional?
|
|
||||||
open struct
|
|
||||||
let print_non_closed_spans_warning spans =
|
|
||||||
let module Str_set = Set.Make (String) in
|
|
||||||
let spans = Span_tbl.to_list spans in
|
|
||||||
if spans <> [] then (
|
|
||||||
!on_tracing_error
|
|
||||||
@@ Printf.sprintf "trace-tef: warning: %d spans were not closed"
|
|
||||||
(List.length spans);
|
|
||||||
let names =
|
|
||||||
List.fold_left
|
|
||||||
(fun set (_, span) -> Str_set.add span.name set)
|
|
||||||
Str_set.empty spans
|
|
||||||
in
|
|
||||||
Str_set.iter
|
|
||||||
(fun name ->
|
|
||||||
!on_tracing_error @@ Printf.sprintf " span %S was not closed" name)
|
|
||||||
names;
|
|
||||||
flush stderr
|
|
||||||
)
|
|
||||||
end
|
|
||||||
*)
|
|
||||||
|
|
||||||
let close (self : t) : unit =
|
|
||||||
if A.exchange self.active false then
|
|
||||||
(* FIXME: print_non_closed_spans_warning self.spans; *)
|
|
||||||
self.exporter.close ()
|
|
||||||
|
|
||||||
let[@inline] active self = A.get self.active
|
|
||||||
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
|
||||||
|
|
||||||
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
|
||||||
{
|
|
||||||
active = A.make true;
|
|
||||||
exporter;
|
|
||||||
buf_pool;
|
|
||||||
pid;
|
|
||||||
span_id_gen = Sub.Span_id_generator.create ();
|
|
||||||
trace_id_gen = Sub.Trace_id_generator.create ();
|
|
||||||
}
|
|
||||||
|
|
||||||
open struct
|
|
||||||
type st = t
|
|
||||||
|
|
||||||
let new_span_id (self : st) = Sub.Span_id_generator.gen self.span_id_gen
|
|
||||||
let new_trace_id (self : st) = Sub.Trace_id_generator.gen self.trace_id_gen
|
|
||||||
let on_init _ ~time_ns:_ = ()
|
|
||||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
|
||||||
|
|
||||||
(* add function name, if provided, to the metadata *)
|
|
||||||
let add_fun_name_ fun_name data : _ list =
|
|
||||||
match fun_name with
|
|
||||||
| None -> data
|
|
||||||
| Some f -> ("function", `String f) :: data
|
|
||||||
|
|
||||||
let on_enter_span (self : st) (span : Span_sub.t) : unit =
|
|
||||||
match span.flavor with
|
|
||||||
| `Async ->
|
|
||||||
let { Span_sub.time_ns; data; tid; name; _ } = span in
|
|
||||||
let time_us = time_us_of_time_ns time_ns in
|
|
||||||
let data = add_fun_name_ span.__FUNCTION__ data in
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_begin buf ~pid:self.pid ~tid ~name ~id:span.trace_id
|
|
||||||
~ts:time_us ~args:data ~flavor:span.flavor;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
| `Sync -> () (* done at exit *)
|
|
||||||
|
|
||||||
let on_exit_span (self : st) ~time_ns:exit_time_ns ~tid:_ span : unit =
|
|
||||||
let { Span_sub.data; tid; flavor; name; _ } = span in
|
|
||||||
let exit_time_us = time_us_of_time_ns exit_time_ns in
|
|
||||||
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
(match flavor with
|
|
||||||
| `Sync ->
|
|
||||||
(* emit full event *)
|
|
||||||
let start_time_us = time_us_of_time_ns span.time_ns in
|
|
||||||
Writer.emit_duration_event buf ~pid:self.pid ~tid ~name
|
|
||||||
~start:start_time_us ~end_:exit_time_us ~args:data
|
|
||||||
| `Async ->
|
|
||||||
Writer.emit_end buf ~pid:self.pid ~tid ~name ~id:span.trace_id
|
|
||||||
~ts:exit_time_us ~flavor ~args:data);
|
|
||||||
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit =
|
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
|
||||||
~args:data;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_counter (self : st) ~time_ns ~tid ~params:_ ~data:_ ~name n : unit =
|
|
||||||
let time_us = time_us_of_time_ns @@ time_ns in
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_name_thread_ (self : st) ~tid name : unit =
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_name_process_ (self : st) name : unit =
|
|
||||||
let@ buf = Rpool.with_ self.buf_pool in
|
|
||||||
Writer.emit_name_process ~pid:self.pid ~name buf;
|
|
||||||
self.exporter.on_json buf
|
|
||||||
|
|
||||||
let on_extension_event (self : st) ~time_ns:_ ~tid ev =
|
|
||||||
match ev with
|
|
||||||
| Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name
|
|
||||||
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
|
|
||||||
| _ -> ()
|
|
||||||
end
|
|
||||||
|
|
||||||
let sub_callbacks : _ Sub.Callbacks.t =
|
|
||||||
Sub.Callbacks.make ~new_span_id ~new_trace_id ~on_init ~on_shutdown
|
|
||||||
~on_enter_span ~on_exit_span ~on_message ~on_counter ~on_extension_event ()
|
|
||||||
|
|
||||||
let subscriber (self : t) : Sub.t =
|
|
||||||
Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks }
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
open Trace_core
|
open Trace_core
|
||||||
module Subscriber = Subscriber
|
module Collector_tef = Collector_tef
|
||||||
module Exporter = Exporter
|
module Exporter = Exporter
|
||||||
module Writer = Writer
|
module Writer = Writer
|
||||||
|
module Types = Types
|
||||||
|
|
||||||
let block_signals () =
|
let block_signals () =
|
||||||
try
|
try
|
||||||
|
|
@ -21,11 +22,11 @@ let block_signals () =
|
||||||
|
|
||||||
(** Thread that simply regularly "ticks", sending events to the background
|
(** Thread that simply regularly "ticks", sending events to the background
|
||||||
thread so it has a chance to write to the file *)
|
thread so it has a chance to write to the file *)
|
||||||
let tick_thread (sub : Subscriber.t) : unit =
|
let tick_thread (c : Collector_tef.t) : unit =
|
||||||
block_signals ();
|
block_signals ();
|
||||||
while Subscriber.active sub do
|
while Collector_tef.active c do
|
||||||
Thread.delay 0.5;
|
Thread.delay 0.5;
|
||||||
Subscriber.flush sub
|
Collector_tef.flush c
|
||||||
done
|
done
|
||||||
|
|
||||||
type output =
|
type output =
|
||||||
|
|
@ -34,8 +35,8 @@ type output =
|
||||||
| `File of string
|
| `File of string
|
||||||
]
|
]
|
||||||
|
|
||||||
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
|
let collector_ ~(finally : unit -> unit) ~out ~(mode : [ `Single | `Jsonl ]) ()
|
||||||
Trace_subscriber.t =
|
: Collector.t =
|
||||||
let jsonl = mode = `Jsonl in
|
let jsonl = mode = `Jsonl in
|
||||||
let oc, must_close =
|
let oc, must_close =
|
||||||
match out with
|
match out with
|
||||||
|
|
@ -46,12 +47,7 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
|
||||||
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
|
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
|
||||||
| `Output oc -> oc, false
|
| `Output oc -> oc, false
|
||||||
in
|
in
|
||||||
let pid =
|
let pid = Trace_util.Mock_.get_pid () in
|
||||||
if !Trace_subscriber.Private_.mock then
|
|
||||||
2
|
|
||||||
else
|
|
||||||
Unix.getpid ()
|
|
||||||
in
|
|
||||||
|
|
||||||
let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in
|
let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in
|
||||||
let exporter =
|
let exporter =
|
||||||
|
|
@ -63,17 +59,9 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
|
||||||
finally ());
|
finally ());
|
||||||
}
|
}
|
||||||
in
|
in
|
||||||
let sub = Subscriber.create ~pid ~exporter () in
|
let coll_st = Collector_tef.create ~pid ~exporter () in
|
||||||
let _t_tick : Thread.t = Thread.create tick_thread sub in
|
let _t_tick : Thread.t = Thread.create tick_thread coll_st in
|
||||||
Subscriber.subscriber sub
|
Collector_tef.collector coll_st
|
||||||
|
|
||||||
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
|
||||||
: collector =
|
|
||||||
let sub = subscriber_ ~finally ~mode ~out () in
|
|
||||||
Trace_subscriber.collector sub
|
|
||||||
|
|
||||||
let[@inline] subscriber ~out () : Trace_subscriber.t =
|
|
||||||
subscriber_ ~finally:ignore ~mode:`Single ~out ()
|
|
||||||
|
|
||||||
let[@inline] collector ~out () : collector =
|
let[@inline] collector ~out () : collector =
|
||||||
collector_ ~finally:ignore ~mode:`Single ~out ()
|
collector_ ~finally:ignore ~mode:`Single ~out ()
|
||||||
|
|
@ -111,29 +99,12 @@ let with_setup ?out () f =
|
||||||
setup ?out ();
|
setup ?out ();
|
||||||
Fun.protect ~finally:Trace_core.shutdown f
|
Fun.protect ~finally:Trace_core.shutdown f
|
||||||
|
|
||||||
module Mock_ = struct
|
|
||||||
let now = ref 0
|
|
||||||
|
|
||||||
(* used to mock timing *)
|
|
||||||
let get_now_ns () : int64 =
|
|
||||||
let x = !now in
|
|
||||||
incr now;
|
|
||||||
Int64.(mul (of_int x) 1000L)
|
|
||||||
|
|
||||||
let get_tid_ () : int = 3
|
|
||||||
end
|
|
||||||
|
|
||||||
module Private_ = struct
|
module Private_ = struct
|
||||||
let mock_all_ () =
|
let mock_all_ () =
|
||||||
Trace_subscriber.Private_.mock := true;
|
Trace_util.Mock_.mock_all ();
|
||||||
Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns;
|
|
||||||
Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_;
|
|
||||||
()
|
()
|
||||||
|
|
||||||
let on_tracing_error = Subscriber.on_tracing_error
|
let on_tracing_error = Collector_tef.on_tracing_error
|
||||||
|
|
||||||
let subscriber_jsonl ~finally ~out () =
|
|
||||||
subscriber_ ~finally ~mode:`Jsonl ~out ()
|
|
||||||
|
|
||||||
let collector_jsonl ~finally ~out () : collector =
|
let collector_jsonl ~finally ~out () : collector =
|
||||||
collector_ ~finally ~mode:`Jsonl ~out ()
|
collector_ ~finally ~mode:`Jsonl ~out ()
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
module Subscriber = Subscriber
|
module Collector_tef = Collector_tef
|
||||||
module Exporter = Exporter
|
module Exporter = Exporter
|
||||||
module Writer = Writer
|
module Writer = Writer
|
||||||
|
module Types = Types
|
||||||
|
|
||||||
type output =
|
type output =
|
||||||
[ `Stdout
|
[ `Stdout
|
||||||
|
|
@ -14,10 +15,6 @@ type output =
|
||||||
- [`File "foo"] will enable tracing and print events into file named "foo"
|
- [`File "foo"] will enable tracing and print events into file named "foo"
|
||||||
*)
|
*)
|
||||||
|
|
||||||
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
|
|
||||||
(** A subscriber emitting TEF traces into [out].
|
|
||||||
@since 0.8 *)
|
|
||||||
|
|
||||||
val collector : out:[< output ] -> unit -> Trace_core.collector
|
val collector : out:[< output ] -> unit -> Trace_core.collector
|
||||||
(** Make a collector that writes into the given output. See {!setup} for more
|
(** Make a collector that writes into the given output. See {!setup} for more
|
||||||
details. *)
|
details. *)
|
||||||
|
|
@ -48,12 +45,6 @@ module Private_ : sig
|
||||||
|
|
||||||
val on_tracing_error : (string -> unit) ref
|
val on_tracing_error : (string -> unit) ref
|
||||||
|
|
||||||
val subscriber_jsonl :
|
|
||||||
finally:(unit -> unit) ->
|
|
||||||
out:[ `File_append of string | `Output of out_channel ] ->
|
|
||||||
unit ->
|
|
||||||
Trace_subscriber.t
|
|
||||||
|
|
||||||
val collector_jsonl :
|
val collector_jsonl :
|
||||||
finally:(unit -> unit) ->
|
finally:(unit -> unit) ->
|
||||||
out:[ `File_append of string | `Output of out_channel ] ->
|
out:[ `File_append of string | `Output of out_channel ] ->
|
||||||
|
|
|
||||||
20
src/tef/types.ml
Normal file
20
src/tef/types.ml
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
open Trace_core
|
||||||
|
module Trace_id = Trace_util.Trace_id64
|
||||||
|
|
||||||
|
type trace_id = Trace_id.t
|
||||||
|
|
||||||
|
type Trace_core.span +=
|
||||||
|
| Span_tef_sync of {
|
||||||
|
pid: int;
|
||||||
|
tid: int;
|
||||||
|
name: string;
|
||||||
|
start_us: float;
|
||||||
|
mutable args: (string * Trace_core.user_data) list;
|
||||||
|
}
|
||||||
|
| Span_tef_async of {
|
||||||
|
pid: int;
|
||||||
|
tid: int;
|
||||||
|
name: string;
|
||||||
|
trace_id: trace_id;
|
||||||
|
mutable args: (string * Trace_core.user_data) list;
|
||||||
|
}
|
||||||
|
|
@ -48,23 +48,17 @@ let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit =
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
let emit_begin ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit =
|
let emit_begin_async ~pid ~tid ~name ~trace_id ~ts ~args buf =
|
||||||
Printf.bprintf buf
|
Printf.bprintf buf
|
||||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||||
pid id tid ts str_val name
|
pid trace_id tid ts str_val name 'b'
|
||||||
(match flavor with
|
|
||||||
| `Async -> 'b'
|
|
||||||
| `Sync -> 'B')
|
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
let emit_end ~pid ~tid ~name ~(id : int64) ~ts ~args ~flavor buf : unit =
|
let emit_end_async ~pid ~tid ~name ~(trace_id : int64) ~ts ~args buf : unit =
|
||||||
Printf.bprintf buf
|
Printf.bprintf buf
|
||||||
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
{json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||||
pid id tid ts str_val name
|
pid trace_id tid ts str_val name 'e'
|
||||||
(match flavor with
|
|
||||||
| `Async -> 'e'
|
|
||||||
| `Sync -> 'E')
|
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args
|
args
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
raw event data. *)
|
raw event data. *)
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
|
open Types
|
||||||
|
|
||||||
val emit_duration_event :
|
val emit_duration_event :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
|
|
@ -15,25 +16,23 @@ val emit_duration_event :
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
val emit_begin :
|
val emit_begin_async :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
tid:int ->
|
tid:int ->
|
||||||
name:string ->
|
name:string ->
|
||||||
id:span_id ->
|
trace_id:trace_id ->
|
||||||
ts:float ->
|
ts:float ->
|
||||||
args:(string * Trace_core.user_data) list ->
|
args:(string * Trace_core.user_data) list ->
|
||||||
flavor:[ `Sync | `Async ] ->
|
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
val emit_end :
|
val emit_end_async :
|
||||||
pid:int ->
|
pid:int ->
|
||||||
tid:int ->
|
tid:int ->
|
||||||
name:string ->
|
name:string ->
|
||||||
id:span_id ->
|
trace_id:trace_id ->
|
||||||
ts:float ->
|
ts:float ->
|
||||||
args:(string * Trace_core.user_data) list ->
|
args:(string * Trace_core.user_data) list ->
|
||||||
flavor:[ `Sync | `Async ] ->
|
|
||||||
Buffer.t ->
|
Buffer.t ->
|
||||||
unit
|
unit
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue