batch: change ~now to ~mtime

This commit is contained in:
Simon Cruanes 2025-12-17 11:17:02 -05:00
parent 061d2adc68
commit e4063e082e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 26 additions and 28 deletions

View file

@ -18,13 +18,14 @@ let max_batch_size = 100_000
let default_high_watermark batch_size = let default_high_watermark batch_size =
max 10 (min (batch_size * 10) max_batch_size) max 10 (min (batch_size * 10) max_batch_size)
let _dummy_start = Mtime.min_stamp (** passed to ignore timeout *)
let mtime_dummy_ = Mtime.min_stamp
let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } let _empty_state : _ state = { q = []; size = 0; start = mtime_dummy_ }
let[@inline] cur_size (self : _ t) : int = (Atomic.get self.st).size let[@inline] cur_size (self : _ t) : int = (Atomic.get self.st).size
let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = let make ?(batch = 100) ?high_watermark ?mtime ?timeout () : _ t =
let batch = min batch max_batch_size in let batch = min batch max_batch_size in
let high_watermark = let high_watermark =
match high_watermark with match high_watermark with
@ -34,9 +35,9 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t =
assert (high_watermark >= batch); assert (high_watermark >= batch);
let start = let start =
match now with match mtime with
| Some x -> x | Some x -> x
| None -> _dummy_start | None -> mtime_dummy_
in in
assert (batch > 0); assert (batch > 0);
{ {
@ -46,22 +47,19 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t =
high_watermark; high_watermark;
} }
(** passed to ignore timeout *) let timeout_expired_ ~mtime ~timeout (self : _ state) : bool =
let mtime_dummy_ = Mtime.min_stamp mtime <> mtime_dummy_
let timeout_expired_ ~now ~timeout (self : _ state) : bool =
now <> mtime_dummy_
&& &&
match timeout with match timeout with
| Some t -> | Some t ->
let elapsed = Mtime.span now self.start in let elapsed = Mtime.span mtime self.start in
Mtime.Span.compare elapsed t >= 0 Mtime.Span.compare elapsed t >= 0
| None -> false | None -> false
(** Big enough to send? *) (** Big enough to send? *)
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let pop_if_ready_ ~force ~now (self : _ t) : _ list option = let pop_if_ready_ ~force ~mtime (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
(* update state. When uncontended this runs only once. *) (* update state. When uncontended this runs only once. *)
Util_atomic.update_cas self.st @@ fun state -> Util_atomic.update_cas self.st @@ fun state ->
@ -72,7 +70,7 @@ let pop_if_ready_ ~force ~now (self : _ t) : _ list option =
state.size > 0 state.size > 0
&& (force && (force
|| is_full_ ~batch:self.batch state || is_full_ ~batch:self.batch state
|| timeout_expired_ ~now ~timeout:self.timeout state) || timeout_expired_ ~mtime ~timeout:self.timeout state)
in in
if ready_to_pop then ( if ready_to_pop then (
@ -89,8 +87,8 @@ let pop_if_ready_ ~force ~now (self : _ t) : _ list option =
(* Reverse the list to retrieve the FIFO order. *) (* Reverse the list to retrieve the FIFO order. *)
Some (List.rev batch) Some (List.rev batch)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~mtime (self : _ t) : _ list option =
pop_if_ready_ ~force ~now self pop_if_ready_ ~force ~mtime self
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then if elems = [] then
@ -128,8 +126,8 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter open Opentelemetry_emitter
(** Emit current batch, if the conditions are met *) (** Emit current batch, if the conditions are met *)
let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~now : unit = let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~mtime : unit =
match pop_if_ready self ~force:false ~now with match pop_if_ready self ~force:false ~mtime with
| None -> () | None -> ()
| Some l -> Emitter.emit e l | Some l -> Emitter.emit e l
@ -146,7 +144,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
(* NOTE: we need to close this wrapping emitter first, to prevent (* NOTE: we need to close this wrapping emitter first, to prevent
further pushes; then write the content to [e]; then further pushes; then write the content to [e]; then
flusn and close [e]. In this order. *) flusn and close [e]. In this order. *)
(match pop_if_ready self ~force:true ~now:Mtime.max_stamp with (match pop_if_ready self ~force:true ~mtime:mtime_dummy_ with
| None -> () | None -> ()
| Some l -> Emitter.emit e l); | Some l -> Emitter.emit e l);
@ -155,13 +153,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
) )
in in
let tick ~now = let tick ~mtime =
if not (Atomic.get closed_here) then ( if not (Atomic.get closed_here) then (
(* first, check if batch has timed out *) (* first, check if batch has timed out *)
maybe_emit_ self ~e ~now; maybe_emit_ self ~e ~mtime;
(* only then, tick the underlying emitter *) (* only then, tick the underlying emitter *)
Emitter.tick e ~now Emitter.tick e ~mtime
) )
in in
@ -172,7 +170,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
(* we only check for size here, not for timeout. The [tick] function is (* we only check for size here, not for timeout. The [tick] function is
enough for timeouts, whereas [emit] is in the hot path of every single enough for timeouts, whereas [emit] is in the hot path of every single
span/metric/log *) span/metric/log *)
maybe_emit_ self ~e ~now:mtime_dummy_ maybe_emit_ self ~e ~mtime:mtime_dummy_
) )
in in

View file

@ -5,7 +5,7 @@ type 'a t
val make : val make :
?batch:int -> ?batch:int ->
?high_watermark:int -> ?high_watermark:int ->
?now:Mtime.t -> ?mtime:Mtime.t ->
?timeout:Mtime.span -> ?timeout:Mtime.span ->
unit -> unit ->
'a t 'a t
@ -21,15 +21,15 @@ val make :
transmission in case of signal floods. Default transmission in case of signal floods. Default
[if batch = 1 then 100 else batch * 10]. [if batch = 1 then 100 else batch * 10].
@param now the current time. Default [Mtime_clock.now ()]. @param mtime the current time.
@param timeout @param timeout
the time span after which a batch is ready to pop, whether or not it is the time span after which a batch is ready to pop, whether or not it is
{b full}. *) {b full}. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option val pop_if_ready : ?force:bool -> mtime:Mtime.t -> 'a t -> 'a list option
(** [pop_if_ready ~now b] is [Some xs], where is [xs] includes all the elements (** [pop_if_ready ~mtime b] is [Some xs], where is [xs] includes all the
{!push}ed since the last batch, if the batch ready to be emitted. elements {!push}ed since the last batch, if the batch ready to be emitted.
A batch is ready to pop if it contains some elements and A batch is ready to pop if it contains some elements and
@ -39,7 +39,7 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
the last pop was ready, or the last pop was ready, or
- the pop is [force]d, - the pop is [force]d,
@param now the current time @param mtime the current monotonic time
@param force @param force
override the other batch conditions, for when when we just want to emit override the other batch conditions, for when when we just want to emit