From 5f3b16229095768f39abdd012726ff4826f613a0 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 10 Apr 2026 15:07:15 -0400 Subject: [PATCH] bounded queue: provide a per-item measure function for better errors/metrics we can now know how big the batches we drop are --- NOTE(review): in `push`, the `Pushed { num_discarded }` branch now uses `num_discarded` only as a guard; the value added to `n_discarded` and printed in the warning is `measure_all_ self.measure x`, i.e. the measure of the whole list `x`. If `Q.push_while_not_full` can enqueue part of `x` and discard only the remainder (its definition is not part of this patch; TODO confirm), the reported drop count over-counts, including with the default `measure = fun _ -> 1`, where it becomes `List.length x` instead of `num_discarded`. NOTE(review): the cohttp-eio exporter passes `Any_signal_l.length` unqualified while the other three exporters pass `OTEL.Any_signal_l.length`; presumably an `open` brings it into scope there, verify that it builds. .../opentelemetry_client_cohttp_eio.ml | 1 + .../opentelemetry_client_cohttp_lwt.ml | 1 + .../opentelemetry_client_ocurl_lwt.ml | 1 + src/client-ocurl/opentelemetry_client_ocurl.ml | 1 + src/client/sync/bounded_queue_sync.ml | 15 ++++++++++----- src/client/sync/bounded_queue_sync.mli | 9 +++++++-- src/core/any_signal_l.ml | 5 +++++ 7 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index fb3b8860..180bdea5 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -169,6 +169,7 @@ let create_exporter ?(config = Config.make ()) ~sw ~env () = let consumer = create_consumer ~config ~sw ~env () in let bq = Opentelemetry_client_sync.Bounded_queue_sync.create + ~measure:Any_signal_l.length ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index ea964cf6..c59c8966 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -102,6 +102,7 @@ let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in let bq = Opentelemetry_client_sync.Bounded_queue_sync.create + ~measure:OTEL.Any_signal_l.length ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml 
b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 2a8c0cd3..a7e7b5d9 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -79,6 +79,7 @@ let create_exporter ?(config = Config.make ()) () = let consumer = create_consumer ~config () in let bq = Opentelemetry_client_sync.Bounded_queue_sync.create + ~measure:OTEL.Any_signal_l.length ~high_watermark:Bounded_queue.Defaults.high_watermark () in Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer () diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 0ded3037..025a6a92 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -81,6 +81,7 @@ let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = let consumer = consumer ~config () in let bq = Opentelemetry_client_sync.Bounded_queue_sync.create + ~measure:OTEL.Any_signal_l.length ~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark () in diff --git a/src/client/sync/bounded_queue_sync.ml b/src/client/sync/bounded_queue_sync.ml index 34c348d8..8b9edf79 100644 --- a/src/client/sync/bounded_queue_sync.ml +++ b/src/client/sync/bounded_queue_sync.ml @@ -88,23 +88,27 @@ type 'a state = { high_watermark: int; q: 'a Q.t; on_non_empty: Cb_set.t; + measure: 'a -> int; } +let measure_all_ measure xs = List.fold_left (fun acc x -> acc + measure x) 0 xs + let push (self : _ state) x = if x <> [] then ( match Q.push_while_not_full self.q ~high_watermark:self.high_watermark x with | Closed -> - ignore (Atomic.fetch_and_add self.n_discarded (List.length x) : int) + let n = measure_all_ self.measure x in + ignore (Atomic.fetch_and_add self.n_discarded n : int) | Pushed { num_discarded } -> if num_discarded > 0 then ( - let total = Atomic.fetch_and_add self.n_discarded num_discarded in + let n_signals = measure_all_ self.measure x in + let total = 
Atomic.fetch_and_add self.n_discarded n_signals in Opentelemetry.Self_debug.log Warning (fun () -> Printf.sprintf "otel: dropped %d signals (queue full: %d/%d, total dropped: %d)" - num_discarded (Q.size self.q) self.high_watermark - (total + num_discarded)) + n_signals (Q.size self.q) self.high_watermark (total + n_signals)) ); (* wake up potentially asleep consumers *) Cb_set.trigger self.on_non_empty @@ -131,13 +135,14 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t = recv = { try_pop; on_non_empty; common }; } -let create ~high_watermark () : _ BQ.t = +let create ?(measure = fun _ -> 1) ~high_watermark () : _ BQ.t = let st = { high_watermark; q = Q.create (); n_discarded = Atomic.make 0; on_non_empty = Cb_set.create (); + measure; } in to_bounded_queue st diff --git a/src/client/sync/bounded_queue_sync.mli b/src/client/sync/bounded_queue_sync.mli index d3cf6347..de1a5377 100644 --- a/src/client/sync/bounded_queue_sync.mli +++ b/src/client/sync/bounded_queue_sync.mli @@ -2,6 +2,11 @@ This is not the fastest queue but it should be versatile. *) -val create : high_watermark:int -> unit -> 'a Bounded_queue.t +val create : + ?measure:('a -> int) -> high_watermark:int -> unit -> 'a Bounded_queue.t (** [create ~high_watermark ()] creates a new bounded queue based on - {!Sync_queue} *) + {!Sync_queue}. + @param measure + maps each item to its signal count (e.g. number of spans in a batch). Used + to report accurate signal counts when items are dropped. Default: + [fun _ -> 1]. *) diff --git a/src/core/any_signal_l.ml b/src/core/any_signal_l.ml index a8309471..7406c355 100644 --- a/src/core/any_signal_l.ml +++ b/src/core/any_signal_l.ml @@ -19,6 +19,11 @@ let pp out = function | Metrics m -> pp_list Proto.Metrics.pp_metric out m | Logs l -> pp_list Proto.Logs.pp_log_record out l +let length = function + | Spans l -> List.length l + | Metrics l -> List.length l + | Logs l -> List.length l + let of_logs_or_empty = function | [] -> [] | l -> [ Logs l ]