remove subscriber entirely

This commit is contained in:
Simon Cruanes 2026-01-16 19:50:50 -05:00
parent 4b4569f956
commit 481b5a10b2
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 0 additions and 407 deletions

View file

@ -1,114 +0,0 @@
(** Callbacks used for subscribers.
Each subscriber defines a set of callbacks, for each possible tracing event.
These callbacks take a custom state that is paired with the callbacks in
{!Subscriber.t}.
To use a default implementation for some callbacks, use:
{[
module My_callbacks = struct
type st =
include Trace_subscriber.Callbacks.Dummy
let on_init (state:st) ~time_ns : unit =
(* … other custom callbacks … *)
end
]}
{b NOTE}: the [trace_id] passed alongside manual spans is guaranteed to be
at least 64 bits. *)
open Trace_core
type 'st t = {
on_init: 'st -> time_ns:int64 -> unit;
(** Called when the subscriber is initialized in a collector *)
new_span_id: 'st -> Span_sub.span_id;
(** How to generate a new span ID?
@since NEXT_RELEASE *)
new_trace_id: 'st -> Span_sub.trace_id;
(** How to generate a new trace ID?
@since NEXT_RELEASE *)
on_shutdown: 'st -> time_ns:int64 -> unit;
(** Called when the collector is shutdown *)
on_enter_span: 'st -> Span_sub.t -> unit; (** Enter a span *)
on_exit_span: 'st -> time_ns:int64 -> tid:int -> Span_sub.t -> unit;
(** Exit a span. This and [on_enter_span] must follow strict stack
discipline.
@param tid the id of the thread on which the span was exited. *)
on_message:
'st ->
time_ns:int64 ->
tid:int ->
span:Span_sub.t option ->
params:extension_parameter list ->
data:(string * Trace_core.user_data) list ->
string ->
unit;
(** Emit a log message *)
on_counter:
'st ->
time_ns:int64 ->
tid:int ->
params:extension_parameter list ->
data:(string * Trace_core.user_data) list ->
name:string ->
float ->
unit;
(** Emit the current value of a counter *)
on_extension_event: 'st -> time_ns:int64 -> tid:int -> extension_event -> unit;
(** Extension event
@since 0.8 *)
}
(** Callbacks for a subscriber. There is one callback per event in {!Trace}. The
type ['st] is the state that is passed to every single callback. *)
(** Dummy callbacks. It can be useful to reuse some of these functions in a real
subscriber that doesn't want to handle {b all} events, but only some of
them.
To write a subscriber that only supports some callbacks, this can be handy:
{[
module My_callbacks = struct
type st = my_own_state
include Callbacks.Dummy
let on_counter (st:st) ~time_ns ~tid ~data ~name v : unit = ...
end
]} *)
module Dummy = struct
let on_init _ ~time_ns:_ = ()
let new_span_id _ = Int64.min_int
let new_trace_id _ = -1L
let on_shutdown _ ~time_ns:_ = ()
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~params:_ ~data:_ _msg = ()
let on_counter _ ~time_ns:_ ~tid:_ ~params:_ ~data:_ ~name:_ _v = ()
let on_enter_span _ _sp = ()
let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
let on_extension_event _ ~time_ns:_ ~tid:_ _ = ()
end
(** Build a set of callbacks.
@since NEXT_RELEASE *)
let make ~new_span_id ?(new_trace_id = Dummy.new_trace_id)
?(on_init = Dummy.on_init) ?(on_enter_span = Dummy.on_enter_span)
?(on_exit_span = Dummy.on_exit_span) ?(on_message = Dummy.on_message)
?(on_counter = Dummy.on_counter)
?(on_extension_event = Dummy.on_extension_event)
?(on_shutdown = Dummy.on_shutdown) () : _ t =
{
on_init;
new_span_id;
new_trace_id;
on_enter_span;
on_exit_span;
on_message;
on_counter;
on_extension_event;
on_shutdown;
}
(** Dummy callbacks, ignores all events. *)
let dummy () : _ t = make ~new_span_id:(fun () -> Span_sub.dummy_span_id) ()

View file

@ -1,99 +0,0 @@
(** Trace subscribers *)
(** A trace subscriber. It pairs a set of callbacks with the state they need
(which can contain a file handle, a socket to write events to, config,
etc.).
The design goal for this is that it should be possible to avoid allocations
whenever the trace collector invokes the callbacks. *)
type t =
| Sub : {
st: 'st;
callbacks: 'st Callbacks.t;
}
-> t
(** Dummy subscriber that ignores every call. *)
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }
open struct
type st = t array
let new_span_id (st : st) =
let (Sub { st; callbacks = cb }) = Array.get st 0 in
cb.new_span_id st
let new_trace_id st =
let (Sub { st; callbacks = cb }) = Array.get st 0 in
cb.new_trace_id st
let on_init st ~time_ns =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_init st ~time_ns
done
let on_shutdown st ~time_ns =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_shutdown st ~time_ns
done
let on_enter_span st span =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_enter_span st span
done
let on_exit_span st ~time_ns ~tid span =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_exit_span st ~time_ns ~tid span
done
let on_message st ~time_ns ~tid ~span ~params ~data msg =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_message st ~time_ns ~tid ~span ~params ~data msg
done
let on_counter st ~time_ns ~tid ~params ~data ~name n =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_counter st ~time_ns ~tid ~params ~data ~name n
done
let on_extension_event st ~time_ns ~tid ev : unit =
for i = 0 to Array.length st - 1 do
let (Sub { st; callbacks = cb }) = Array.get st i in
cb.on_extension_event st ~time_ns ~tid ev
done
let tee_cb : t array Callbacks.t =
{
Callbacks.on_init;
new_span_id;
new_trace_id;
on_enter_span;
on_exit_span;
on_message;
on_counter;
on_extension_event;
on_shutdown;
}
end
(** Tee multiple subscribers, ie return a subscriber that forwards to every
subscriber in [subs].
To generate a new span or trace ID, the first subscriber of the list is
used. *)
let tee_l (subs : t list) : t =
match subs with
| [] -> dummy
| [ s ] -> s
| l -> Sub { st = Array.of_list l; callbacks = tee_cb }
(** [tee s1 s2] is a subscriber that forwards every call to [s1] and [s2] both.
*)
let tee (s1 : t) (s2 : t) : t = tee_l [ s1; s2 ]

View file

@ -1,130 +0,0 @@
open Trace_core
type t = Collector.t
module Private_ = struct
let mock = ref false
let get_now_ns_ = ref Time_.get_time_ns
let get_tid_ = ref Thread_.get_tid
(** Now, in nanoseconds *)
let[@inline] now_ns () : int64 =
if !mock then
!get_now_ns_ ()
else
Time_.get_time_ns ()
let[@inline] tid_ () : int =
if !mock then
!get_tid_ ()
else
Thread_.get_tid ()
end
open struct
module A = Trace_core.Internal_.Atomic_
open Private_
type Trace_core.span += Span_sub of Span_sub.t
let enter_span (Subscriber.Sub { st; callbacks = cb }) ~__FUNCTION__ ~__FILE__
~__LINE__ ~params ~data ~parent name : span =
let id = cb.new_span_id st in
let tid = tid_ () in
let time_ns = now_ns () in
let trace_id =
match parent with
| P_some (Span_sub span) -> span.trace_id
| _ -> cb.new_trace_id st
in
let flavor = ref `Sync in
List.iter
(function
| Core_ext.Extension_span_flavor f -> flavor := f
| _ -> ())
params;
let span : Span_sub.t =
{
name;
id;
tid;
__FUNCTION__;
__FILE__;
__LINE__;
data;
parent;
trace_id;
flavor = !flavor;
params;
time_ns;
time_exit_ns = Int64.max_int;
}
in
cb.on_enter_span st span;
Span_sub span
let exit_span (Subscriber.Sub { st; callbacks = cb }) span : unit =
match span with
| Span_sub span ->
let time_ns = now_ns () in
span.time_exit_ns <- time_ns;
let tid = tid_ () in
cb.on_exit_span st ~time_ns ~tid span
| _ -> ()
let add_data_to_span _sub span data =
match span with
| Span_sub span -> span.data <- List.rev_append data span.data
| _ -> ()
let message (Subscriber.Sub { st; callbacks = cb }) ~params ~data ~span msg :
unit =
let time_ns = now_ns () in
let tid = tid_ () in
let span =
match span with
| Some (Span_sub s) -> Some s
| _ -> None
in
cb.on_message st ~time_ns ~tid ~span ~params ~data msg
let counter_float (Subscriber.Sub { st; callbacks = cb }) ~params ~data name f
: unit =
let time_ns = now_ns () in
let tid = tid_ () in
cb.on_counter st ~tid ~time_ns ~params ~data ~name f
let[@inline] counter_int sub ~params ~data name i =
counter_float sub ~params ~data name (float_of_int i)
let init (Subscriber.Sub { st; callbacks = cb }) =
(* init code *)
let time_ns = now_ns () in
cb.on_init st ~time_ns
let shutdown (Subscriber.Sub { st; callbacks = cb }) =
let time_ns = now_ns () in
cb.on_shutdown st ~time_ns
let extension_event (Subscriber.Sub { st; callbacks = cb }) ev =
let tid = tid_ () in
let time_ns = now_ns () in
cb.on_extension_event st ~time_ns ~tid ev
(* TODO: do we want to track this? *)
let current_span _ = None
let coll_cbs : t Collector.Callbacks.t =
Collector.Callbacks.make ~enter_span ~exit_span ~current_span ~message
~add_data_to_span ~counter_int ~counter_float ~extension:extension_event
~init ~shutdown ()
end
(** A collector that calls the callbacks of subscriber *)
let collector (self : Subscriber.t) : collector =
Collector.C_some (self, coll_cbs)

View file

@ -1,64 +0,0 @@
(** Generic subscribers.
This defines the notion of a {b subscriber}, a set of callbacks for every
trace event. It also defines a collector that needs to be installed for the
subscriber(s) to be called.
Thanks to {!Subscriber.tee_l} it's possible to combine multiple subscribers
into a single collector.
@since 0.8 *)
module Callbacks = Callbacks
module Subscriber = Subscriber
module Span_sub = Span_sub
(** {2 Main API} *)
type t = Subscriber.t
(** A trace subscriber. It pairs a set of callbacks with the state they need
(which can contain a file handle, a socket to write events to, config,
etc.).
The design goal for this is that it should be possible to avoid allocations
whenever the trace collector invokes the callbacks. *)
val collector : t -> Trace_core.collector
(** A collector that calls the subscriber's callbacks. It uses [mtime] (if
available) to obtain timestamps. *)
(** A counter-based span generator.
@since NEXT_RELEASE *)
module Span_id_generator : sig
type t
val create : unit -> t
val gen : t -> Span_sub.span_id
end
(** A counter-based generator.
@since NEXT_RELEASE *)
module Trace_id_generator : sig
type t
val create : unit -> t
val gen : t -> Span_sub.trace_id
end
(**/**)
module Private_ : sig
val mock : bool ref
(** Global mock flag. If enable, all timestamps, tid, etc should be faked. *)
val get_now_ns_ : (unit -> int64) ref
(** The callback used to get the current timestamp *)
val get_tid_ : (unit -> int) ref
(** The callback used to get the current thread's id *)
val now_ns : unit -> int64
(** Get the current timestamp, or a mock version *)
end
(**/**)