mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-08 03:47:57 -04:00
wip: trace-subscriber package
This commit is contained in:
parent
57aec09be9
commit
bebd037803
12 changed files with 566 additions and 178 deletions
88
dune-project
88
dune-project
|
|
@ -1,26 +1,34 @@
|
||||||
(lang dune 2.9)
|
(lang dune 2.9)
|
||||||
|
|
||||||
(name trace)
|
(name trace)
|
||||||
|
|
||||||
(generate_opam_files true)
|
(generate_opam_files true)
|
||||||
|
|
||||||
(version 0.7)
|
(version 0.7)
|
||||||
|
|
||||||
(source
|
(source
|
||||||
(github c-cube/ocaml-trace))
|
(github c-cube/ocaml-trace))
|
||||||
|
|
||||||
(authors "Simon Cruanes")
|
(authors "Simon Cruanes")
|
||||||
|
|
||||||
(maintainers "Simon Cruanes")
|
(maintainers "Simon Cruanes")
|
||||||
|
|
||||||
(license MIT)
|
(license MIT)
|
||||||
|
|
||||||
;(documentation https://url/to/documentation)
|
;(documentation https://url/to/documentation)
|
||||||
|
|
||||||
(package
|
(package
|
||||||
(name trace)
|
(name trace)
|
||||||
(synopsis "A stub for tracing/observability, agnostic in how data is collected")
|
(synopsis
|
||||||
|
"A stub for tracing/observability, agnostic in how data is collected")
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= 4.08))
|
(ocaml
|
||||||
dune)
|
(>= 4.08))
|
||||||
|
dune)
|
||||||
(depopts
|
(depopts
|
||||||
hmap
|
hmap
|
||||||
(mtime (>= 2.0)))
|
(mtime
|
||||||
|
(>= 2.0)))
|
||||||
(tags
|
(tags
|
||||||
(trace tracing observability profiling)))
|
(trace tracing observability profiling)))
|
||||||
|
|
||||||
|
|
@ -28,37 +36,67 @@
|
||||||
(name ppx_trace)
|
(name ppx_trace)
|
||||||
(synopsis "A ppx-based preprocessor for trace")
|
(synopsis "A ppx-based preprocessor for trace")
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= 4.12)) ; we use __FUNCTION__
|
(ocaml
|
||||||
(ppxlib (>= 0.28))
|
(>= 4.12)) ; we use __FUNCTION__
|
||||||
(trace (= :version))
|
(ppxlib
|
||||||
(trace-tef (and (= :version) :with-test))
|
(>= 0.28))
|
||||||
dune)
|
(trace
|
||||||
|
(= :version))
|
||||||
|
(trace-tef
|
||||||
|
(and
|
||||||
|
(= :version)
|
||||||
|
:with-test))
|
||||||
|
dune)
|
||||||
(tags
|
(tags
|
||||||
(trace ppx)))
|
(trace ppx)))
|
||||||
|
|
||||||
(package
|
(package
|
||||||
(name trace-tef)
|
(name trace-subscriber)
|
||||||
(synopsis "A simple backend for trace, emitting Catapult/TEF JSON into a file")
|
(synopsis "Generic subscriber system for `trace`")
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= 4.08))
|
(ocaml
|
||||||
(trace (= :version))
|
(>= 4.08))
|
||||||
(mtime (>= 2.0))
|
(trace
|
||||||
base-unix
|
(= :version))
|
||||||
dune)
|
(mtime
|
||||||
|
(>= 2.0)))
|
||||||
|
(tags
|
||||||
|
(trace subscriber)))
|
||||||
|
|
||||||
|
(package
|
||||||
|
(name trace-tef)
|
||||||
|
(synopsis
|
||||||
|
"A simple backend for trace, emitting Catapult/TEF JSON into a file")
|
||||||
|
(depends
|
||||||
|
(ocaml
|
||||||
|
(>= 4.08))
|
||||||
|
(trace
|
||||||
|
(= :version))
|
||||||
|
(trace-subscriber
|
||||||
|
(= :version))
|
||||||
|
(mtime
|
||||||
|
(>= 2.0))
|
||||||
|
base-unix
|
||||||
|
dune)
|
||||||
(tags
|
(tags
|
||||||
(trace tracing catapult TEF chrome-format)))
|
(trace tracing catapult TEF chrome-format)))
|
||||||
|
|
||||||
(package
|
(package
|
||||||
(name trace-fuchsia)
|
(name trace-fuchsia)
|
||||||
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
|
(synopsis
|
||||||
|
"A high-performance backend for trace, emitting a Fuchsia trace into a file")
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= 4.08))
|
(ocaml
|
||||||
(trace (= :version))
|
(>= 4.08))
|
||||||
(mtime (>= 2.0))
|
(trace
|
||||||
(thread-local-storage (>= 0.2))
|
(= :version))
|
||||||
base-bigarray
|
(mtime
|
||||||
base-unix
|
(>= 2.0))
|
||||||
dune)
|
(thread-local-storage
|
||||||
|
(>= 0.2))
|
||||||
|
base-bigarray
|
||||||
|
base-unix
|
||||||
|
dune)
|
||||||
(tags
|
(tags
|
||||||
(trace tracing fuchsia)))
|
(trace tracing fuchsia)))
|
||||||
|
|
||||||
|
|
|
||||||
121
src/subscriber/callbacks.ml
Normal file
121
src/subscriber/callbacks.ml
Normal file
|
|
@ -0,0 +1,121 @@
|
||||||
|
open Trace_core
|
||||||
|
|
||||||
|
module type S = sig
|
||||||
|
type st
|
||||||
|
|
||||||
|
val on_init : st -> time_ns:float -> unit
|
||||||
|
(** Called when the subscriber is initialized in a collector *)
|
||||||
|
|
||||||
|
val on_shutdown : st -> time_ns:float -> unit
|
||||||
|
(** Called when the collector is shutdown *)
|
||||||
|
|
||||||
|
val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit
|
||||||
|
(** Current thread is being named *)
|
||||||
|
|
||||||
|
val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit
|
||||||
|
(** Current process is being named *)
|
||||||
|
|
||||||
|
val on_enter_span :
|
||||||
|
st ->
|
||||||
|
__FUNCTION__:string option ->
|
||||||
|
__FILE__:string ->
|
||||||
|
__LINE__:int ->
|
||||||
|
time_ns:float ->
|
||||||
|
tid:int ->
|
||||||
|
data:(string * user_data) list ->
|
||||||
|
name:string ->
|
||||||
|
span ->
|
||||||
|
unit
|
||||||
|
(** Enter a regular (sync) span *)
|
||||||
|
|
||||||
|
val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit
|
||||||
|
(** Exit a span. This and [on_enter_span] must follow strict stack discipline *)
|
||||||
|
|
||||||
|
val on_add_data : st -> data:(string * user_data) list -> span -> unit
|
||||||
|
(** Add data to a regular span (which must be active) *)
|
||||||
|
|
||||||
|
val on_message :
|
||||||
|
st ->
|
||||||
|
time_ns:float ->
|
||||||
|
tid:int ->
|
||||||
|
span:span option ->
|
||||||
|
data:(string * user_data) list ->
|
||||||
|
string ->
|
||||||
|
unit
|
||||||
|
(** Emit a log message *)
|
||||||
|
|
||||||
|
val on_counter :
|
||||||
|
st ->
|
||||||
|
time_ns:float ->
|
||||||
|
tid:int ->
|
||||||
|
data:(string * user_data) list ->
|
||||||
|
name:string ->
|
||||||
|
float ->
|
||||||
|
unit
|
||||||
|
(** Emit the current value of a counter *)
|
||||||
|
|
||||||
|
val on_enter_manual_span :
|
||||||
|
st ->
|
||||||
|
__FUNCTION__:string option ->
|
||||||
|
__FILE__:string ->
|
||||||
|
__LINE__:int ->
|
||||||
|
time_ns:float ->
|
||||||
|
tid:int ->
|
||||||
|
parent:explicit_span option ->
|
||||||
|
data:(string * user_data) list ->
|
||||||
|
name:string ->
|
||||||
|
flavor:[ `Sync | `Async ] option ->
|
||||||
|
trace_id:int ->
|
||||||
|
span ->
|
||||||
|
unit
|
||||||
|
(** Enter a manual (possibly async) span *)
|
||||||
|
|
||||||
|
val on_exit_manual_span :
|
||||||
|
st ->
|
||||||
|
time_ns:float ->
|
||||||
|
tid:int ->
|
||||||
|
name:string ->
|
||||||
|
data:(string * user_data) list ->
|
||||||
|
flavor:[ `Sync | `Async ] option ->
|
||||||
|
trace_id:int ->
|
||||||
|
span ->
|
||||||
|
unit
|
||||||
|
(** Exit a manual span *)
|
||||||
|
end
|
||||||
|
|
||||||
|
type 'st t = (module S with type st = 'st)
|
||||||
|
|
||||||
|
(** Callbacks for a subscriber *)
|
||||||
|
|
||||||
|
(** Dummy callbacks *)
|
||||||
|
module Dummy = struct
|
||||||
|
let on_init _ ~time_ns:_ = ()
|
||||||
|
let on_shutdown _ ~time_ns:_ = ()
|
||||||
|
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||||
|
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||||
|
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
|
||||||
|
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()
|
||||||
|
|
||||||
|
let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ ~tid:_
|
||||||
|
~data:_ ~name:_ _sp =
|
||||||
|
()
|
||||||
|
|
||||||
|
let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
|
||||||
|
let on_add_data _ ~data:_ _sp = ()
|
||||||
|
|
||||||
|
let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
|
||||||
|
~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp =
|
||||||
|
()
|
||||||
|
|
||||||
|
let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
|
||||||
|
~trace_id:_ _ =
|
||||||
|
()
|
||||||
|
end
|
||||||
|
|
||||||
|
let dummy (type st) () : st t =
|
||||||
|
let module M = struct
|
||||||
|
type nonrec st = st
|
||||||
|
|
||||||
|
include Dummy
|
||||||
|
end in
|
||||||
|
(module M)
|
||||||
6
src/subscriber/dune
Normal file
6
src/subscriber/dune
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
|
||||||
|
(library
|
||||||
|
(name trace_subscriber)
|
||||||
|
(public_name trace-subscriber)
|
||||||
|
(libraries (re_export trace.core) mtime mtime.clock.os threads))
|
||||||
|
|
||||||
49
src/subscriber/subscriber.ml
Normal file
49
src/subscriber/subscriber.ml
Normal file
|
|
@ -0,0 +1,49 @@
|
||||||
|
(** A trace subscriber *)
|
||||||
|
type t =
|
||||||
|
| Sub : {
|
||||||
|
st: 'st;
|
||||||
|
callbacks: 'st Callbacks.t;
|
||||||
|
}
|
||||||
|
-> t
|
||||||
|
|
||||||
|
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }
|
||||||
|
|
||||||
|
(* TODO:
|
||||||
|
let multiplex (l : t list) : t =
|
||||||
|
match l with
|
||||||
|
| [] -> dummy
|
||||||
|
| [ s ] -> s
|
||||||
|
| _ ->
|
||||||
|
let module M = struct
|
||||||
|
type st = t list
|
||||||
|
|
||||||
|
let on_init l ~time_ns =
|
||||||
|
List.iter
|
||||||
|
(fun (Sub { st; callbacks = (module CB) }) -> CB.on_init st ~time_ns)
|
||||||
|
l
|
||||||
|
|
||||||
|
let on_shutdown _ ~time_ns:_ = ()
|
||||||
|
let on_tick _ = ()
|
||||||
|
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||||
|
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||||
|
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
|
||||||
|
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()
|
||||||
|
|
||||||
|
let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
|
||||||
|
~tid:_ ~data:_ ~name:_ _sp =
|
||||||
|
()
|
||||||
|
|
||||||
|
let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
|
||||||
|
let on_add_data _ ~data:_ _sp = ()
|
||||||
|
|
||||||
|
let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
|
||||||
|
~time_ns:_ ~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp
|
||||||
|
=
|
||||||
|
()
|
||||||
|
|
||||||
|
let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
|
||||||
|
~trace_id:_ _ =
|
||||||
|
()
|
||||||
|
end in
|
||||||
|
Sub { st = l; callbacks = (module M) }
|
||||||
|
*)
|
||||||
157
src/subscriber/trace_subscriber.ml
Normal file
157
src/subscriber/trace_subscriber.ml
Normal file
|
|
@ -0,0 +1,157 @@
|
||||||
|
open Trace_core
|
||||||
|
module Callbacks = Callbacks
|
||||||
|
module Subscriber = Subscriber
|
||||||
|
|
||||||
|
type t = Subscriber.t
|
||||||
|
|
||||||
|
module Private_ = struct
|
||||||
|
let get_now_ns_ = ref None
|
||||||
|
let get_tid_ = ref None
|
||||||
|
|
||||||
|
(** Now, in nanoseconds *)
|
||||||
|
let[@inline] now_ns () : float =
|
||||||
|
match !get_now_ns_ with
|
||||||
|
| Some f -> f ()
|
||||||
|
| None ->
|
||||||
|
let t = Mtime_clock.now () in
|
||||||
|
Int64.to_float (Mtime.to_uint64_ns t)
|
||||||
|
|
||||||
|
let[@inline] tid_ () : int =
|
||||||
|
match !get_tid_ with
|
||||||
|
| Some f -> f ()
|
||||||
|
| None -> Thread.id (Thread.self ())
|
||||||
|
end
|
||||||
|
|
||||||
|
open struct
|
||||||
|
module A = Trace_core.Internal_.Atomic_
|
||||||
|
|
||||||
|
type manual_span_info = {
|
||||||
|
name: string;
|
||||||
|
flavor: [ `Sync | `Async ] option;
|
||||||
|
mutable data: (string * user_data) list;
|
||||||
|
}
|
||||||
|
|
||||||
|
(** Key used to carry some information between begin and end of
|
||||||
|
manual spans, by way of the meta map *)
|
||||||
|
let key_manual_info : manual_span_info Meta_map.key = Meta_map.Key.create ()
|
||||||
|
|
||||||
|
(** key used to carry a unique "id" for all spans in an async context *)
|
||||||
|
let key_async_trace_id : int Meta_map.key = Meta_map.Key.create ()
|
||||||
|
end
|
||||||
|
|
||||||
|
(** A collector that calls the callbacks of subscriber *)
|
||||||
|
let collector (Sub { st; callbacks = (module CB) } : Subscriber.t) : collector =
|
||||||
|
let open Private_ in
|
||||||
|
let module M = struct
|
||||||
|
let trace_id_gen_ = A.make 0
|
||||||
|
|
||||||
|
(** generator for span ids *)
|
||||||
|
let new_span_ : unit -> int =
|
||||||
|
let span_id_gen_ = A.make 0 in
|
||||||
|
fun [@inline] () -> A.fetch_and_add span_id_gen_ 1
|
||||||
|
|
||||||
|
let enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : span =
|
||||||
|
let span = Int64.of_int (new_span_ ()) in
|
||||||
|
let tid = tid_ () in
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
CB.on_enter_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
|
||||||
|
~name span;
|
||||||
|
span
|
||||||
|
|
||||||
|
let exit_span span : unit =
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
let tid = tid_ () in
|
||||||
|
CB.on_exit_span st ~time_ns ~tid span
|
||||||
|
|
||||||
|
let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name f =
|
||||||
|
let span = enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name in
|
||||||
|
try
|
||||||
|
let x = f span in
|
||||||
|
exit_span span;
|
||||||
|
x
|
||||||
|
with exn ->
|
||||||
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
|
exit_span span;
|
||||||
|
Printexc.raise_with_backtrace exn bt
|
||||||
|
|
||||||
|
let add_data_to_span span data =
|
||||||
|
if data <> [] then CB.on_add_data st ~data span
|
||||||
|
|
||||||
|
let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__
|
||||||
|
~__FILE__ ~__LINE__ ~data name : explicit_span =
|
||||||
|
let span = Int64.of_int (new_span_ ()) in
|
||||||
|
let tid = tid_ () in
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
|
||||||
|
(* get the common trace id, or make a new one *)
|
||||||
|
let trace_id =
|
||||||
|
match parent with
|
||||||
|
| Some m -> Meta_map.find_exn key_async_trace_id m.meta
|
||||||
|
| None -> A.fetch_and_add trace_id_gen_ 1
|
||||||
|
in
|
||||||
|
|
||||||
|
CB.on_enter_manual_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~parent ~data
|
||||||
|
~time_ns ~tid ~name ~flavor ~trace_id span;
|
||||||
|
let meta =
|
||||||
|
Meta_map.empty
|
||||||
|
|> Meta_map.add key_manual_info { name; flavor; data = [] }
|
||||||
|
|> Meta_map.add key_async_trace_id trace_id
|
||||||
|
in
|
||||||
|
{ span; meta }
|
||||||
|
|
||||||
|
let exit_manual_span (es : explicit_span) : unit =
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
let tid = tid_ () in
|
||||||
|
let trace_id =
|
||||||
|
match Meta_map.find key_async_trace_id es.meta with
|
||||||
|
| None -> assert false
|
||||||
|
| Some id -> id
|
||||||
|
in
|
||||||
|
let minfo =
|
||||||
|
match Meta_map.find key_manual_info es.meta with
|
||||||
|
| None -> assert false
|
||||||
|
| Some m -> m
|
||||||
|
in
|
||||||
|
CB.on_exit_manual_span st ~tid ~time_ns ~data:minfo.data ~name:minfo.name
|
||||||
|
~flavor:minfo.flavor ~trace_id es.span
|
||||||
|
|
||||||
|
let add_data_to_manual_span (es : explicit_span) data =
|
||||||
|
if data <> [] then (
|
||||||
|
match Meta_map.find key_manual_info es.meta with
|
||||||
|
| None -> assert false
|
||||||
|
| Some m -> m.data <- List.rev_append data m.data
|
||||||
|
)
|
||||||
|
|
||||||
|
let message ?span ~data msg : unit =
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
let tid = tid_ () in
|
||||||
|
CB.on_message st ~time_ns ~tid ~span ~data msg
|
||||||
|
|
||||||
|
let counter_float ~data name f : unit =
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
let tid = tid_ () in
|
||||||
|
CB.on_counter st ~tid ~time_ns ~data ~name f
|
||||||
|
|
||||||
|
let[@inline] counter_int ~data name i =
|
||||||
|
counter_float ~data name (float_of_int i)
|
||||||
|
|
||||||
|
let name_process name : unit =
|
||||||
|
let tid = tid_ () in
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
CB.on_name_process st ~time_ns ~tid ~name
|
||||||
|
|
||||||
|
let name_thread name : unit =
|
||||||
|
let tid = tid_ () in
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
CB.on_name_thread st ~time_ns ~tid ~name
|
||||||
|
|
||||||
|
let shutdown () =
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
CB.on_shutdown st ~time_ns
|
||||||
|
|
||||||
|
let () =
|
||||||
|
(* init code *)
|
||||||
|
let time_ns = now_ns () in
|
||||||
|
CB.on_init st ~time_ns
|
||||||
|
end in
|
||||||
|
(module M)
|
||||||
29
src/subscriber/trace_subscriber.mli
Normal file
29
src/subscriber/trace_subscriber.mli
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
(** Generic subscribers.
|
||||||
|
|
||||||
|
This defines the notion of a {b subscriber},
|
||||||
|
a set of callbacks for every trace event.
|
||||||
|
It also defines a collector that needs to be installed
|
||||||
|
for the subscriber(s) to be called.
|
||||||
|
|
||||||
|
@since NEXT_RELEASE
|
||||||
|
*)
|
||||||
|
|
||||||
|
module Callbacks = Callbacks
|
||||||
|
module Subscriber = Subscriber
|
||||||
|
|
||||||
|
type t = Subscriber.t
|
||||||
|
|
||||||
|
val collector : t -> Trace_core.collector
|
||||||
|
|
||||||
|
(**/**)
|
||||||
|
|
||||||
|
(**/*)
|
||||||
|
module Private_ : sig
|
||||||
|
val get_now_ns_ : (unit -> float) option ref
|
||||||
|
(** The callback used to get the current timestamp *)
|
||||||
|
|
||||||
|
val get_tid_ : (unit -> int) option ref
|
||||||
|
(** The callback used to get the current thread's id *)
|
||||||
|
|
||||||
|
val now_ns : unit -> float
|
||||||
|
end
|
||||||
17
src/tef/dune
17
src/tef/dune
|
|
@ -1,6 +1,13 @@
|
||||||
|
|
||||||
(library
|
(library
|
||||||
(name trace_tef)
|
(name trace_tef)
|
||||||
(public_name trace-tef)
|
(public_name trace-tef)
|
||||||
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
|
(synopsis
|
||||||
(libraries trace.core trace.private.util mtime mtime.clock.os unix threads))
|
"Simple and lightweight tracing using TEF/Catapult format, in-process")
|
||||||
|
(libraries
|
||||||
|
trace.core
|
||||||
|
trace.private.util
|
||||||
|
trace-subscriber
|
||||||
|
mtime
|
||||||
|
mtime.clock.os
|
||||||
|
unix
|
||||||
|
threads))
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
open Trace_core
|
open Trace_core
|
||||||
|
|
||||||
|
(** An event, specialized for TEF *)
|
||||||
type t =
|
type t =
|
||||||
| E_tick
|
| E_tick
|
||||||
| E_message of {
|
| E_message of {
|
||||||
|
|
|
||||||
|
|
@ -1,31 +1,24 @@
|
||||||
open Trace_core
|
open Trace_core
|
||||||
open Trace_private_util
|
open Trace_private_util
|
||||||
open Event
|
open Event
|
||||||
|
module Sub = Trace_subscriber
|
||||||
module A = Trace_core.Internal_.Atomic_
|
module A = Trace_core.Internal_.Atomic_
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
||||||
|
|
||||||
module Mock_ = struct
|
module Mock_ = struct
|
||||||
let enabled = ref false
|
let enabled = ref false
|
||||||
let now = ref 0
|
let now = ref 0
|
||||||
|
|
||||||
let[@inline never] now_us () : float =
|
(* used to mock timing *)
|
||||||
|
let get_now_ns () : float =
|
||||||
let x = !now in
|
let x = !now in
|
||||||
incr now;
|
incr now;
|
||||||
float_of_int x
|
float_of_int x *. 1000.
|
||||||
|
|
||||||
|
let get_tid_ () : int = 3
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Now, in microseconds *)
|
|
||||||
let[@inline] now_us () : float =
|
|
||||||
if !Mock_.enabled then
|
|
||||||
Mock_.now_us ()
|
|
||||||
else (
|
|
||||||
let t = Mtime_clock.now () in
|
|
||||||
Int64.to_float (Mtime.to_uint64_ns t) /. 1e3
|
|
||||||
)
|
|
||||||
|
|
||||||
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
|
||||||
|
|
||||||
module Span_tbl = Hashtbl.Make (struct
|
module Span_tbl = Hashtbl.Make (struct
|
||||||
include Int64
|
include Int64
|
||||||
|
|
||||||
|
|
@ -39,15 +32,6 @@ type span_info = {
|
||||||
mutable data: (string * user_data) list;
|
mutable data: (string * user_data) list;
|
||||||
}
|
}
|
||||||
|
|
||||||
(** key used to carry a unique "id" for all spans in an async context *)
|
|
||||||
let key_async_id : int Meta_map.key = Meta_map.Key.create ()
|
|
||||||
|
|
||||||
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.key =
|
|
||||||
Meta_map.Key.create ()
|
|
||||||
|
|
||||||
let key_data : (string * user_data) list ref Meta_map.key =
|
|
||||||
Meta_map.Key.create ()
|
|
||||||
|
|
||||||
(** Writer: knows how to write entries to a file in TEF format *)
|
(** Writer: knows how to write entries to a file in TEF format *)
|
||||||
module Writer = struct
|
module Writer = struct
|
||||||
type t = {
|
type t = {
|
||||||
|
|
@ -280,7 +264,8 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
|
||||||
(* write a message about us closing *)
|
(* write a message about us closing *)
|
||||||
Writer.emit_instant_event ~name:"tef-worker.exit"
|
Writer.emit_instant_event ~name:"tef-worker.exit"
|
||||||
~tid:(Thread.id @@ Thread.self ())
|
~tid:(Thread.id @@ Thread.self ())
|
||||||
~ts:(now_us ()) ~args:[] writer;
|
~ts:(Sub.Private_.now_ns () *. 1e-3)
|
||||||
|
~args:[] writer;
|
||||||
|
|
||||||
(* warn if app didn't close all spans *)
|
(* warn if app didn't close all spans *)
|
||||||
if Span_tbl.length spans > 0 then
|
if Span_tbl.length spans > 0 then
|
||||||
|
|
@ -304,138 +289,90 @@ type output =
|
||||||
| `File of string
|
| `File of string
|
||||||
]
|
]
|
||||||
|
|
||||||
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
module Internal_st = struct
|
||||||
: collector =
|
type t = {
|
||||||
let module M = struct
|
active: bool A.t;
|
||||||
let active = A.make true
|
events: Event.t B_queue.t;
|
||||||
|
t_write: Thread.t;
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
(** generator for span ids *)
|
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
|
||||||
let span_id_gen_ = A.make 0
|
let module M : Sub.Callbacks.S with type st = Internal_st.t = struct
|
||||||
|
type st = Internal_st.t
|
||||||
|
|
||||||
(* queue of messages to write *)
|
let on_init _ ~time_ns:_ = ()
|
||||||
let events : Event.t B_queue.t = B_queue.create ()
|
|
||||||
|
|
||||||
(** writer thread. It receives events and writes them to [oc]. *)
|
let on_shutdown (self : st) ~time_ns:_ =
|
||||||
let t_write : Thread.t =
|
if A.exchange self.active false then (
|
||||||
Thread.create
|
B_queue.close self.events;
|
||||||
(fun () ->
|
|
||||||
let@ () = Fun.protect ~finally in
|
|
||||||
bg_thread ~mode ~out events)
|
|
||||||
()
|
|
||||||
|
|
||||||
(** ticker thread, regularly sends a message to the writer thread.
|
|
||||||
no need to join it. *)
|
|
||||||
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) ()
|
|
||||||
|
|
||||||
let shutdown () =
|
|
||||||
if A.exchange active false then (
|
|
||||||
B_queue.close events;
|
|
||||||
(* wait for writer thread to be done. The writer thread will exit
|
(* wait for writer thread to be done. The writer thread will exit
|
||||||
after processing remaining events because the queue is now closed *)
|
after processing remaining events because the queue is now closed *)
|
||||||
Thread.join t_write
|
Thread.join self.t_write
|
||||||
)
|
)
|
||||||
|
|
||||||
let get_tid_ () : int =
|
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||||
if !Mock_.enabled then
|
B_queue.push self.events @@ E_name_process { name }
|
||||||
3
|
|
||||||
else
|
|
||||||
Thread.id (Thread.self ())
|
|
||||||
|
|
||||||
let[@inline] enter_span_ ~fun_name ~data name : span =
|
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||||
let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in
|
B_queue.push self.events @@ E_name_thread { tid; name }
|
||||||
let tid = get_tid_ () in
|
|
||||||
let time_us = now_us () in
|
|
||||||
B_queue.push events
|
|
||||||
(E_define_span { tid; name; time_us; id = span; fun_name; data });
|
|
||||||
span
|
|
||||||
|
|
||||||
let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
|
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||||
span =
|
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||||
enter_span_ ~fun_name ~data name
|
let time_us = time_ns *. 1e-3 in
|
||||||
|
B_queue.push self.events
|
||||||
|
@@ E_define_span { tid; name; time_us; id = span; fun_name; data }
|
||||||
|
|
||||||
let exit_span span : unit =
|
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||||
let time_us = now_us () in
|
let time_us = time_ns *. 1e-3 in
|
||||||
B_queue.push events (E_exit_span { id = span; time_us })
|
B_queue.push self.events @@ E_exit_span { id = span; time_us }
|
||||||
|
|
||||||
(* re-raise exception with its backtrace *)
|
let on_add_data (self : st) ~data span =
|
||||||
external reraise : exn -> 'a = "%reraise"
|
if data <> [] then
|
||||||
|
B_queue.push self.events @@ E_add_data { id = span; data }
|
||||||
|
|
||||||
let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f =
|
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||||
let span = enter_span_ ~fun_name ~data name in
|
let time_us = time_ns *. 1e-3 in
|
||||||
try
|
B_queue.push self.events @@ E_message { tid; time_us; msg; data }
|
||||||
let x = f span in
|
|
||||||
exit_span span;
|
|
||||||
x
|
|
||||||
with exn ->
|
|
||||||
exit_span span;
|
|
||||||
reraise exn
|
|
||||||
|
|
||||||
let add_data_to_span span data =
|
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit =
|
||||||
if data <> [] then B_queue.push events (E_add_data { id = span; data })
|
let time_us = time_ns *. 1e-3 in
|
||||||
|
B_queue.push self.events @@ E_counter { name; n = f; time_us; tid }
|
||||||
|
|
||||||
let enter_manual_span ~(parent : explicit_span option) ~flavor
|
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||||
~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
|
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span
|
||||||
explicit_span =
|
: unit =
|
||||||
(* get the id, or make a new one *)
|
let time_us = time_ns *. 1e-3 in
|
||||||
let id =
|
B_queue.push self.events
|
||||||
match parent with
|
@@ E_enter_manual_span
|
||||||
| Some m -> Meta_map.find_exn key_async_id m.meta
|
{ id = trace_id; time_us; tid; data; name; fun_name; flavor }
|
||||||
| None -> A.fetch_and_add span_id_gen_ 1
|
|
||||||
in
|
|
||||||
let time_us = now_us () in
|
|
||||||
B_queue.push events
|
|
||||||
(E_enter_manual_span
|
|
||||||
{ id; time_us; tid = get_tid_ (); data; name; fun_name; flavor });
|
|
||||||
{
|
|
||||||
span = 0L;
|
|
||||||
meta =
|
|
||||||
Meta_map.(
|
|
||||||
empty |> add key_async_id id |> add key_async_data (name, flavor));
|
|
||||||
}
|
|
||||||
|
|
||||||
let exit_manual_span (es : explicit_span) : unit =
|
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
||||||
let id = Meta_map.find_exn key_async_id es.meta in
|
~trace_id (_ : span) : unit =
|
||||||
let name, flavor = Meta_map.find_exn key_async_data es.meta in
|
let time_us = time_ns *. 1e-3 in
|
||||||
let data =
|
B_queue.push self.events
|
||||||
match Meta_map.find key_data es.meta with
|
@@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor }
|
||||||
| None -> []
|
|
||||||
| Some r -> !r
|
|
||||||
in
|
|
||||||
let time_us = now_us () in
|
|
||||||
let tid = get_tid_ () in
|
|
||||||
B_queue.push events
|
|
||||||
(E_exit_manual_span { tid; id; name; time_us; data; flavor })
|
|
||||||
|
|
||||||
let add_data_to_manual_span (es : explicit_span) data =
|
|
||||||
if data <> [] then (
|
|
||||||
let data_ref, add =
|
|
||||||
match Meta_map.find key_data es.meta with
|
|
||||||
| Some r -> r, false
|
|
||||||
| None -> ref [], true
|
|
||||||
in
|
|
||||||
let new_data = List.rev_append data !data_ref in
|
|
||||||
data_ref := new_data;
|
|
||||||
if add then es.meta <- Meta_map.add key_data data_ref es.meta
|
|
||||||
)
|
|
||||||
|
|
||||||
let message ?span:_ ~data msg : unit =
|
|
||||||
let time_us = now_us () in
|
|
||||||
let tid = get_tid_ () in
|
|
||||||
B_queue.push events (E_message { tid; time_us; msg; data })
|
|
||||||
|
|
||||||
let counter_float ~data:_ name f =
|
|
||||||
let time_us = now_us () in
|
|
||||||
let tid = get_tid_ () in
|
|
||||||
B_queue.push events (E_counter { name; n = f; time_us; tid })
|
|
||||||
|
|
||||||
let counter_int ~data name i = counter_float ~data name (float_of_int i)
|
|
||||||
let name_process name : unit = B_queue.push events (E_name_process { name })
|
|
||||||
|
|
||||||
let name_thread name : unit =
|
|
||||||
let tid = get_tid_ () in
|
|
||||||
B_queue.push events (E_name_thread { tid; name })
|
|
||||||
end in
|
end in
|
||||||
(module M)
|
let events = B_queue.create () in
|
||||||
|
let t_write =
|
||||||
|
Thread.create
|
||||||
|
(fun () -> Fun.protect ~finally @@ fun () -> bg_thread ~mode ~out events)
|
||||||
|
()
|
||||||
|
in
|
||||||
|
|
||||||
|
(* ticker thread, regularly sends a message to the writer thread.
|
||||||
|
no need to join it. *)
|
||||||
|
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () in
|
||||||
|
let st : Internal_st.t = { active = A.make true; events; t_write } in
|
||||||
|
Sub.Subscriber.Sub { st; callbacks = (module M) }
|
||||||
|
|
||||||
|
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
||||||
|
: collector =
|
||||||
|
let sub = subscriber_ ~finally ~mode ~out () in
|
||||||
|
Sub.collector sub
|
||||||
|
|
||||||
|
let[@inline] subscriber ~out () : Sub.t =
|
||||||
|
subscriber_ ~finally:ignore ~mode:`Single ~out ()
|
||||||
|
|
||||||
let[@inline] collector ~out () : collector =
|
let[@inline] collector ~out () : collector =
|
||||||
collector_ ~finally:ignore ~mode:`Single ~out ()
|
collector_ ~finally:ignore ~mode:`Single ~out ()
|
||||||
|
|
@ -462,8 +399,13 @@ let with_setup ?out () f =
|
||||||
setup ?out ();
|
setup ?out ();
|
||||||
Fun.protect ~finally:Trace_core.shutdown f
|
Fun.protect ~finally:Trace_core.shutdown f
|
||||||
|
|
||||||
module Internal_ = struct
|
module Private_ = struct
|
||||||
let mock_all_ () = Mock_.enabled := true
|
let mock_all_ () =
|
||||||
|
Mock_.enabled := true;
|
||||||
|
Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns;
|
||||||
|
Sub.Private_.get_tid_ := Some Mock_.get_tid_;
|
||||||
|
()
|
||||||
|
|
||||||
let on_tracing_error = on_tracing_error
|
let on_tracing_error = on_tracing_error
|
||||||
|
|
||||||
let collector_jsonl ~finally ~out () : collector =
|
let collector_jsonl ~finally ~out () : collector =
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,3 @@
|
||||||
val collector :
|
|
||||||
out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector
|
|
||||||
(** Make a collector that writes into the given output.
|
|
||||||
See {!setup} for more details. *)
|
|
||||||
|
|
||||||
type output =
|
type output =
|
||||||
[ `Stdout
|
[ `Stdout
|
||||||
| `Stderr
|
| `Stderr
|
||||||
|
|
@ -16,6 +11,14 @@ type output =
|
||||||
named "foo"
|
named "foo"
|
||||||
*)
|
*)
|
||||||
|
|
||||||
|
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
|
||||||
|
(** A subscriber emitting TEF traces into [out].
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val collector : out:[< output ] -> unit -> Trace_core.collector
|
||||||
|
(** Make a collector that writes into the given output.
|
||||||
|
See {!setup} for more details. *)
|
||||||
|
|
||||||
val setup : ?out:[ output | `Env ] -> unit -> unit
|
val setup : ?out:[ output | `Env ] -> unit -> unit
|
||||||
(** [setup ()] installs the collector depending on [out].
|
(** [setup ()] installs the collector depending on [out].
|
||||||
|
|
||||||
|
|
@ -39,7 +42,7 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
module Internal_ : sig
|
module Private_ : sig
|
||||||
val mock_all_ : unit -> unit
|
val mock_all_ : unit -> unit
|
||||||
(** use fake, deterministic timestamps, TID, PID *)
|
(** use fake, deterministic timestamps, TID, PID *)
|
||||||
|
|
||||||
|
|
|
||||||
34
trace-subscriber.opam
Normal file
34
trace-subscriber.opam
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
# This file is generated by dune, edit dune-project instead
|
||||||
|
opam-version: "2.0"
|
||||||
|
version: "0.7"
|
||||||
|
synopsis: "Generic subscriber system for `trace`"
|
||||||
|
maintainer: ["Simon Cruanes"]
|
||||||
|
authors: ["Simon Cruanes"]
|
||||||
|
license: "MIT"
|
||||||
|
tags: ["trace" "subscriber"]
|
||||||
|
homepage: "https://github.com/c-cube/ocaml-trace"
|
||||||
|
bug-reports: "https://github.com/c-cube/ocaml-trace/issues"
|
||||||
|
depends: [
|
||||||
|
"dune" {>= "2.9"}
|
||||||
|
"ocaml" {>= "4.08"}
|
||||||
|
"trace" {= version}
|
||||||
|
"mtime" {>= "2.0"}
|
||||||
|
"odoc" {with-doc}
|
||||||
|
]
|
||||||
|
build: [
|
||||||
|
["dune" "subst"] {dev}
|
||||||
|
[
|
||||||
|
"dune"
|
||||||
|
"build"
|
||||||
|
"-p"
|
||||||
|
name
|
||||||
|
"-j"
|
||||||
|
jobs
|
||||||
|
"--promote-install-files=false"
|
||||||
|
"@install"
|
||||||
|
"@runtest" {with-test}
|
||||||
|
"@doc" {with-doc}
|
||||||
|
]
|
||||||
|
["dune" "install" "-p" name "--create-install-files" name]
|
||||||
|
]
|
||||||
|
dev-repo: "git+https://github.com/c-cube/ocaml-trace.git"
|
||||||
|
|
@ -12,6 +12,7 @@ bug-reports: "https://github.com/c-cube/ocaml-trace/issues"
|
||||||
depends: [
|
depends: [
|
||||||
"ocaml" {>= "4.08"}
|
"ocaml" {>= "4.08"}
|
||||||
"trace" {= version}
|
"trace" {= version}
|
||||||
|
"trace-subscriber" {= version}
|
||||||
"mtime" {>= "2.0"}
|
"mtime" {>= "2.0"}
|
||||||
"base-unix"
|
"base-unix"
|
||||||
"dune" {>= "2.9"}
|
"dune" {>= "2.9"}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue