perf batch: proper backoff strategy

This commit is contained in:
Simon Cruanes 2025-12-01 21:22:07 -05:00
parent 69bd89ebab
commit 107e173bde
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 33 additions and 21 deletions

View file

@ -1,6 +1,7 @@
open Opentelemetry_util open Opentelemetry_util
module Otel = Opentelemetry module Otel = Opentelemetry
module A = Opentelemetry_atomic.Atomic module A = Opentelemetry_atomic.Atomic
module Domain = Opentelemetry_domain
type 'a state = { type 'a state = {
start: Mtime.t; start: Mtime.t;
@ -23,6 +24,8 @@ let default_high_watermark batch_size =
let _dummy_start = Mtime.min_stamp let _dummy_start = Mtime.min_stamp
let _empty_state : _ state = { q = []; size = 0; start = _dummy_start }
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
let high_watermark = let high_watermark =
match high_watermark with match high_watermark with
@ -47,30 +50,40 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
(* Big enough to send a batch *) (* Big enough to send a batch *)
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) =
self.size > 0
&& (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self)
let[@inline] atomic_update_loop_ (type res) (self : _ t) let[@inline] atomic_update_loop_ (type res) (self : _ t)
(f : 'a state -> 'a state * res) : res = (f : 'a state -> 'a state * res) : res =
let exception Return of res in let exception Return of res in
try try
let backoff = ref 1 in
while true do while true do
let st = A.get self.st in let st = A.get self.st in
let new_st, res = f st in let new_st, res = f st in
if A.compare_and_set self.st st new_st then raise_notrace (Return res) if A.compare_and_set self.st st new_st then raise_notrace (Return res);
(* poor man's backoff strategy *)
Domain.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done done
with Return res -> res with Return res -> res
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
(* update state. When uncontended this runs only once. *)
atomic_update_loop_ self @@ fun state -> atomic_update_loop_ self @@ fun state ->
let timeout = self.timeout in (* *)
let batch = self.batch in
if ready_to_pop_ ~force ~now ~batch ~timeout state then ( (* check if the batch is ready *)
let ready_to_pop =
state.size > 0
&& (force
|| is_full_ ~batch:self.batch state
|| timeout_expired_ ~now ~timeout:self.timeout state)
in
if ready_to_pop then (
assert (state.q <> []); assert (state.q <> []);
let batch = state.q in let batch = state.q in
let new_st = { q = []; size = 0; start = _dummy_start } in let new_st = _empty_state in
new_st, Some batch new_st, Some batch
) else ) else
state, None state, None
@ -81,13 +94,6 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
(* Reverse the list to retrieve the FIFO order. *) (* Reverse the list to retrieve the FIFO order. *)
Some (List.rev batch) Some (List.rev batch)
let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state =
{
self with
size = self.size + List.length elems;
q = List.rev_append elems self.q;
}
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then if elems = [] then
`Ok `Ok `Ok `Ok
@ -98,16 +104,21 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
(* drop this to prevent queue from growing too fast *) (* drop this to prevent queue from growing too fast *)
state, `Dropped state, `Dropped
else ( else (
let state = let start =
if state.size = 0 && Option.is_some self.timeout then if state.size = 0 && Option.is_some self.timeout then
(* current batch starts now *) Lazy.force now
{ state with start = Lazy.force now }
else else
state state.start
in in
(* add to queue *) (* add to queue *)
let state = push_unprotected_ state elems in let state =
{
size = state.size + List.length elems;
q = List.rev_append elems state.q;
start;
}
in
state, `Ok state, `Ok
) )

View file

@ -7,6 +7,7 @@
opentelemetry.util opentelemetry.util
opentelemetry.emitter opentelemetry.emitter
opentelemetry.proto opentelemetry.proto
opentelemetry.domain
pbrt pbrt
saturn saturn
mtime mtime