From 41e650d461b453b4870c4abba0456e8d32229efa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 10 Dec 2025 08:46:20 -0500 Subject: [PATCH] perf batch: try to improve hotpath in particular, no need to look at the clock when checking if the batch is full --- src/client/batch.ml | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 00029f74..e5f7c88f 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -46,7 +46,12 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = high_watermark; } +(** passed to ignore timeout *) +let mtime_dummy_ = Mtime.min_stamp + let timeout_expired_ ~now ~timeout (self : _ state) : bool = + now <> mtime_dummy_ + && match timeout with | Some t -> let elapsed = Mtime.span now self.start in @@ -56,7 +61,7 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool = (** Big enough to send? *) let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch -let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = +let pop_if_ready_ ~force ~now (self : _ t) : _ list option = let rev_batch_opt = (* update state. When uncontended this runs only once. *) Util_atomic.update_cas self.st @@ fun state -> @@ -84,6 +89,9 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = (* Reverse the list to retrieve the FIFO order. *) Some (List.rev batch) +let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + pop_if_ready_ ~force ~now self + let push (self : _ t) elems : [ `Dropped | `Ok ] = if elems = [] then `Ok @@ -119,6 +127,12 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) open Opentelemetry_emitter +(** Emit current batch, if the conditions are met *) +let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~now : unit = + match pop_if_ready self ~force:false ~now with + | None -> () + | Some l -> Emitter.emit e l + let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = (* we need to be able to close this emitter before we close [e]. This will become [true] when we close, then we call [Emitter.flush_and_close e], @@ -141,18 +155,10 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = ) in - let maybe_emit ~now = - if not (Atomic.get closed_here) then ( - match pop_if_ready self ~force:false ~now with - | None -> () - | Some l -> Emitter.emit e l - ) - in - let tick ~now = if not (Atomic.get closed_here) then ( (* first, check if batch has timed out *) - maybe_emit ~now; + maybe_emit_ self ~e ~now; (* only then, tick the underlying emitter *) Emitter.tick e ~now @@ -160,14 +166,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = in let emit l = - if l <> [] && (not (Atomic.get closed_here)) && e.enabled () then ( + if l <> [] && not (Atomic.get closed_here) then ( push' self l; - (* TODO: it'd be nice if we checked only for size here, not - for timeout. The [tick] function is enough for timeouts, - whereas [emit] is in the hot path of every single span/metric/log *) - let now = Mtime_clock.now () in - maybe_emit ~now + (* we only check for size here, not for timeout. The [tick] function is + enough for timeouts, whereas [emit] is in the hot path of every single + span/metric/log *) + maybe_emit_ self ~e ~now:mtime_dummy_ ) in