diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 6d7671c2..4591c2eb 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -170,12 +170,14 @@ end = struct mutable size: int; mutable q: 'a list; batch: int option; + high_watermark: int; timeout: Mtime.span option; mutable start: Mtime.t; } let make ?batch ?timeout () : _ t = Option.iter (fun b -> assert (b > 0)) batch; + let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in { lock = Mutex.create (); size = 0; @@ -183,6 +185,7 @@ end = struct q = []; batch; timeout; + high_watermark; } let is_empty_ self = self.size = 0 @@ -219,11 +222,19 @@ end = struct let push (self : _ t) x : bool = let@ () = with_mutex_ self.lock in if self.size = 0 && Option.is_some self.timeout then + (* current batch starts now *) self.start <- Mtime_clock.now (); - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready + if self.size >= self.high_watermark then ( + (* drop this to prevent queue from growing too fast *) + Atomic.incr n_dropped; + true + ) else ( + (* add to queue *) + self.size <- 1 + self.size; + self.q <- x :: self.q; + let ready = is_full_ self in + ready + ) let push' self x = ignore (push self x : bool) end