mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-05-05 08:54:27 -04:00
bounded queue: provide a per-item measure function for better errors/metrics
This lets us know how big the batches we drop are (in signals, not just in queue items).
This commit is contained in:
parent
2401745f1a
commit
5f3b162290
7 changed files with 26 additions and 7 deletions
|
|
@ -169,6 +169,7 @@ let create_exporter ?(config = Config.make ()) ~sw ~env () =
|
|||
let consumer = create_consumer ~config ~sw ~env () in
|
||||
let bq =
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~measure:Any_signal_l.length
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -102,6 +102,7 @@ let create_exporter ?(config = Config.make ()) () =
|
|||
let consumer = create_consumer ~config () in
|
||||
let bq =
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~measure:OTEL.Any_signal_l.length
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ let create_exporter ?(config = Config.make ()) () =
|
|||
let consumer = create_consumer ~config () in
|
||||
let bq =
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~measure:OTEL.Any_signal_l.length
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t =
|
|||
let consumer = consumer ~config () in
|
||||
let bq =
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~measure:OTEL.Any_signal_l.length
|
||||
~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
|
||||
|
|
|
|||
|
|
@ -88,23 +88,27 @@ type 'a state = {
|
|||
high_watermark: int;
|
||||
q: 'a Q.t;
|
||||
on_non_empty: Cb_set.t;
|
||||
measure: 'a -> int;
|
||||
}
|
||||
|
||||
(* [measure_all_ measure xs] adds up [measure x] for every [x] in [xs],
   visiting the items from left to right. The result is [0] when [xs] is
   empty. *)
let measure_all_ measure xs =
  let rec sum_from total = function
    | [] -> total
    | item :: rest -> sum_from (total + measure item) rest
  in
  sum_from 0 xs
|
||||
|
||||
let push (self : _ state) x =
|
||||
if x <> [] then (
|
||||
match
|
||||
Q.push_while_not_full self.q ~high_watermark:self.high_watermark x
|
||||
with
|
||||
| Closed ->
|
||||
ignore (Atomic.fetch_and_add self.n_discarded (List.length x) : int)
|
||||
let n = measure_all_ self.measure x in
|
||||
ignore (Atomic.fetch_and_add self.n_discarded n : int)
|
||||
| Pushed { num_discarded } ->
|
||||
if num_discarded > 0 then (
|
||||
let total = Atomic.fetch_and_add self.n_discarded num_discarded in
|
||||
let n_signals = measure_all_ self.measure x in
|
||||
let total = Atomic.fetch_and_add self.n_discarded n_signals in
|
||||
Opentelemetry.Self_debug.log Warning (fun () ->
|
||||
Printf.sprintf
|
||||
"otel: dropped %d signals (queue full: %d/%d, total dropped: %d)"
|
||||
num_discarded (Q.size self.q) self.high_watermark
|
||||
(total + num_discarded))
|
||||
n_signals (Q.size self.q) self.high_watermark (total + n_signals))
|
||||
);
|
||||
(* wake up potentially asleep consumers *)
|
||||
Cb_set.trigger self.on_non_empty
|
||||
|
|
@ -131,13 +135,14 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t =
|
|||
recv = { try_pop; on_non_empty; common };
|
||||
}
|
||||
|
||||
let create ~high_watermark () : _ BQ.t =
|
||||
let create ?(measure = fun _ -> 1) ~high_watermark () : _ BQ.t =
|
||||
let st =
|
||||
{
|
||||
high_watermark;
|
||||
q = Q.create ();
|
||||
n_discarded = Atomic.make 0;
|
||||
on_non_empty = Cb_set.create ();
|
||||
measure;
|
||||
}
|
||||
in
|
||||
to_bounded_queue st
|
||||
|
|
|
|||
|
|
@ -2,6 +2,11 @@
|
|||
|
||||
This is not the fastest queue but it should be versatile. *)
|
||||
|
||||
val create : high_watermark:int -> unit -> 'a Bounded_queue.t
|
||||
val create :
|
||||
?measure:('a -> int) -> high_watermark:int -> unit -> 'a Bounded_queue.t
|
||||
(** [create ~high_watermark ()] creates a new bounded queue based on
|
||||
{!Sync_queue} *)
|
||||
{!Sync_queue}.
|
||||
@param measure
|
||||
maps each item to its signal count (e.g. number of spans in a batch). Used
|
||||
to report accurate signal counts when items are dropped. Default:
|
||||
[fun _ -> 1]. *)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,11 @@ let pp out = function
|
|||
| Metrics m -> pp_list Proto.Metrics.pp_metric out m
|
||||
| Logs l -> pp_list Proto.Logs.pp_log_record out l
|
||||
|
||||
(** [length signals] is the number of elements in the list carried by
    [signals], whichever of the [Spans], [Metrics] or [Logs] cases it is. *)
let length signals =
  match signals with
  | Spans spans -> List.length spans
  | Metrics metrics -> List.length metrics
  | Logs records -> List.length records
|
||||
|
||||
(** [of_logs_or_empty logs] wraps a non-empty list of log records into a
    single [Logs] item; an empty input yields the empty list. *)
let of_logs_or_empty logs =
  match logs with
  | [] -> []
  | _ :: _ -> [ Logs logs ]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue