mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
commit
11d313df18
27 changed files with 2589 additions and 2091 deletions
2
.github/workflows/main.yml
vendored
2
.github/workflows/main.yml
vendored
|
|
@ -43,6 +43,6 @@ jobs:
|
|||
- run: opam exec -- dune runtest -p trace-tef,trace-fuchsia
|
||||
|
||||
# with depopts
|
||||
- run: opam install hmap
|
||||
- run: opam install hmap mtime
|
||||
- run: opam exec -- dune build '@install' -p trace,trace-tef,trace-fuchsia
|
||||
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -8,6 +8,8 @@ clean:
|
|||
|
||||
test:
|
||||
@dune runtest $(DUNE_OPTS)
|
||||
test-autopromote:
|
||||
@dune runtest $(DUNE_OPTS) --auto-promote
|
||||
|
||||
doc:
|
||||
@dune build $(DUNE_OPTS) @doc
|
||||
|
|
|
|||
73
dune-project
73
dune-project
|
|
@ -1,26 +1,34 @@
|
|||
(lang dune 2.9)
|
||||
|
||||
(name trace)
|
||||
|
||||
(generate_opam_files true)
|
||||
|
||||
(version 0.7)
|
||||
|
||||
(source
|
||||
(github c-cube/ocaml-trace))
|
||||
|
||||
(authors "Simon Cruanes")
|
||||
|
||||
(maintainers "Simon Cruanes")
|
||||
|
||||
(license MIT)
|
||||
|
||||
;(documentation https://url/to/documentation)
|
||||
|
||||
(package
|
||||
(name trace)
|
||||
(synopsis "A stub for tracing/observability, agnostic in how data is collected")
|
||||
(synopsis
|
||||
"A stub for tracing/observability, agnostic in how data is collected")
|
||||
(depends
|
||||
(ocaml (>= 4.08))
|
||||
dune)
|
||||
(ocaml
|
||||
(>= 4.08))
|
||||
dune)
|
||||
(depopts
|
||||
hmap
|
||||
(mtime (>= 2.0)))
|
||||
hmap
|
||||
(mtime
|
||||
(>= 2.0)))
|
||||
(tags
|
||||
(trace tracing observability profiling)))
|
||||
|
||||
|
|
@ -28,37 +36,54 @@
|
|||
(name ppx_trace)
|
||||
(synopsis "A ppx-based preprocessor for trace")
|
||||
(depends
|
||||
(ocaml (>= 4.12)) ; we use __FUNCTION__
|
||||
(ppxlib (>= 0.28))
|
||||
(trace (= :version))
|
||||
(trace-tef (and (= :version) :with-test))
|
||||
dune)
|
||||
(ocaml
|
||||
(>= 4.12)) ; we use __FUNCTION__
|
||||
(ppxlib
|
||||
(>= 0.28))
|
||||
(trace
|
||||
(= :version))
|
||||
(trace-tef
|
||||
(and
|
||||
(= :version)
|
||||
:with-test))
|
||||
dune)
|
||||
(depopts
|
||||
(mtime (>= 2.0)))
|
||||
(tags
|
||||
(trace ppx)))
|
||||
|
||||
(package
|
||||
(name trace-tef)
|
||||
(synopsis "A simple backend for trace, emitting Catapult/TEF JSON into a file")
|
||||
(synopsis
|
||||
"A simple backend for trace, emitting Catapult/TEF JSON into a file")
|
||||
(depends
|
||||
(ocaml (>= 4.08))
|
||||
(trace (= :version))
|
||||
(mtime (>= 2.0))
|
||||
base-unix
|
||||
dune)
|
||||
(ocaml
|
||||
(>= 4.08))
|
||||
(trace
|
||||
(= :version))
|
||||
(mtime
|
||||
(>= 2.0))
|
||||
base-unix
|
||||
dune)
|
||||
(tags
|
||||
(trace tracing catapult TEF chrome-format)))
|
||||
|
||||
(package
|
||||
(name trace-fuchsia)
|
||||
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
|
||||
(synopsis
|
||||
"A high-performance backend for trace, emitting a Fuchsia trace into a file")
|
||||
(depends
|
||||
(ocaml (>= 4.08))
|
||||
(trace (= :version))
|
||||
(mtime (>= 2.0))
|
||||
(thread-local-storage (>= 0.2))
|
||||
base-bigarray
|
||||
base-unix
|
||||
dune)
|
||||
(ocaml
|
||||
(>= 4.08))
|
||||
(trace
|
||||
(= :version))
|
||||
(mtime
|
||||
(>= 2.0))
|
||||
(thread-local-storage
|
||||
(>= 0.2))
|
||||
base-bigarray
|
||||
base-unix
|
||||
dune)
|
||||
(tags
|
||||
(trace tracing fuchsia)))
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,9 @@ depends: [
|
|||
"dune" {>= "2.9"}
|
||||
"odoc" {with-doc}
|
||||
]
|
||||
depopts: [
|
||||
"mtime" {>= "2.0"}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
[
|
||||
|
|
|
|||
129
src/subscriber/callbacks.ml
Normal file
129
src/subscriber/callbacks.ml
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
open Trace_core
|
||||
open Types
|
||||
|
||||
(** First class module signature for callbacks *)
|
||||
module type S = sig
|
||||
type st
|
||||
(** Type of the state passed to every callback. *)
|
||||
|
||||
val on_init : st -> time_ns:float -> unit
|
||||
(** Called when the subscriber is initialized in a collector *)
|
||||
|
||||
val on_shutdown : st -> time_ns:float -> unit
|
||||
(** Called when the collector is shutdown *)
|
||||
|
||||
val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit
|
||||
(** Current thread is being named *)
|
||||
|
||||
val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit
|
||||
(** Current process is being named *)
|
||||
|
||||
val on_enter_span :
|
||||
st ->
|
||||
__FUNCTION__:string option ->
|
||||
__FILE__:string ->
|
||||
__LINE__:int ->
|
||||
time_ns:float ->
|
||||
tid:int ->
|
||||
data:(string * user_data) list ->
|
||||
name:string ->
|
||||
span ->
|
||||
unit
|
||||
(** Enter a regular (sync) span *)
|
||||
|
||||
val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit
|
||||
(** Exit a span. This and [on_enter_span] must follow strict stack discipline *)
|
||||
|
||||
val on_add_data : st -> data:(string * user_data) list -> span -> unit
|
||||
(** Add data to a regular span (which must be active) *)
|
||||
|
||||
val on_message :
|
||||
st ->
|
||||
time_ns:float ->
|
||||
tid:int ->
|
||||
span:span option ->
|
||||
data:(string * user_data) list ->
|
||||
string ->
|
||||
unit
|
||||
(** Emit a log message *)
|
||||
|
||||
val on_counter :
|
||||
st ->
|
||||
time_ns:float ->
|
||||
tid:int ->
|
||||
data:(string * user_data) list ->
|
||||
name:string ->
|
||||
float ->
|
||||
unit
|
||||
(** Emit the current value of a counter *)
|
||||
|
||||
val on_enter_manual_span :
|
||||
st ->
|
||||
__FUNCTION__:string option ->
|
||||
__FILE__:string ->
|
||||
__LINE__:int ->
|
||||
time_ns:float ->
|
||||
tid:int ->
|
||||
parent:span option ->
|
||||
data:(string * user_data) list ->
|
||||
name:string ->
|
||||
flavor:flavor option ->
|
||||
trace_id:int ->
|
||||
span ->
|
||||
unit
|
||||
(** Enter a manual (possibly async) span *)
|
||||
|
||||
val on_exit_manual_span :
|
||||
st ->
|
||||
time_ns:float ->
|
||||
tid:int ->
|
||||
name:string ->
|
||||
data:(string * user_data) list ->
|
||||
flavor:flavor option ->
|
||||
trace_id:int ->
|
||||
span ->
|
||||
unit
|
||||
(** Exit a manual span *)
|
||||
end
|
||||
|
||||
type 'st t = (module S with type st = 'st)
|
||||
(** Callbacks for a subscriber. There is one callback per event
|
||||
in {!Trace}. The type ['st] is the state that is passed to
|
||||
every single callback. *)
|
||||
|
||||
(** Dummy callbacks.
|
||||
It can be useful to reuse some of these functions in a
|
||||
real subscriber that doesn't want to handle {b all}
|
||||
events, but only some of them. *)
|
||||
module Dummy = struct
|
||||
let on_init _ ~time_ns:_ = ()
|
||||
let on_shutdown _ ~time_ns:_ = ()
|
||||
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
|
||||
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
|
||||
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()
|
||||
|
||||
let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ ~tid:_
|
||||
~data:_ ~name:_ _sp =
|
||||
()
|
||||
|
||||
let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
|
||||
let on_add_data _ ~data:_ _sp = ()
|
||||
|
||||
let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
|
||||
~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp =
|
||||
()
|
||||
|
||||
let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
|
||||
~trace_id:_ _ =
|
||||
()
|
||||
end
|
||||
|
||||
(** Dummy callbacks, do nothing. *)
|
||||
let dummy (type st) () : st t =
|
||||
let module M = struct
|
||||
type nonrec st = st
|
||||
|
||||
include Dummy
|
||||
end in
|
||||
(module M)
|
||||
13
src/subscriber/dune
Normal file
13
src/subscriber/dune
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
(library
|
||||
(name trace_subscriber)
|
||||
(public_name trace.subscriber)
|
||||
(libraries (re_export trace.core)
|
||||
(select thread_.ml from
|
||||
(threads -> thread_.real.ml)
|
||||
( -> thread_.dummy.ml))
|
||||
(select time_.ml from
|
||||
(mtime mtime.clock.os -> time_.mtime.ml)
|
||||
(mtime mtime.clock.jsoo -> time_.mtime.ml)
|
||||
( -> time_.dummy.ml))))
|
||||
|
||||
104
src/subscriber/subscriber.ml
Normal file
104
src/subscriber/subscriber.ml
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
(** A trace subscriber. It pairs a set of callbacks
|
||||
with the state they need (which can contain a file handle,
|
||||
a socket, config, etc.).
|
||||
|
||||
The design goal for this is that it should be possible to avoid allocations
|
||||
when the trace collector calls the callbacks. *)
|
||||
type t =
|
||||
| Sub : {
|
||||
st: 'st;
|
||||
callbacks: 'st Callbacks.t;
|
||||
}
|
||||
-> t
|
||||
|
||||
(** Dummy subscriber that ignores every call. *)
|
||||
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }
|
||||
|
||||
open struct
|
||||
module Tee_cb : Callbacks.S with type st = t * t = struct
|
||||
type nonrec st = t * t
|
||||
|
||||
let on_init
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
|
||||
CB1.on_init s1 ~time_ns;
|
||||
CB2.on_init s2 ~time_ns
|
||||
|
||||
let on_shutdown
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
|
||||
CB1.on_shutdown s1 ~time_ns;
|
||||
CB2.on_shutdown s2 ~time_ns
|
||||
|
||||
let on_name_thread
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
|
||||
CB1.on_name_thread s1 ~time_ns ~tid ~name;
|
||||
CB2.on_name_thread s2 ~time_ns ~tid ~name
|
||||
|
||||
let on_name_process
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
|
||||
CB1.on_name_process s1 ~time_ns ~tid ~name;
|
||||
CB2.on_name_process s2 ~time_ns ~tid ~name
|
||||
|
||||
let on_enter_span
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
|
||||
~__LINE__ ~time_ns ~tid ~data ~name span =
|
||||
CB1.on_enter_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
|
||||
~name span;
|
||||
CB2.on_enter_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
|
||||
~name span
|
||||
|
||||
let on_exit_span
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid span =
|
||||
CB1.on_exit_span s1 ~time_ns ~tid span;
|
||||
CB2.on_exit_span s2 ~time_ns ~tid span
|
||||
|
||||
let on_add_data
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~data span =
|
||||
CB1.on_add_data s1 ~data span;
|
||||
CB2.on_add_data s2 ~data span
|
||||
|
||||
let on_message
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~span ~data
|
||||
msg =
|
||||
CB1.on_message s1 ~time_ns ~tid ~span ~data msg;
|
||||
CB2.on_message s2 ~time_ns ~tid ~span ~data msg
|
||||
|
||||
let on_counter
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~data ~name
|
||||
n =
|
||||
CB1.on_counter s1 ~time_ns ~tid ~data ~name n;
|
||||
CB2.on_counter s2 ~time_ns ~tid ~data ~name n
|
||||
|
||||
let on_enter_manual_span
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
|
||||
~__LINE__ ~time_ns ~tid ~parent ~data ~name ~flavor ~trace_id span =
|
||||
CB1.on_enter_manual_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
|
||||
~tid ~parent ~data ~name ~flavor ~trace_id span;
|
||||
CB2.on_enter_manual_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
|
||||
~tid ~parent ~data ~name ~flavor ~trace_id span
|
||||
|
||||
let on_exit_manual_span
|
||||
( Sub { st = s1; callbacks = (module CB1) },
|
||||
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name ~data
|
||||
~flavor ~trace_id span =
|
||||
CB1.on_exit_manual_span s1 ~time_ns ~tid ~name ~data ~flavor ~trace_id
|
||||
span;
|
||||
CB2.on_exit_manual_span s2 ~time_ns ~tid ~name ~data ~flavor ~trace_id
|
||||
span
|
||||
end
|
||||
end
|
||||
|
||||
(** [tee s1 s2] is a subscriber that forwards every
|
||||
call to [s1] and [s2] both. *)
|
||||
let tee (s1 : t) (s2 : t) : t =
|
||||
let st = s1, s2 in
|
||||
Sub { st; callbacks = (module Tee_cb) }
|
||||
1
src/subscriber/thread_.dummy.ml
Normal file
1
src/subscriber/thread_.dummy.ml
Normal file
|
|
@ -0,0 +1 @@
|
|||
let[@inline] get_tid () = 0
|
||||
2
src/subscriber/thread_.mli
Normal file
2
src/subscriber/thread_.mli
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
val get_tid : unit -> int
|
||||
(** Get current thread ID *)
|
||||
1
src/subscriber/thread_.real.ml
Normal file
1
src/subscriber/thread_.real.ml
Normal file
|
|
@ -0,0 +1 @@
|
|||
let[@inline] get_tid () = Thread.id @@ Thread.self ()
|
||||
1
src/subscriber/time_.dummy.ml
Normal file
1
src/subscriber/time_.dummy.ml
Normal file
|
|
@ -0,0 +1 @@
|
|||
let[@inline] get_time_ns () : float = 0.
|
||||
1
src/subscriber/time_.mli
Normal file
1
src/subscriber/time_.mli
Normal file
|
|
@ -0,0 +1 @@
|
|||
val get_time_ns : unit -> float
|
||||
3
src/subscriber/time_.mtime.ml
Normal file
3
src/subscriber/time_.mtime.ml
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
let get_time_ns () : float =
|
||||
let t = Mtime_clock.now () in
|
||||
Int64.to_float (Mtime.to_uint64_ns t)
|
||||
187
src/subscriber/trace_subscriber.ml
Normal file
187
src/subscriber/trace_subscriber.ml
Normal file
|
|
@ -0,0 +1,187 @@
|
|||
open Trace_core
|
||||
module Callbacks = Callbacks
|
||||
module Subscriber = Subscriber
|
||||
include Types
|
||||
|
||||
type t = Subscriber.t
|
||||
|
||||
module Private_ = struct
|
||||
let get_now_ns_ = ref None
|
||||
let get_tid_ = ref None
|
||||
|
||||
(** Now, in nanoseconds *)
|
||||
let[@inline] now_ns () : float =
|
||||
match !get_now_ns_ with
|
||||
| Some f -> f ()
|
||||
| None ->
|
||||
let t = Mtime_clock.now () in
|
||||
Int64.to_float (Mtime.to_uint64_ns t)
|
||||
|
||||
let[@inline] tid_ () : int =
|
||||
match !get_tid_ with
|
||||
| Some f -> f ()
|
||||
| None -> Thread_.get_tid ()
|
||||
end
|
||||
|
||||
open struct
|
||||
module A = Trace_core.Internal_.Atomic_
|
||||
|
||||
type manual_span_info = {
|
||||
name: string;
|
||||
flavor: flavor option;
|
||||
mutable data: (string * user_data) list;
|
||||
}
|
||||
|
||||
(** Key used to carry some information between begin and end of
|
||||
manual spans, by way of the meta map *)
|
||||
let key_manual_info : manual_span_info Meta_map.key = Meta_map.Key.create ()
|
||||
|
||||
(** key used to carry a unique "id" for all spans in an async context *)
|
||||
let key_async_trace_id : int Meta_map.key = Meta_map.Key.create ()
|
||||
end
|
||||
|
||||
let[@inline] conv_flavor = function
|
||||
| `Async -> Async
|
||||
| `Sync -> Sync
|
||||
|
||||
let[@inline] conv_flavor_opt = function
|
||||
| None -> None
|
||||
| Some f -> Some (conv_flavor f)
|
||||
|
||||
let[@inline] conv_user_data = function
|
||||
| `Int i -> U_int i
|
||||
| `Bool b -> U_bool b
|
||||
| `Float f -> U_float f
|
||||
| `String s -> U_string s
|
||||
| `None -> U_none
|
||||
|
||||
let rec conv_data = function
|
||||
| [] -> []
|
||||
| [ (k, v) ] -> [ k, conv_user_data v ]
|
||||
| (k, v) :: tl -> (k, conv_user_data v) :: conv_data tl
|
||||
|
||||
(** A collector that calls the callbacks of subscriber *)
|
||||
let collector (Sub { st; callbacks = (module CB) } : Subscriber.t) : collector =
|
||||
let open Private_ in
|
||||
let module M = struct
|
||||
let trace_id_gen_ = A.make 0
|
||||
|
||||
(** generator for span ids *)
|
||||
let new_span_ : unit -> int =
|
||||
let span_id_gen_ = A.make 0 in
|
||||
fun [@inline] () -> A.fetch_and_add span_id_gen_ 1
|
||||
|
||||
let enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : span =
|
||||
let span = Int64.of_int (new_span_ ()) in
|
||||
let tid = tid_ () in
|
||||
let time_ns = now_ns () in
|
||||
let data = conv_data data in
|
||||
CB.on_enter_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
|
||||
~name span;
|
||||
span
|
||||
|
||||
let exit_span span : unit =
|
||||
let time_ns = now_ns () in
|
||||
let tid = tid_ () in
|
||||
CB.on_exit_span st ~time_ns ~tid span
|
||||
|
||||
let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name f =
|
||||
let span = enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name in
|
||||
try
|
||||
let x = f span in
|
||||
exit_span span;
|
||||
x
|
||||
with exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
exit_span span;
|
||||
Printexc.raise_with_backtrace exn bt
|
||||
|
||||
let add_data_to_span span data =
|
||||
if data <> [] then (
|
||||
let data = conv_data data in
|
||||
CB.on_add_data st ~data span
|
||||
)
|
||||
|
||||
let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__
|
||||
~__FILE__ ~__LINE__ ~data name : explicit_span =
|
||||
let span = Int64.of_int (new_span_ ()) in
|
||||
let tid = tid_ () in
|
||||
let time_ns = now_ns () in
|
||||
let data = conv_data data in
|
||||
let flavor = conv_flavor_opt flavor in
|
||||
|
||||
(* get the common trace id, or make a new one *)
|
||||
let trace_id, parent =
|
||||
match parent with
|
||||
| Some m -> Meta_map.find_exn key_async_trace_id m.meta, Some m.span
|
||||
| None -> A.fetch_and_add trace_id_gen_ 1, None
|
||||
in
|
||||
|
||||
CB.on_enter_manual_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~parent ~data
|
||||
~time_ns ~tid ~name ~flavor ~trace_id span;
|
||||
let meta =
|
||||
Meta_map.empty
|
||||
|> Meta_map.add key_manual_info { name; flavor; data = [] }
|
||||
|> Meta_map.add key_async_trace_id trace_id
|
||||
in
|
||||
{ span; meta }
|
||||
|
||||
let exit_manual_span (es : explicit_span) : unit =
|
||||
let time_ns = now_ns () in
|
||||
let tid = tid_ () in
|
||||
let trace_id =
|
||||
match Meta_map.find key_async_trace_id es.meta with
|
||||
| None -> assert false
|
||||
| Some id -> id
|
||||
in
|
||||
let minfo =
|
||||
match Meta_map.find key_manual_info es.meta with
|
||||
| None -> assert false
|
||||
| Some m -> m
|
||||
in
|
||||
CB.on_exit_manual_span st ~tid ~time_ns ~data:minfo.data ~name:minfo.name
|
||||
~flavor:minfo.flavor ~trace_id es.span
|
||||
|
||||
let add_data_to_manual_span (es : explicit_span) data =
|
||||
if data <> [] then (
|
||||
let data = conv_data data in
|
||||
match Meta_map.find key_manual_info es.meta with
|
||||
| None -> assert false
|
||||
| Some m -> m.data <- List.rev_append data m.data
|
||||
)
|
||||
|
||||
let message ?span ~data msg : unit =
|
||||
let time_ns = now_ns () in
|
||||
let tid = tid_ () in
|
||||
let data = conv_data data in
|
||||
CB.on_message st ~time_ns ~tid ~span ~data msg
|
||||
|
||||
let counter_float ~data name f : unit =
|
||||
let time_ns = now_ns () in
|
||||
let tid = tid_ () in
|
||||
let data = conv_data data in
|
||||
CB.on_counter st ~tid ~time_ns ~data ~name f
|
||||
|
||||
let[@inline] counter_int ~data name i =
|
||||
counter_float ~data name (float_of_int i)
|
||||
|
||||
let name_process name : unit =
|
||||
let tid = tid_ () in
|
||||
let time_ns = now_ns () in
|
||||
CB.on_name_process st ~time_ns ~tid ~name
|
||||
|
||||
let name_thread name : unit =
|
||||
let tid = tid_ () in
|
||||
let time_ns = now_ns () in
|
||||
CB.on_name_thread st ~time_ns ~tid ~name
|
||||
|
||||
let shutdown () =
|
||||
let time_ns = now_ns () in
|
||||
CB.on_shutdown st ~time_ns
|
||||
|
||||
let () =
|
||||
(* init code *)
|
||||
let time_ns = now_ns () in
|
||||
CB.on_init st ~time_ns
|
||||
end in
|
||||
(module M)
|
||||
33
src/subscriber/trace_subscriber.mli
Normal file
33
src/subscriber/trace_subscriber.mli
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
(** Generic subscribers.
|
||||
|
||||
This defines the notion of a {b subscriber},
|
||||
a set of callbacks for every trace event.
|
||||
It also defines a collector that needs to be installed
|
||||
for the subscriber(s) to be called.
|
||||
|
||||
@since NEXT_RELEASE
|
||||
*)
|
||||
|
||||
module Callbacks = Callbacks
|
||||
module Subscriber = Subscriber
|
||||
|
||||
include module type of struct
|
||||
include Types
|
||||
end
|
||||
|
||||
type t = Subscriber.t
|
||||
|
||||
val collector : t -> Trace_core.collector
|
||||
|
||||
(**/**)
|
||||
|
||||
(**/*)
|
||||
module Private_ : sig
|
||||
val get_now_ns_ : (unit -> float) option ref
|
||||
(** The callback used to get the current timestamp *)
|
||||
|
||||
val get_tid_ : (unit -> int) option ref
|
||||
(** The callback used to get the current thread's id *)
|
||||
|
||||
val now_ns : unit -> float
|
||||
end
|
||||
10
src/subscriber/types.ml
Normal file
10
src/subscriber/types.ml
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
type user_data =
|
||||
| U_bool of bool
|
||||
| U_float of float
|
||||
| U_int of int
|
||||
| U_none
|
||||
| U_string of string
|
||||
|
||||
type flavor =
|
||||
| Sync
|
||||
| Async
|
||||
|
|
@ -1,6 +1,11 @@
|
|||
|
||||
(library
|
||||
(name trace_tef_tldrs)
|
||||
(public_name trace-tef.tldrs)
|
||||
(synopsis "Multiprocess tracing using the `tldrs` daemon")
|
||||
(libraries trace.core trace.private.util trace-tef unix threads))
|
||||
(name trace_tef_tldrs)
|
||||
(public_name trace-tef.tldrs)
|
||||
(synopsis "Multiprocess tracing using the `tldrs` daemon")
|
||||
(libraries
|
||||
trace.core
|
||||
trace.private.util
|
||||
trace.subscriber
|
||||
trace-tef
|
||||
unix
|
||||
threads))
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ let find_role ~out () : role =
|
|||
| Some path -> Some (write_to_file path)
|
||||
| None -> None))
|
||||
|
||||
let collector_ (client : as_client) : collector =
|
||||
let subscriber_ (client : as_client) : Trace_subscriber.t =
|
||||
(* connect to unix socket *)
|
||||
let sock = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
|
||||
(try Unix.connect sock (Unix.ADDR_UNIX client.socket)
|
||||
|
|
@ -105,24 +105,31 @@ let collector_ (client : as_client) : collector =
|
|||
in
|
||||
|
||||
fpf out "OPEN %s\n%!" client.trace_id;
|
||||
Trace_tef.Internal_.collector_jsonl ~finally ~out:(`Output out) ()
|
||||
Trace_tef.Private_.subscriber_jsonl ~finally ~out:(`Output out) ()
|
||||
|
||||
let subscriber ~out () =
|
||||
let role = find_role ~out () in
|
||||
match role with
|
||||
| None -> assert false
|
||||
| Some c -> subscriber_ c
|
||||
|
||||
let collector ~out () : collector =
|
||||
let role = find_role ~out () in
|
||||
match role with
|
||||
| None -> assert false
|
||||
| Some c -> collector_ c
|
||||
| Some c -> subscriber_ c |> Trace_subscriber.collector
|
||||
|
||||
let setup ?(out = `Env) () =
|
||||
let role = find_role ~out () in
|
||||
match role with
|
||||
| None -> ()
|
||||
| Some c -> Trace_core.setup_collector @@ collector_ c
|
||||
| Some c ->
|
||||
Trace_core.setup_collector @@ Trace_subscriber.collector @@ subscriber_ c
|
||||
|
||||
let with_setup ?out () f =
|
||||
setup ?out ();
|
||||
Fun.protect ~finally:Trace_core.shutdown f
|
||||
|
||||
module Internal_ = struct
|
||||
include Trace_tef.Internal_
|
||||
module Private_ = struct
|
||||
include Trace_tef.Private_
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,7 +1,13 @@
|
|||
(** Emit traces by talking to the {{: https://github.com/imandra-ai/tldrs} tldrs} daemon *)
|
||||
|
||||
val collector : out:[ `File of string ] -> unit -> Trace_core.collector
|
||||
(** Make a collector that writes into the given output.
|
||||
See {!setup} for more details. *)
|
||||
|
||||
val subscriber : out:[ `File of string ] -> unit -> Trace_subscriber.t
|
||||
(** Make a subscriber that writes into the given output.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
type output = [ `File of string ]
|
||||
(** Output for tracing.
|
||||
- [`File "foo"] will enable tracing and print events into file
|
||||
|
|
@ -30,7 +36,7 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
|
|||
|
||||
(**/**)
|
||||
|
||||
module Internal_ : sig
|
||||
module Private_ : sig
|
||||
val mock_all_ : unit -> unit
|
||||
(** use fake, deterministic timestamps, TID, PID *)
|
||||
|
||||
|
|
|
|||
17
src/tef/dune
17
src/tef/dune
|
|
@ -1,6 +1,13 @@
|
|||
|
||||
(library
|
||||
(name trace_tef)
|
||||
(public_name trace-tef)
|
||||
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
|
||||
(libraries trace.core trace.private.util mtime mtime.clock.os unix threads))
|
||||
(name trace_tef)
|
||||
(public_name trace-tef)
|
||||
(synopsis
|
||||
"Simple and lightweight tracing using TEF/Catapult format, in-process")
|
||||
(libraries
|
||||
trace.core
|
||||
trace.private.util
|
||||
trace.subscriber
|
||||
mtime
|
||||
mtime.clock.os
|
||||
unix
|
||||
threads))
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
open Trace_core
|
||||
module Sub = Trace_subscriber
|
||||
|
||||
(** An event, specialized for TEF *)
|
||||
type t =
|
||||
| E_tick
|
||||
| E_message of {
|
||||
tid: int;
|
||||
msg: string;
|
||||
time_us: float;
|
||||
data: (string * user_data) list;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_define_span of {
|
||||
tid: int;
|
||||
|
|
@ -14,7 +16,7 @@ type t =
|
|||
time_us: float;
|
||||
id: span;
|
||||
fun_name: string option;
|
||||
data: (string * user_data) list;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_exit_span of {
|
||||
id: span;
|
||||
|
|
@ -22,23 +24,23 @@ type t =
|
|||
}
|
||||
| E_add_data of {
|
||||
id: span;
|
||||
data: (string * user_data) list;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_enter_manual_span of {
|
||||
tid: int;
|
||||
name: string;
|
||||
time_us: float;
|
||||
id: int;
|
||||
flavor: [ `Sync | `Async ] option;
|
||||
flavor: Sub.flavor option;
|
||||
fun_name: string option;
|
||||
data: (string * user_data) list;
|
||||
data: (string * Sub.user_data) list;
|
||||
}
|
||||
| E_exit_manual_span of {
|
||||
tid: int;
|
||||
name: string;
|
||||
time_us: float;
|
||||
flavor: [ `Sync | `Async ] option;
|
||||
data: (string * user_data) list;
|
||||
flavor: Sub.flavor option;
|
||||
data: (string * Sub.user_data) list;
|
||||
id: int;
|
||||
}
|
||||
| E_counter of {
|
||||
|
|
|
|||
|
|
@ -1,31 +1,24 @@
|
|||
open Trace_core
|
||||
open Trace_private_util
|
||||
open Event
|
||||
module Sub = Trace_subscriber
|
||||
module A = Trace_core.Internal_.Atomic_
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
||||
|
||||
module Mock_ = struct
|
||||
let enabled = ref false
|
||||
let now = ref 0
|
||||
|
||||
let[@inline never] now_us () : float =
|
||||
(* used to mock timing *)
|
||||
let get_now_ns () : float =
|
||||
let x = !now in
|
||||
incr now;
|
||||
float_of_int x
|
||||
float_of_int x *. 1000.
|
||||
|
||||
let get_tid_ () : int = 3
|
||||
end
|
||||
|
||||
(** Now, in microseconds *)
|
||||
let[@inline] now_us () : float =
|
||||
if !Mock_.enabled then
|
||||
Mock_.now_us ()
|
||||
else (
|
||||
let t = Mtime_clock.now () in
|
||||
Int64.to_float (Mtime.to_uint64_ns t) /. 1e3
|
||||
)
|
||||
|
||||
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
|
||||
|
||||
module Span_tbl = Hashtbl.Make (struct
|
||||
include Int64
|
||||
|
||||
|
|
@ -36,18 +29,9 @@ type span_info = {
|
|||
tid: int;
|
||||
name: string;
|
||||
start_us: float;
|
||||
mutable data: (string * user_data) list;
|
||||
mutable data: (string * Sub.user_data) list;
|
||||
}
|
||||
|
||||
(** key used to carry a unique "id" for all spans in an async context *)
|
||||
let key_async_id : int Meta_map.key = Meta_map.Key.create ()
|
||||
|
||||
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.key =
|
||||
Meta_map.Key.create ()
|
||||
|
||||
let key_data : (string * user_data) list ref Meta_map.key =
|
||||
Meta_map.Key.create ()
|
||||
|
||||
(** Writer: knows how to write entries to a file in TEF format *)
|
||||
module Writer = struct
|
||||
type t = {
|
||||
|
|
@ -126,12 +110,12 @@ module Writer = struct
|
|||
String.iter encode_char s;
|
||||
char buf '"'
|
||||
|
||||
let pp_user_data_ (out : Buffer.t) : [< user_data ] -> unit = function
|
||||
| `None -> raw_string out "null"
|
||||
| `Int i -> Printf.bprintf out "%d" i
|
||||
| `Bool b -> Printf.bprintf out "%b" b
|
||||
| `String s -> str_val out s
|
||||
| `Float f -> Printf.bprintf out "%g" f
|
||||
let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function
|
||||
| U_none -> raw_string out "null"
|
||||
| U_int i -> Printf.bprintf out "%d" i
|
||||
| U_bool b -> Printf.bprintf out "%b" b
|
||||
| U_string s -> str_val out s
|
||||
| U_float f -> Printf.bprintf out "%g" f
|
||||
|
||||
(* emit args, if not empty. [ppv] is used to print values. *)
|
||||
let emit_args_o_ ppv (out : Buffer.t) args : unit =
|
||||
|
|
@ -158,26 +142,28 @@ module Writer = struct
|
|||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_manual_begin ~tid ~name ~id ~ts ~args ~flavor (self : t) : unit =
|
||||
let emit_manual_begin ~tid ~name ~id ~ts ~args ~(flavor : Sub.flavor option)
|
||||
(self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
self.pid id tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some `Async -> 'b'
|
||||
| Some `Sync -> 'B')
|
||||
| None | Some Async -> 'b'
|
||||
| Some Sync -> 'B')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_manual_end ~tid ~name ~id ~ts ~flavor ~args (self : t) : unit =
|
||||
let emit_manual_end ~tid ~name ~id ~ts ~(flavor : Sub.flavor option) ~args
|
||||
(self : t) : unit =
|
||||
emit_sep_and_start_ self;
|
||||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
|
||||
self.pid id tid ts str_val name
|
||||
(match flavor with
|
||||
| None | Some `Async -> 'e'
|
||||
| Some `Sync -> 'E')
|
||||
| None | Some Async -> 'e'
|
||||
| Some Sync -> 'E')
|
||||
(emit_args_o_ pp_user_data_)
|
||||
args;
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
|
@ -197,7 +183,7 @@ module Writer = struct
|
|||
{json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
|
||||
tid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", `String name ];
|
||||
[ "name", U_string name ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_name_process ~name (self : t) : unit =
|
||||
|
|
@ -205,7 +191,7 @@ module Writer = struct
|
|||
Printf.bprintf self.buf
|
||||
{json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ "name", `String name ];
|
||||
[ "name", U_string name ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
|
||||
let emit_counter ~name ~tid ~ts (self : t) f : unit =
|
||||
|
|
@ -214,7 +200,7 @@ module Writer = struct
|
|||
{json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid
|
||||
tid ts
|
||||
(emit_args_o_ pp_user_data_)
|
||||
[ name, `Float f ];
|
||||
[ name, U_float f ];
|
||||
Buffer.output_buffer self.oc self.buf
|
||||
end
|
||||
|
||||
|
|
@ -231,7 +217,7 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
|
|||
let add_fun_name_ fun_name data : _ list =
|
||||
match fun_name with
|
||||
| None -> data
|
||||
| Some f -> ("function", `String f) :: data
|
||||
| Some f -> ("function", Sub.U_string f) :: data
|
||||
in
|
||||
|
||||
(* how to deal with an event *)
|
||||
|
|
@ -280,7 +266,8 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
|
|||
(* write a message about us closing *)
|
||||
Writer.emit_instant_event ~name:"tef-worker.exit"
|
||||
~tid:(Thread.id @@ Thread.self ())
|
||||
~ts:(now_us ()) ~args:[] writer;
|
||||
~ts:(Sub.Private_.now_ns () *. 1e-3)
|
||||
~args:[] writer;
|
||||
|
||||
(* warn if app didn't close all spans *)
|
||||
if Span_tbl.length spans > 0 then
|
||||
|
|
@ -304,138 +291,90 @@ type output =
|
|||
| `File of string
|
||||
]
|
||||
|
||||
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
||||
: collector =
|
||||
let module M = struct
|
||||
let active = A.make true
|
||||
module Internal_st = struct
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
events: Event.t B_queue.t;
|
||||
t_write: Thread.t;
|
||||
}
|
||||
end
|
||||
|
||||
(** generator for span ids *)
|
||||
let span_id_gen_ = A.make 0
|
||||
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
|
||||
let module M : Sub.Callbacks.S with type st = Internal_st.t = struct
|
||||
type st = Internal_st.t
|
||||
|
||||
(* queue of messages to write *)
|
||||
let events : Event.t B_queue.t = B_queue.create ()
|
||||
let on_init _ ~time_ns:_ = ()
|
||||
|
||||
(** writer thread. It receives events and writes them to [oc]. *)
|
||||
let t_write : Thread.t =
|
||||
Thread.create
|
||||
(fun () ->
|
||||
let@ () = Fun.protect ~finally in
|
||||
bg_thread ~mode ~out events)
|
||||
()
|
||||
|
||||
(** ticker thread, regularly sends a message to the writer thread.
|
||||
no need to join it. *)
|
||||
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) ()
|
||||
|
||||
let shutdown () =
|
||||
if A.exchange active false then (
|
||||
B_queue.close events;
|
||||
let on_shutdown (self : st) ~time_ns:_ =
|
||||
if A.exchange self.active false then (
|
||||
B_queue.close self.events;
|
||||
(* wait for writer thread to be done. The writer thread will exit
|
||||
after processing remaining events because the queue is now closed *)
|
||||
Thread.join t_write
|
||||
Thread.join self.t_write
|
||||
)
|
||||
|
||||
let get_tid_ () : int =
|
||||
if !Mock_.enabled then
|
||||
3
|
||||
else
|
||||
Thread.id (Thread.self ())
|
||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||
B_queue.push self.events @@ E_name_process { name }
|
||||
|
||||
let[@inline] enter_span_ ~fun_name ~data name : span =
|
||||
let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in
|
||||
let tid = get_tid_ () in
|
||||
let time_us = now_us () in
|
||||
B_queue.push events
|
||||
(E_define_span { tid; name; time_us; id = span; fun_name; data });
|
||||
span
|
||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||
B_queue.push self.events @@ E_name_thread { tid; name }
|
||||
|
||||
let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
|
||||
span =
|
||||
enter_span_ ~fun_name ~data name
|
||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events
|
||||
@@ E_define_span { tid; name; time_us; id = span; fun_name; data }
|
||||
|
||||
let exit_span span : unit =
|
||||
let time_us = now_us () in
|
||||
B_queue.push events (E_exit_span { id = span; time_us })
|
||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events @@ E_exit_span { id = span; time_us }
|
||||
|
||||
(* re-raise exception with its backtrace *)
|
||||
external reraise : exn -> 'a = "%reraise"
|
||||
let on_add_data (self : st) ~data span =
|
||||
if data <> [] then
|
||||
B_queue.push self.events @@ E_add_data { id = span; data }
|
||||
|
||||
let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f =
|
||||
let span = enter_span_ ~fun_name ~data name in
|
||||
try
|
||||
let x = f span in
|
||||
exit_span span;
|
||||
x
|
||||
with exn ->
|
||||
exit_span span;
|
||||
reraise exn
|
||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events @@ E_message { tid; time_us; msg; data }
|
||||
|
||||
let add_data_to_span span data =
|
||||
if data <> [] then B_queue.push events (E_add_data { id = span; data })
|
||||
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events @@ E_counter { name; n = f; time_us; tid }
|
||||
|
||||
let enter_manual_span ~(parent : explicit_span option) ~flavor
|
||||
~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
|
||||
explicit_span =
|
||||
(* get the id, or make a new one *)
|
||||
let id =
|
||||
match parent with
|
||||
| Some m -> Meta_map.find_exn key_async_id m.meta
|
||||
| None -> A.fetch_and_add span_id_gen_ 1
|
||||
in
|
||||
let time_us = now_us () in
|
||||
B_queue.push events
|
||||
(E_enter_manual_span
|
||||
{ id; time_us; tid = get_tid_ (); data; name; fun_name; flavor });
|
||||
{
|
||||
span = 0L;
|
||||
meta =
|
||||
Meta_map.(
|
||||
empty |> add key_async_id id |> add key_async_data (name, flavor));
|
||||
}
|
||||
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span
|
||||
: unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events
|
||||
@@ E_enter_manual_span
|
||||
{ id = trace_id; time_us; tid; data; name; fun_name; flavor }
|
||||
|
||||
let exit_manual_span (es : explicit_span) : unit =
|
||||
let id = Meta_map.find_exn key_async_id es.meta in
|
||||
let name, flavor = Meta_map.find_exn key_async_data es.meta in
|
||||
let data =
|
||||
match Meta_map.find key_data es.meta with
|
||||
| None -> []
|
||||
| Some r -> !r
|
||||
in
|
||||
let time_us = now_us () in
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events
|
||||
(E_exit_manual_span { tid; id; name; time_us; data; flavor })
|
||||
|
||||
let add_data_to_manual_span (es : explicit_span) data =
|
||||
if data <> [] then (
|
||||
let data_ref, add =
|
||||
match Meta_map.find key_data es.meta with
|
||||
| Some r -> r, false
|
||||
| None -> ref [], true
|
||||
in
|
||||
let new_data = List.rev_append data !data_ref in
|
||||
data_ref := new_data;
|
||||
if add then es.meta <- Meta_map.add key_data data_ref es.meta
|
||||
)
|
||||
|
||||
let message ?span:_ ~data msg : unit =
|
||||
let time_us = now_us () in
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events (E_message { tid; time_us; msg; data })
|
||||
|
||||
let counter_float ~data:_ name f =
|
||||
let time_us = now_us () in
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events (E_counter { name; n = f; time_us; tid })
|
||||
|
||||
let counter_int ~data name i = counter_float ~data name (float_of_int i)
|
||||
let name_process name : unit = B_queue.push events (E_name_process { name })
|
||||
|
||||
let name_thread name : unit =
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events (E_name_thread { tid; name })
|
||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
||||
~trace_id (_ : span) : unit =
|
||||
let time_us = time_ns *. 1e-3 in
|
||||
B_queue.push self.events
|
||||
@@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor }
|
||||
end in
|
||||
(module M)
|
||||
let events = B_queue.create () in
|
||||
let t_write =
|
||||
Thread.create
|
||||
(fun () -> Fun.protect ~finally @@ fun () -> bg_thread ~mode ~out events)
|
||||
()
|
||||
in
|
||||
|
||||
(* ticker thread, regularly sends a message to the writer thread.
|
||||
no need to join it. *)
|
||||
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () in
|
||||
let st : Internal_st.t = { active = A.make true; events; t_write } in
|
||||
Sub.Subscriber.Sub { st; callbacks = (module M) }
|
||||
|
||||
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
||||
: collector =
|
||||
let sub = subscriber_ ~finally ~mode ~out () in
|
||||
Sub.collector sub
|
||||
|
||||
let[@inline] subscriber ~out () : Sub.t =
|
||||
subscriber_ ~finally:ignore ~mode:`Single ~out ()
|
||||
|
||||
let[@inline] collector ~out () : collector =
|
||||
collector_ ~finally:ignore ~mode:`Single ~out ()
|
||||
|
|
@ -462,10 +401,18 @@ let with_setup ?out () f =
|
|||
setup ?out ();
|
||||
Fun.protect ~finally:Trace_core.shutdown f
|
||||
|
||||
module Internal_ = struct
|
||||
let mock_all_ () = Mock_.enabled := true
|
||||
module Private_ = struct
|
||||
let mock_all_ () =
|
||||
Mock_.enabled := true;
|
||||
Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns;
|
||||
Sub.Private_.get_tid_ := Some Mock_.get_tid_;
|
||||
()
|
||||
|
||||
let on_tracing_error = on_tracing_error
|
||||
|
||||
let subscriber_jsonl ~finally ~out () =
|
||||
subscriber_ ~finally ~mode:`Jsonl ~out ()
|
||||
|
||||
let collector_jsonl ~finally ~out () : collector =
|
||||
collector_ ~finally ~mode:`Jsonl ~out ()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,3 @@
|
|||
val collector :
|
||||
out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector
|
||||
(** Make a collector that writes into the given output.
|
||||
See {!setup} for more details. *)
|
||||
|
||||
type output =
|
||||
[ `Stdout
|
||||
| `Stderr
|
||||
|
|
@ -16,6 +11,14 @@ type output =
|
|||
named "foo"
|
||||
*)
|
||||
|
||||
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
|
||||
(** A subscriber emitting TEF traces into [out].
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
val collector : out:[< output ] -> unit -> Trace_core.collector
|
||||
(** Make a collector that writes into the given output.
|
||||
See {!setup} for more details. *)
|
||||
|
||||
val setup : ?out:[ output | `Env ] -> unit -> unit
|
||||
(** [setup ()] installs the collector depending on [out].
|
||||
|
||||
|
|
@ -39,12 +42,18 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
|
|||
|
||||
(**/**)
|
||||
|
||||
module Internal_ : sig
|
||||
module Private_ : sig
|
||||
val mock_all_ : unit -> unit
|
||||
(** use fake, deterministic timestamps, TID, PID *)
|
||||
|
||||
val on_tracing_error : (string -> unit) ref
|
||||
|
||||
val subscriber_jsonl :
|
||||
finally:(unit -> unit) ->
|
||||
out:[ `File_append of string | `Output of out_channel ] ->
|
||||
unit ->
|
||||
Trace_subscriber.t
|
||||
|
||||
val collector_jsonl :
|
||||
finally:(unit -> unit) ->
|
||||
out:[ `File_append of string | `Output of out_channel ] ->
|
||||
|
|
|
|||
2102
test/t1.expected
2102
test/t1.expected
File diff suppressed because it is too large
Load diff
|
|
@ -25,9 +25,9 @@ let run () =
|
|||
Trace.enter_manual_sub_span ~parent:pseudo_async_sp
|
||||
~flavor:
|
||||
(if _i mod 3 = 0 then
|
||||
`Sync
|
||||
else
|
||||
`Async)
|
||||
`Sync
|
||||
else
|
||||
`Async)
|
||||
~__FILE__ ~__LINE__ "sub-sleep"
|
||||
in
|
||||
|
||||
|
|
@ -43,5 +43,5 @@ let run () =
|
|||
done
|
||||
|
||||
let () =
|
||||
Trace_tef.Internal_.mock_all_ ();
|
||||
Trace_tef.Private_.mock_all_ ();
|
||||
Trace_tef.with_setup ~out:`Stdout () @@ fun () -> run ()
|
||||
|
|
|
|||
1642
test/t2.expected
1642
test/t2.expected
File diff suppressed because it is too large
Load diff
|
|
@ -14,7 +14,7 @@ let%trace rec fib2 x =
|
|||
fib2 (x - 1) + fib2 (x - 2)
|
||||
|
||||
let () =
|
||||
Trace_tef.Internal_.mock_all_ ();
|
||||
Trace_tef.Private_.mock_all_ ();
|
||||
let@ () = Trace_tef.with_setup ~out:`Stdout () in
|
||||
|
||||
Trace_core.set_process_name "main";
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue