From eeae5bf41cfa6ecdc772ab8bddc463d205c2e59c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 1 Dec 2025 21:22:07 -0500 Subject: [PATCH] perf batch: proper backoff strategy --- src/client/batch.ml | 53 +++++++++++++++++++++++++++------------------ src/client/dune | 1 + 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index f57f34e6..ba22fe1f 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,6 +1,7 @@ open Opentelemetry_util module Otel = Opentelemetry module A = Opentelemetry_atomic.Atomic +module Domain = Opentelemetry_domain type 'a state = { start: Mtime.t; @@ -23,6 +24,8 @@ let default_high_watermark batch_size = let _dummy_start = Mtime.min_stamp +let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } + let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let high_watermark = match high_watermark with @@ -47,30 +50,40 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool = (* Big enough to send a batch *) let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch -let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) = - self.size > 0 - && (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self) - let[@inline] atomic_update_loop_ (type res) (self : _ t) (f : 'a state -> 'a state * res) : res = let exception Return of res in try + let backoff = ref 1 in while true do let st = A.get self.st in let new_st, res = f st in - if A.compare_and_set self.st st new_st then raise_notrace (Return res) + if A.compare_and_set self.st st new_st then raise_notrace (Return res); + + (* poor man's backoff strategy *) + Domain.relax_loop !backoff; + backoff := min 128 (2 * !backoff) done with Return res -> res let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let rev_batch_opt = + (* update state. When uncontended this runs only once. *) atomic_update_loop_ self @@ fun state -> - let timeout = self.timeout in - let batch = self.batch in - if ready_to_pop_ ~force ~now ~batch ~timeout state then ( + (* *) + + (* check if the batch is ready *) + let ready_to_pop = + state.size > 0 + && (force + || is_full_ ~batch:self.batch state + || timeout_expired_ ~now ~timeout:self.timeout state) + in + + if ready_to_pop then ( assert (state.q <> []); let batch = state.q in - let new_st = { q = []; size = 0; start = _dummy_start } in + let new_st = _empty_state in new_st, Some batch ) else state, None @@ -81,13 +94,6 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = (* Reverse the list to retrieve the FIFO order. *) Some (List.rev batch) -let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state = - { - self with - size = self.size + List.length elems; - q = List.rev_append elems self.q; - } - let push (self : _ t) elems : [ `Dropped | `Ok ] = if elems = [] then `Ok `Ok @@ -98,16 +104,21 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = (* drop this to prevent queue from growing too fast *) state, `Dropped else ( - let state = + let start = if state.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - { state with start = Lazy.force now } + Lazy.force now else - state + state.start in (* add to queue *) - let state = push_unprotected_ state elems in + let state = + { + size = state.size + List.length elems; + q = List.rev_append elems state.q; + start; + } + in state, `Ok ) diff --git a/src/client/dune b/src/client/dune index 29ad29d7..5b3f6fde 100644 --- a/src/client/dune +++ b/src/client/dune @@ -7,6 +7,7 @@ opentelemetry.util opentelemetry.emitter opentelemetry.proto + opentelemetry.domain pbrt saturn mtime