diff --git a/src/client/batch.ml b/src/client/batch.ml index e5f7c88f..40e17a22 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -18,13 +18,14 @@ let max_batch_size = 100_000 let default_high_watermark batch_size = max 10 (min (batch_size * 10) max_batch_size) -let _dummy_start = Mtime.min_stamp +(** passed to ignore timeout *) +let mtime_dummy_ = Mtime.min_stamp -let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } +let _empty_state : _ state = { q = []; size = 0; start = mtime_dummy_ } let[@inline] cur_size (self : _ t) : int = (Atomic.get self.st).size -let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = +let make ?(batch = 100) ?high_watermark ?mtime ?timeout () : _ t = let batch = min batch max_batch_size in let high_watermark = match high_watermark with @@ -34,9 +35,9 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = assert (high_watermark >= batch); let start = - match now with + match mtime with | Some x -> x - | None -> _dummy_start + | None -> mtime_dummy_ in assert (batch > 0); { @@ -46,22 +47,19 @@ let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = high_watermark; } -(** passed to ignore timeout *) -let mtime_dummy_ = Mtime.min_stamp - -let timeout_expired_ ~now ~timeout (self : _ state) : bool = - now <> mtime_dummy_ +let timeout_expired_ ~mtime ~timeout (self : _ state) : bool = + mtime <> mtime_dummy_ && match timeout with | Some t -> - let elapsed = Mtime.span now self.start in + let elapsed = Mtime.span mtime self.start in Mtime.Span.compare elapsed t >= 0 | None -> false (** Big enough to send? *) let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch -let pop_if_ready_ ~force ~now (self : _ t) : _ list option = +let pop_if_ready_ ~force ~mtime (self : _ t) : _ list option = let rev_batch_opt = (* update state. When uncontended this runs only once. *) Util_atomic.update_cas self.st @@ fun state -> @@ -72,7 +70,7 @@ let pop_if_ready_ ~force ~now (self : _ t) : _ list option = state.size > 0 && (force || is_full_ ~batch:self.batch state - || timeout_expired_ ~now ~timeout:self.timeout state) + || timeout_expired_ ~mtime ~timeout:self.timeout state) in if ready_to_pop then ( @@ -89,8 +87,8 @@ let pop_if_ready_ ~force ~now (self : _ t) : _ list option = (* Reverse the list to retrieve the FIFO order. *) Some (List.rev batch) -let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - pop_if_ready_ ~force ~now self +let pop_if_ready ?(force = false) ~mtime (self : _ t) : _ list option = + pop_if_ready_ ~force ~mtime self let push (self : _ t) elems : [ `Dropped | `Ok ] = if elems = [] then @@ -128,8 +126,8 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) open Opentelemetry_emitter (** Emit current batch, if the conditions are met *) -let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~now : unit = - match pop_if_ready self ~force:false ~now with +let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~mtime : unit = + match pop_if_ready self ~force:false ~mtime with | None -> () | Some l -> Emitter.emit e l @@ -146,7 +144,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = (* NOTE: we need to close this wrapping emitter first, to prevent further pushes; then write the content to [e]; then flusn and close [e]. In this order. *) - (match pop_if_ready self ~force:true ~now:Mtime.max_stamp with + (match pop_if_ready self ~force:true ~mtime:mtime_dummy_ with | None -> () | Some l -> Emitter.emit e l); @@ -155,13 +153,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = ) in - let tick ~now = + let tick ~mtime = if not (Atomic.get closed_here) then ( (* first, check if batch has timed out *) - maybe_emit_ self ~e ~now; + maybe_emit_ self ~e ~mtime; (* only then, tick the underlying emitter *) - Emitter.tick e ~now + Emitter.tick e ~mtime ) in @@ -172,7 +170,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = (* we only check for size here, not for timeout. The [tick] function is enough for timeouts, whereas [emit] is in the hot path of every single span/metric/log *) - maybe_emit_ self ~e ~now:mtime_dummy_ + maybe_emit_ self ~e ~mtime:mtime_dummy_ ) in diff --git a/src/client/batch.mli b/src/client/batch.mli index 88a5dfc0..706f4167 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -5,7 +5,7 @@ type 'a t val make : ?batch:int -> ?high_watermark:int -> - ?now:Mtime.t -> + ?mtime:Mtime.t -> ?timeout:Mtime.span -> unit -> 'a t @@ -21,15 +21,15 @@ val make : transmission in case of signal floods. Default [if batch = 1 then 100 else batch * 10]. - @param now the current time. Default [Mtime_clock.now ()]. + @param mtime the current time. @param timeout the time span after which a batch is ready to pop, whether or not it is {b full}. *) -val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option -(** [pop_if_ready ~now b] is [Some xs], where is [xs] includes all the elements - {!push}ed since the last batch, if the batch ready to be emitted. +val pop_if_ready : ?force:bool -> mtime:Mtime.t -> 'a t -> 'a list option +(** [pop_if_ready ~mtime b] is [Some xs], where is [xs] includes all the + elements {!push}ed since the last batch, if the batch ready to be emitted. A batch is ready to pop if it contains some elements and @@ -39,7 +39,7 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option the last pop was ready, or - the pop is [force]d, - @param now the current time + @param mtime the current monotonic time @param force override the other batch conditions, for when when we just want to emit