perf batch: try to improve hotpath

in particular, no need to look at the clock when checking if the batch
is full
This commit is contained in:
Simon Cruanes 2025-12-10 08:46:20 -05:00
parent 370c2a78d0
commit 41e650d461
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -46,7 +46,12 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t =
high_watermark; high_watermark;
} }
(** passed to ignore timeout *)
let mtime_dummy_ = Mtime.min_stamp
let timeout_expired_ ~now ~timeout (self : _ state) : bool = let timeout_expired_ ~now ~timeout (self : _ state) : bool =
now <> mtime_dummy_
&&
match timeout with match timeout with
| Some t -> | Some t ->
let elapsed = Mtime.span now self.start in let elapsed = Mtime.span now self.start in
@ -56,7 +61,7 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
(** Big enough to send? *) (** Big enough to send? *)
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready_ ~force ~now (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
(* update state. When uncontended this runs only once. *) (* update state. When uncontended this runs only once. *)
Util_atomic.update_cas self.st @@ fun state -> Util_atomic.update_cas self.st @@ fun state ->
@ -84,6 +89,9 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
(* Reverse the list to retrieve the FIFO order. *) (* Reverse the list to retrieve the FIFO order. *)
Some (List.rev batch) Some (List.rev batch)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
pop_if_ready_ ~force ~now self
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then if elems = [] then
`Ok `Ok
@ -119,6 +127,12 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter open Opentelemetry_emitter
(** Emit current batch, if the conditions are met *)
let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~now : unit =
match pop_if_ready self ~force:false ~now with
| None -> ()
| Some l -> Emitter.emit e l
let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
(* we need to be able to close this emitter before we close [e]. This (* we need to be able to close this emitter before we close [e]. This
will become [true] when we close, then we call [Emitter.flush_and_close e], will become [true] when we close, then we call [Emitter.flush_and_close e],
@ -141,18 +155,10 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
) )
in in
let maybe_emit ~now =
if not (Atomic.get closed_here) then (
match pop_if_ready self ~force:false ~now with
| None -> ()
| Some l -> Emitter.emit e l
)
in
let tick ~now = let tick ~now =
if not (Atomic.get closed_here) then ( if not (Atomic.get closed_here) then (
(* first, check if batch has timed out *) (* first, check if batch has timed out *)
maybe_emit ~now; maybe_emit_ self ~e ~now;
(* only then, tick the underlying emitter *) (* only then, tick the underlying emitter *)
Emitter.tick e ~now Emitter.tick e ~now
@ -160,14 +166,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
in in
let emit l = let emit l =
if l <> [] && (not (Atomic.get closed_here)) && e.enabled () then ( if l <> [] && not (Atomic.get closed_here) then (
push' self l; push' self l;
(* TODO: it'd be nice if we checked only for size here, not (* we only check for size here, not for timeout. The [tick] function is
for timeout. The [tick] function is enough for timeouts, enough for timeouts, whereas [emit] is in the hot path of every single
whereas [emit] is in the hot path of every single span/metric/log *) span/metric/log *)
let now = Mtime_clock.now () in maybe_emit_ self ~e ~now:mtime_dummy_
maybe_emit ~now
) )
in in