perf batch: proper backoff strategy

This commit is contained in:
Simon Cruanes 2025-12-01 21:22:07 -05:00
parent af3269e047
commit db2b2b8a31
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 33 additions and 22 deletions

View file

@ -1,4 +1,5 @@
module A = Opentelemetry_atomic.Atomic module A = Opentelemetry_atomic.Atomic
module Domain = Opentelemetry_domain
type 'a state = { type 'a state = {
start: Mtime.t; start: Mtime.t;
@ -21,6 +22,8 @@ let default_high_watermark batch_size =
let _dummy_start = Mtime.min_stamp let _dummy_start = Mtime.min_stamp
(* Shared state value for a batch holding nothing: no queued elements, a
   size of zero, and the placeholder start stamp [_dummy_start]. *)
let _empty_state : _ state = { start = _dummy_start; size = 0; q = [] }
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
let high_watermark = let high_watermark =
match high_watermark with match high_watermark with
@ -45,30 +48,40 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
(* [is_full_ ~batch self] holds once [self] contains at least [batch]
   elements, i.e. it is big enough to send a batch. *)
let[@inline] is_full_ ~batch (self : _ state) : bool = batch <= self.size
(* [ready_to_pop_ ~force ~now ~batch ~timeout self] decides whether the
   current batch may be popped. An empty state is never ready. A non-empty
   one is ready when [force] is set, when [is_full_] holds, or when
   [timeout_expired_] returns [true]; the checks run in that order and stop
   at the first one that succeeds. *)
let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) =
  if self.size <= 0 then
    false
  else if force then
    true
  else if is_full_ ~batch self then
    true
  else
    timeout_expired_ ~now ~timeout self
(* [atomic_update_loop_ self f] reads the state stored in [self.st], applies
   [f] to it, and tries to install the state returned by [f] with a
   compare-and-set. On success it returns the [res] produced by that call to
   [f]. On failure it calls [Domain.relax_loop] with a count that starts at 1
   and doubles after every failed attempt (capped at 128), then retries from
   a fresh read. [f] may therefore be called more than once. *)
let[@inline] atomic_update_loop_ (type res) (self : _ t)
    (f : 'a state -> 'a state * res) : res =
  let rec attempt backoff =
    let current = A.get self.st in
    let updated, res = f current in
    if A.compare_and_set self.st current updated then
      res
    else (
      (* poor man's backoff strategy *)
      Domain.relax_loop backoff;
      attempt (min 128 (2 * backoff))
    )
  in
  attempt 1
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
(* update state. When uncontended this runs only once. *)
atomic_update_loop_ self @@ fun state -> atomic_update_loop_ self @@ fun state ->
let timeout = self.timeout in (* *)
let batch = self.batch in
if ready_to_pop_ ~force ~now ~batch ~timeout state then ( (* check if the batch is ready *)
let ready_to_pop =
state.size > 0
&& (force
|| is_full_ ~batch:self.batch state
|| timeout_expired_ ~now ~timeout:self.timeout state)
in
if ready_to_pop then (
assert (state.q <> []); assert (state.q <> []);
let batch = state.q in let batch = state.q in
let new_st = { q = []; size = 0; start = _dummy_start } in let new_st = _empty_state in
new_st, Some batch new_st, Some batch
) else ) else
state, None state, None
@ -79,13 +92,6 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
(* Reverse the list to retrieve the FIFO order. *) (* Reverse the list to retrieve the FIFO order. *)
Some (List.rev batch) Some (List.rev batch)
(* [push_unprotected_ self elems] builds a new state from [self] with [elems]
   added: [size] grows by [List.length elems] and [elems] is pushed onto [q]
   with [List.rev_append], so the last element of [elems] ends up at the head
   of [q]. Every other field is copied from [self]. The function only builds
   a record; it performs no atomic update itself. *)
let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state =
  let size = self.size + List.length elems in
  let q = List.rev_append elems self.q in
  { self with size; q }
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then if elems = [] then
`Ok `Ok
@ -96,16 +102,21 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
(* drop this to prevent queue from growing too fast *) (* drop this to prevent queue from growing too fast *)
state, `Dropped state, `Dropped
else ( else (
let state = let start =
if state.size = 0 && Option.is_some self.timeout then if state.size = 0 && Option.is_some self.timeout then
(* current batch starts now *) Lazy.force now
{ state with start = Lazy.force now }
else else
state state.start
in in
(* add to queue *) (* add to queue *)
let state = push_unprotected_ state elems in let state =
{
size = state.size + List.length elems;
q = List.rev_append elems state.q;
start;
}
in
state, `Ok state, `Ok
) )

View file

@ -1,5 +1,5 @@
; Library stanza for opentelemetry.client.
; NOTE(review): opentelemetry.domain is presumably the library providing the
; Opentelemetry_domain module aliased as Domain in the batch code - confirm.
(library
 (name opentelemetry_client)
 (public_name opentelemetry.client)
 (libraries opentelemetry opentelemetry.domain pbrt mtime mtime.clock.os)
 (synopsis "Common types and logic shared between client implementations"))