From 239d9d5aece86783b000c27831fc97aee1f2606f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 9 Dec 2025 22:00:39 -0500 Subject: [PATCH] feat exporter: add self_metrics --- src/client/batch.ml | 2 ++ src/client/batch.mli | 3 +++ src/client/bounded_queue.ml | 8 ++++++++ src/client/bounded_queue_sync.ml | 11 +++++++++-- src/client/exporter_add_batching.ml | 2 ++ src/client/exporter_combine.ml | 2 ++ src/client/exporter_debug.ml | 1 + src/client/exporter_queued.ml | 25 ++++++++++++++++++++++++- src/client/exporter_stdout.ml | 2 ++ src/client/generic_consumer.ml | 6 ------ src/core/exporter.ml | 4 ++++ src/core/log_record.ml | 2 ++ src/core/metrics.ml | 2 ++ src/core/span.ml | 2 ++ src/lib/main_exporter.ml | 9 +++++++++ 15 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 57e7eb4f..00029f74 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -22,6 +22,8 @@ let _dummy_start = Mtime.min_stamp let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } +let[@inline] cur_size (self : _ t) : int = (Atomic.get self.st).size + let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = let batch = min batch max_batch_size in let high_watermark = diff --git a/src/client/batch.mli b/src/client/batch.mli index 56256851..88a5dfc0 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -53,6 +53,9 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ] val push' : 'a t -> 'a list -> unit (** Like {!push} but ignores the result *) +val cur_size : _ t -> int +(** Number of elements in the current batch *) + open Opentelemetry_emitter val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index 709fcead..07d43ada 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -21,11 +21,15 @@ module Common = struct This should be as fast and cheap as possible. *) num_discarded: unit -> int; (** How many items were discarded? *) + size: unit -> int; + (** Snapshot of how many items are currently in the queue *) } let[@inline] num_discarded self = self.num_discarded () let[@inline] closed (self : t) : bool = self.closed () + + let[@inline] size (self : t) : int = self.size () end (** Receiving side *) @@ -46,6 +50,8 @@ module Recv = struct let[@inline] num_discarded self = self.common.num_discarded () + let[@inline] size self = self.common.size () + let map (type a b) (f : a -> b) (self : a t) : b t = { self with @@ -78,6 +84,8 @@ module Send = struct let[@inline] num_discarded self = self.common.num_discarded () + let[@inline] size self = self.common.size () + let map (type a b) (f : a list -> b list) (self : b t) : a t = { self with diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml index a62e3a4a..d47dff99 100644 --- a/src/client/bounded_queue_sync.ml +++ b/src/client/bounded_queue_sync.ml @@ -12,6 +12,8 @@ module Q : sig val close : _ t -> unit + val size : _ t -> int + val closed : _ t -> bool val try_pop : 'a t -> 'a BQ.pop_result @@ -41,6 +43,9 @@ end = struct a value of type [bool] which OCaml's memory model should guarantee. *) let[@inline] closed self = self.closed + (* NOTE: race condition here is also benign in absoence of tearing. *) + let[@inline] size self = Queue.length self.q + let close (self : _ t) = UM.protect self.mutex @@ fun () -> if not self.closed then self.closed <- true @@ -73,7 +78,8 @@ end = struct done; let num_discarded = List.length !to_push in - (* Printf.eprintf "bq: pushed %d items\n%!" (List.length xs - num_discarded); *) + (* Printf.eprintf "bq: pushed %d items (discarded: %d)\n%!" (List.length xs - num_discarded) num_discarded; *) + Pushed { num_discarded } ) end @@ -108,12 +114,13 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t = let push x = push self x in let on_non_empty = Cb_set.register self.on_non_empty in let try_pop () = try_pop self in + let size () = Q.size self.q in let close () = Q.close self.q; (* waiters will want to know *) Cb_set.trigger self.on_non_empty in - let common = { BQ.Common.closed; num_discarded } in + let common = { BQ.Common.closed; num_discarded; size } in { BQ.send = { push; close; common }; recv = { try_pop; on_non_empty; common }; diff --git a/src/client/exporter_add_batching.ml b/src/client/exporter_add_batching.ml index 38c5dcef..0c4766fb 100644 --- a/src/client/exporter_add_batching.ml +++ b/src/client/exporter_add_batching.ml @@ -26,6 +26,7 @@ let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) : let active = exp.active in let tick = exp.tick in let on_tick = exp.on_tick in + let self_metrics () = exp.self_metrics () in let shutdown () = let open Opentelemetry_emitter in Emitter.flush_and_close emit_spans; @@ -43,4 +44,5 @@ let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) : on_tick; tick; shutdown; + self_metrics; } diff --git a/src/client/exporter_combine.ml b/src/client/exporter_combine.ml index 6b05c8ce..0c2dbbc9 100644 --- a/src/client/exporter_combine.ml +++ b/src/client/exporter_combine.ml @@ -30,6 +30,8 @@ let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); tick = (fun () -> List.iter tick es); shutdown = (fun () -> shutdown_l es ~trigger); + self_metrics = + (fun () -> List.fold_left (fun acc e -> e.self_metrics () @ acc) [] es); } ) diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index 212e6637..946e1af4 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -22,6 +22,7 @@ let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m); on_tick = Cb_set.register ticker; tick = (fun () -> Cb_set.trigger ticker); + self_metrics = (fun () -> []); shutdown = (fun () -> Format.fprintf out "CLEANUP@."; diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 490c2320..ca17868d 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -47,6 +47,20 @@ let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) let tick () = Cb_set.trigger tick_set in let on_tick f = Cb_set.register tick_set f in + let self_metrics () : _ list = + let now = OTEL.Timestamp_ns.now_unix_ns () in + let m_size = + OTEL.Metrics.gauge ~name:"otel-ocaml.exporter-queue.size" + [ OTEL.Metrics.int ~now (Bounded_queue.Recv.size q.recv) ] + in + let m_discarded = + OTEL.Metrics.sum ~is_monotonic:true + ~name:"otel-ocaml.exporter-queue.discarded" + [ OTEL.Metrics.int ~now (Bounded_queue.Recv.num_discarded q.recv) ] + in + m_size :: m_discarded :: Consumer.self_metrics consumer + in + let shutdown () = if Aswitch.is_on active && not (Atomic.exchange shutdown_started true) then ( (* flush all emitters *) @@ -68,4 +82,13 @@ let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) Aswitch.on_turn_off (Consumer.active consumer) shutdown; let active () = active in - { active; emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown } + { + active; + emit_logs; + emit_metrics; + emit_spans; + self_metrics; + tick; + on_tick; + shutdown; + } diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index 6cab03aa..f4d333d6 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -64,6 +64,7 @@ let stdout : OTEL.Exporter.t = let emit_logs = mk_emitter pp_log in let emit_metrics = mk_emitter pp_metric in + let self_metrics () = [] in let shutdown () = Emitter.flush_and_close emit_spans; Emitter.flush_and_close emit_logs; @@ -77,6 +78,7 @@ let stdout : OTEL.Exporter.t = emit_logs; emit_metrics; on_tick = Cb_set.register ticker; + self_metrics; tick; shutdown; } diff --git a/src/client/generic_consumer.ml b/src/client/generic_consumer.ml index d6eee7c1..c54ed78e 100644 --- a/src/client/generic_consumer.ml +++ b/src/client/generic_consumer.ml @@ -194,12 +194,6 @@ end = struct [ sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true [ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ]; - sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" - ~is_monotonic:true - [ - int ~now:(Mtime.to_uint64_ns now) - (Bounded_queue.Recv.num_discarded self.q); - ]; ] let to_consumer (self : state) : Consumer.t = diff --git a/src/core/exporter.ml b/src/core/exporter.ml index f592cda0..d4630173 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -24,6 +24,7 @@ type t = { responsible for sending remaining batches, flushing sockets, etc. To know when shutdown is complete, register callbacks on [active]. @since 0.12 *) + self_metrics: unit -> Metrics.t list; (** metrics about the exporter itself *) } (** Main exporter interface. *) @@ -39,6 +40,7 @@ let dummy () : t = on_tick = Cb_set.register ticker; tick = (fun () -> Cb_set.trigger ticker); shutdown = (fun () -> Aswitch.turn_off trigger); + self_metrics = (fun () -> []); } let[@inline] send_trace (self : t) (l : Proto.Trace.span list) = @@ -73,3 +75,5 @@ let on_stop self f = Aswitch.on_turn_off (self.active ()) f let[@inline] shutdown (self : t) : unit = self.shutdown () let (cleanup [@deprecated "use shutdown instead"]) = shutdown + +let[@inline] self_metrics (self : t) : _ list = self.self_metrics () diff --git a/src/core/log_record.ml b/src/core/log_record.ml index 0de8ef18..8d9e7a0f 100644 --- a/src/core/log_record.ml +++ b/src/core/log_record.ml @@ -45,6 +45,8 @@ type flags = Proto.Logs.log_record_flags = let pp_flags = Proto.Logs.pp_log_record_flags +let pp = Proto.Logs.pp_log_record + (** Make a single log entry *) let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) ?severity ?log_level ?flags ?trace_id ?span_id ?(attrs = []) diff --git a/src/core/metrics.ml b/src/core/metrics.ml index f91538f8..dcf9efa3 100644 --- a/src/core/metrics.ml +++ b/src/core/metrics.ml @@ -13,6 +13,8 @@ type t = Metrics.metric distribution. It is composed of one or more data points that have precise values and time stamps. Each distinct metric should have a distinct name. *) +let pp = Proto.Metrics.pp_metric + open struct let _program_start = Timestamp_ns.now_unix_ns () end diff --git a/src/core/span.ml b/src/core/span.ml index 7d2c3920..86f3826e 100644 --- a/src/core/span.ml +++ b/src/core/span.ml @@ -28,6 +28,8 @@ let[@inline] trace_id self = Trace_id.of_bytes self.trace_id let[@inline] is_not_dummy self = Span_id.is_valid (id self) +let pp = Proto.Trace.pp_span + let default_kind = ref Proto.Trace.Span_kind_unspecified let make ?(kind = !default_kind) ?trace_state ?(attrs = []) ?(events = []) diff --git a/src/lib/main_exporter.ml b/src/lib/main_exporter.ml index 242f51f1..e35035f3 100644 --- a/src/lib/main_exporter.ml +++ b/src/lib/main_exporter.ml @@ -92,6 +92,11 @@ let dynamic_forward_to_main_exporter : Exporter.t = | None -> () | Some exp -> exp.tick () in + let self_metrics () = + match get () with + | None -> [] + | Some exp -> exp.self_metrics () + in let shutdown () = () in { Exporter.active; @@ -101,8 +106,12 @@ let dynamic_forward_to_main_exporter : Exporter.t = on_tick; tick; shutdown; + self_metrics; } +let self_metrics () : Metrics.t list = + dynamic_forward_to_main_exporter.self_metrics () + (** Set the global exporter *) let set (exp : t) : unit = (* sanity check! this specific exporter would just call itself, leading to