emitter: add self_metrics, track batch n_dropped, sampler rate

This commit is contained in:
Simon Cruanes 2026-02-17 20:41:20 -05:00
parent 71bb7d1996
commit 07070e3d4a
14 changed files with 128 additions and 41 deletions

View file

@ -106,17 +106,20 @@ module Send = struct
(** Turn the writing end of the queue into an emitter.
@param close_queue_on_close
if true, closing the emitter will close the queue *)
let to_emitter ~close_queue_on_close (self : 'a t) :
let to_emitter ~signal_name ~close_queue_on_close (self : 'a t) :
'a Opentelemetry_emitter.Emitter.t =
let closed () = closed self in
let enabled () = not (closed ()) in
let emit x = if x <> [] then push self x in
let tick ~mtime:_ = () in
(* the exporter will emit these, the queue is shared *)
let self_metrics ~now:_ () = [] in
(* NOTE: we cannot actually flush, only close. Emptying the queue is
fundamentally asynchronous because it's done by consumers *)
let flush_and_close () = if close_queue_on_close then close self in
{ closed; enabled; emit; tick; flush_and_close }
{ signal_name; closed; enabled; emit; tick; flush_and_close; self_metrics }
end
type 'a t = {

View file

@ -12,8 +12,18 @@ let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t =
then [e] itself will be closed. *)
let closed_here = Atomic.make false in
let signal_name = e.signal_name in
let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
let closed () = Atomic.get closed_here || e.closed () in
let dropped_name = Printf.sprintf "otel.sdk.%s.batch.dropped" signal_name in
let self_metrics ~now () =
let m =
Opentelemetry_core.Metrics.(
sum ~name:dropped_name [ int ~now (Batch.n_dropped self) ])
in
m :: e.self_metrics ~now ()
in
let flush_and_close () =
if not (Atomic.exchange closed_here true) then (
(* NOTE: we need to close this wrapping emitter first, to prevent
@ -52,7 +62,15 @@ let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t =
)
in
{ Emitter.closed; enabled; flush_and_close; tick; emit }
{
Emitter.closed;
signal_name;
self_metrics;
enabled;
flush_and_close;
tick;
emit;
}
let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t =
let b = Batch.make ~batch:batch_size ~timeout () in

View file

@ -18,16 +18,26 @@ type closing_behavior =
when is this emitter closing. Default [`Close_when_all_closed]. *)
let combine_l ?(closing : closing_behavior = `Close_when_all_closed)
(es : 'a t list) : 'a t =
assert (es <> []);
let signal_name = (List.hd es).signal_name in
let closed =
fun () ->
match closing with
| `Close_when_all_closed -> List.for_all closed es
| `Close_when_one_closed -> List.exists closed es
in
let self_metrics ~now () =
List.flatten @@ List.map (fun e -> e.self_metrics ~now ()) es
in
let enabled () = not (closed ()) in
let emit x = if x <> [] then List.iter (fun e -> emit e x) es in
let tick ~mtime = List.iter (tick ~mtime) es in
let flush_and_close () = List.iter flush_and_close es in
{ closed; enabled; emit; tick; flush_and_close }
{ signal_name; self_metrics; closed; enabled; emit; tick; flush_and_close }
let combine_l ?closing es : _ t =
match es with
| [] -> dummy
| _ -> combine_l ?closing es
let combine e1 e2 : _ t = combine_l [ e1; e2 ]

View file

@ -1,11 +1,19 @@
open Opentelemetry_emitter
let add_sampler (self : Sampler.t) (e : _ Emitter.t) : _ Emitter.t =
let signal_name = e.signal_name in
let enabled () = e.enabled () in
let closed () = Emitter.closed e in
let flush_and_close () = Emitter.flush_and_close e in
let tick ~mtime = Emitter.tick e ~mtime in
let m_rate = Printf.sprintf "otel.sdk.%s.sampler.actual-rate" signal_name in
let self_metrics ~now () =
Opentelemetry_core.Metrics.(
gauge ~name:m_rate [ float ~now (Sampler.actual_rate self) ])
:: e.self_metrics ~now ()
in
let emit l =
if l <> [] && e.enabled () then (
let accepted = List.filter (fun _x -> Sampler.accept self) l in
@ -13,6 +21,14 @@ let add_sampler (self : Sampler.t) (e : _ Emitter.t) : _ Emitter.t =
)
in
{ Emitter.closed; enabled; flush_and_close; tick; emit }
{
Emitter.closed;
self_metrics;
signal_name;
enabled;
flush_and_close;
tick;
emit;
}
let sample ~proba_accept e = add_sampler (Sampler.create ~proba_accept ()) e

View file

@ -17,15 +17,15 @@ let debug ?(clock = OTEL.Clock.ptime_clock) ?(out = Format.err_formatter) () :
active = (fun () -> active);
clock;
emit_spans =
Emitter.make_simple () ~emit:(fun sp ->
Emitter.make ~signal_name:"spans" () ~emit:(fun sp ->
List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp);
emit_logs =
Emitter.make_simple () ~emit:(fun log ->
Emitter.make ~signal_name:"logs" () ~emit:(fun log ->
List.iter
(Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record)
log);
emit_metrics =
Emitter.make_simple () ~emit:(fun m ->
Emitter.make ~signal_name:"metrics" () ~emit:(fun m ->
List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m);
on_tick = Cb_set.register ticker;
tick = (fun () -> Cb_set.trigger ticker);

View file

@ -17,18 +17,21 @@ module BQ_emitters = struct
let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
_ OTEL.Emitter.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
Bounded_queue.Send.to_emitter q ~signal_name:"logs"
~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty
let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
_ OTEL.Emitter.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
Bounded_queue.Send.to_emitter q ~signal_name:"spans"
~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map
OTEL.Any_signal_l.of_spans_or_empty
let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) :
_ OTEL.Emitter.t =
Bounded_queue.Send.to_emitter q ~close_queue_on_close:false
Bounded_queue.Send.to_emitter q ~signal_name:"metrics"
~close_queue_on_close:false
|> Opentelemetry_emitter.Emitter.flat_map
OTEL.Any_signal_l.of_metrics_or_empty
end
@ -64,7 +67,7 @@ let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t)
[ OTEL.Metrics.int ~now (Bounded_queue.Recv.high_watermark q.recv) ]
and m_discarded =
OTEL.Metrics.sum ~is_monotonic:true
~name:"otel_ocaml.exporter_queue.discarded"
~name:"otel.sdk.exporter_queue.discarded"
[ OTEL.Metrics.int ~now (Bounded_queue.Recv.num_discarded q.recv) ]
in
m_size :: m_cap :: m_discarded :: Consumer.self_metrics consumer ~clock

View file

@ -47,12 +47,13 @@ let stdout ?(clock = OTEL.Clock.ptime_clock) () : OTEL.Exporter.t =
let active, trigger = Aswitch.create () in
let tick () = Cb_set.trigger ticker in
let mk_emitter pp_signal =
let mk_emitter ~signal_name pp_signal =
let emit l =
if Aswitch.is_off active then raise Emitter.Closed;
pp_vlist mutex pp_signal out l
in
let enabled () = Aswitch.is_on active in
let self_metrics ~now:_ () = [] in
let tick ~mtime:_ = () in
let flush_and_close () =
if Aswitch.is_on active then
@ -60,12 +61,20 @@ let stdout ?(clock = OTEL.Clock.ptime_clock) () : OTEL.Exporter.t =
Format.pp_print_flush out ()
in
let closed () = Aswitch.is_off active in
{ Emitter.emit; closed; enabled; tick; flush_and_close }
{
Emitter.emit;
signal_name;
self_metrics;
closed;
enabled;
tick;
flush_and_close;
}
in
let emit_spans = mk_emitter pp_span in
let emit_logs = mk_emitter pp_log in
let emit_metrics = mk_emitter pp_metric in
let emit_spans = mk_emitter ~signal_name:"spans" pp_span in
let emit_logs = mk_emitter ~signal_name:"logs" pp_log in
let emit_metrics = mk_emitter ~signal_name:"metrics" pp_metric in
let self_metrics () = [] in
let shutdown () =

View file

@ -1,4 +1,5 @@
(** @deprecated Use {!Exporter_config} instead *)
[@@@deprecated "use Exporter_config instead"]
include module type of Exporter_config
[@@deprecated "use Exporter_config instead"]

View file

@ -77,4 +77,12 @@ let[@inline] shutdown (self : t) : unit = self.shutdown ()
let (cleanup [@deprecated "use shutdown instead"]) = shutdown
let[@inline] self_metrics (self : t) : _ list = self.self_metrics ()
let self_metrics (self : t) : _ list =
let now = Clock.now self.clock in
List.flatten
[
self.self_metrics ();
self.emit_spans.self_metrics ~now ();
self.emit_logs.self_metrics ~now ();
self.emit_metrics.self_metrics ~now ();
]

View file

@ -2,6 +2,8 @@
(name opentelemetry_emitter)
(public_name opentelemetry.emitter)
(libraries
(re_export opentelemetry.proto)
(re_export opentelemetry.util)
(re_export opentelemetry.atomic)
(re_export mtime))
(flags :standard -open Opentelemetry_atomic)

View file

@ -8,6 +8,7 @@
exception Closed
type -'a t = {
signal_name: string; (** Description of what signal is emitted *)
enabled: unit -> bool;
(** Return [true] if [emit] has a chance of doing something with the
signals it's given. *)
@ -20,6 +21,11 @@ type -'a t = {
(** True if the emitter is already closed. Beware TOCTOU bugs. *)
flush_and_close: unit -> unit;
(** Flush internally buffered signals, then close. *)
self_metrics:
now:Opentelemetry_util.Timestamp_ns.t ->
unit ->
Opentelemetry_proto.Metrics.metric list;
(** metrics about the emitter itself. *)
}
(** An emitter for values of type ['a]. *)
@ -33,6 +39,8 @@ let[@inline] closed self : bool = self.closed ()
let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close ()
let[@inline] self_metrics self ~now : _ list = self.self_metrics ~now ()
(** [map f emitter] returns a new emitter that applies [f] to signals item-wise
before passing them to [emitter] *)
let map (f : 'a -> 'b) (self : 'b t) : 'a t =
@ -56,9 +64,9 @@ let tap (f : 'a -> unit) (self : 'a t) : 'a t =
in
{ self with emit }
(** [make_simple ~emit ()] is an emitter that calls [emit]. *)
let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () :
_ t =
(** [make ~emit ()] is an emitter that calls [emit]. *)
let make ?tick ?closed ?enabled ?(flush_and_close = ignore)
?(self_metrics = fun ~now:_ () -> []) ~signal_name ~emit () : _ t =
let tick =
match tick with
| None -> fun ~mtime:_ -> ()
@ -71,14 +79,16 @@ let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () :
| None, Some f -> (fun () -> not (f ())), f
| Some f1, Some f2 -> f1, f2
in
{ tick; emit; flush_and_close; closed; enabled }
{ signal_name; tick; emit; flush_and_close; closed; enabled; self_metrics }
(** Dummy emitter, doesn't accept or emit anything. *)
let dummy : _ t =
{
signal_name = "dummy";
enabled = (fun () -> false);
emit = ignore;
tick = (fun ~mtime:_ -> ());
closed = (fun () -> true);
flush_and_close = ignore;
self_metrics = (fun ~now:_ () -> []);
}

View file

@ -1,14 +1,13 @@
(** Emitter that stores signals into a list, in reverse order (most recent
signals first). *)
let to_list (l : 'a list ref) : 'a Emitter.t =
let closed = Atomic.make false in
{
enabled = (fun () -> not (Atomic.get closed));
emit =
(fun sigs ->
if Atomic.get closed then raise Emitter.Closed;
l := List.rev_append sigs !l);
tick = (fun ~mtime:_ -> ());
closed = (fun () -> Atomic.get closed);
flush_and_close = (fun () -> Atomic.set closed true);
}
let to_list ~signal_name (l : 'a list ref) : 'a Emitter.t =
let closed_ = Atomic.make false in
let enabled = fun () -> not (Atomic.get closed_) in
let emit =
fun sigs ->
if Atomic.get closed_ then raise Emitter.Closed;
l := List.rev_append sigs !l
in
let closed () = Atomic.get closed_ in
let flush_and_close = fun () -> Atomic.set closed_ true in
Emitter.make ~signal_name ~emit ~enabled ~closed ~flush_and_close ()

View file

@ -44,7 +44,8 @@ module Util = struct
exporter. When this emitter is used to [emit signals], the current
exporter is looked up, [get_emitter exporter] is then used to locate the
relevant emitter [e'], and [signals] is in turn emitted in [e']. *)
let dynamic_forward_emitter_to_main_exporter ~get_emitter () : _ Emitter.t =
let dynamic_forward_emitter_to_main_exporter ~signal_name
~(get_emitter : Exporter.t -> _ Emitter.t) () : _ Emitter.t =
let enabled () = present () in
let closed () = not (enabled ()) in
let flush_and_close () = () in
@ -53,6 +54,11 @@ module Util = struct
| None -> ()
| Some exp -> Exporter.tick exp ~mtime
in
let self_metrics ~now () =
match get () with
| None -> []
| Some exp -> (get_emitter exp).self_metrics ~now ()
in
let emit signals =
if signals <> [] then (
match get () with
@ -62,7 +68,8 @@ module Util = struct
Emitter.emit emitter signals
)
in
{ Emitter.enabled; closed; emit; tick; flush_and_close }
Emitter.make ~signal_name ~enabled ~closed ~self_metrics ~flush_and_close
~tick ~emit ()
end
(** Aswitch of the current exporter, or {!Aswitch.dummy} *)
@ -77,15 +84,15 @@ let[@inline] active () : Aswitch.t =
@since NEXT_RELEASE *)
let dynamic_forward_to_main_exporter : Exporter.t =
let emit_logs =
Util.dynamic_forward_emitter_to_main_exporter ()
Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"logs"
~get_emitter:Exporter.(fun e -> e.emit_logs)
in
let emit_metrics =
Util.dynamic_forward_emitter_to_main_exporter ()
Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"metrics"
~get_emitter:Exporter.(fun e -> e.emit_metrics)
in
let emit_spans =
Util.dynamic_forward_emitter_to_main_exporter ()
Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"spans"
~get_emitter:Exporter.(fun e -> e.emit_spans)
in
let on_tick f =

View file

@ -7,7 +7,8 @@ let test_exporter : Otel.Exporter.t =
let open Otel.Exporter in
{
(dummy ()) with
emit_spans = Opentelemetry_emitter.To_list.to_list spans_emitted;
emit_spans =
Opentelemetry_emitter.To_list.to_list ~signal_name:"spans" spans_emitted;
}
let with_test_exporter f =