From a00d4d238370d8d9af4ce7e6d0e752c99f3c4094 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:06:38 -0400 Subject: [PATCH] collector: if queue is full, drop item, and wakeup thread --- src/client/opentelemetry_client_ocurl.ml | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 8a241b1f..a907735a 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -155,7 +155,7 @@ module FQueue : sig type 'a t val create : dummy:'a -> int -> 'a t val size : _ t -> int - val push : 'a t -> 'a -> unit + val push : 'a t -> 'a -> bool (* true iff it could write element *) val pop_iter_all : 'a t -> ('a -> unit) -> unit end = struct type 'a t = { @@ -170,11 +170,15 @@ end = struct } let[@inline] size self = self.i + let[@inline] is_full self = self.i = Array.length self.arr - let push (self:_ t) x : unit = - assert (self.i < Array.length self.arr); - self.arr.(self.i) <- x; - self.i <- 1 + self.i + let push (self:_ t) x : bool = + if is_full self then false + else ( + self.arr.(self.i) <- x; + self.i <- 1 + self.i; + true + ) let pop_iter_all (self: _ t) f = for j=0 to self.i-1 do @@ -251,15 +255,14 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb - (module M : PUSH with type elt = a) | Some n -> - let q = FQueue.create ~dummy:(Obj.magic 0) (2 * n) in + let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in let module M = struct type elt = a let is_empty () = FQueue.size q = 0 let is_big_enough () = FQueue.size q >= n let push x = - FQueue.push q x; - if FQueue.size q > n then ( - !on_full() + if not (FQueue.push q x) || FQueue.size q > n then ( + !on_full(); (* drop *) ) let pop_iter_all f = FQueue.pop_iter_all q f end in @@ -374,7 +377,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let wakeup () = last_wakeup := Mtime_clock.now(); - with_mutex_ m (fun () -> Condition.signal cond) + with_mutex_ m (fun () -> Condition.signal cond); + Thread.yield() in (* wake up if a batch is full *)