mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
perf batch: proper backoff strategy
This commit is contained in:
parent
f55775a55c
commit
eeae5bf41c
2 changed files with 33 additions and 21 deletions
|
|
@ -1,6 +1,7 @@
|
|||
open Opentelemetry_util
|
||||
module Otel = Opentelemetry
|
||||
module A = Opentelemetry_atomic.Atomic
|
||||
module Domain = Opentelemetry_domain
|
||||
|
||||
type 'a state = {
|
||||
start: Mtime.t;
|
||||
|
|
@ -23,6 +24,8 @@ let default_high_watermark batch_size =
|
|||
|
||||
let _dummy_start = Mtime.min_stamp
|
||||
|
||||
let _empty_state : _ state = { q = []; size = 0; start = _dummy_start }
|
||||
|
||||
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
||||
let high_watermark =
|
||||
match high_watermark with
|
||||
|
|
@ -47,30 +50,40 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
|
|||
(* Big enough to send a batch *)
|
||||
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
|
||||
|
||||
let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) =
|
||||
self.size > 0
|
||||
&& (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self)
|
||||
|
||||
let[@inline] atomic_update_loop_ (type res) (self : _ t)
|
||||
(f : 'a state -> 'a state * res) : res =
|
||||
let exception Return of res in
|
||||
try
|
||||
let backoff = ref 1 in
|
||||
while true do
|
||||
let st = A.get self.st in
|
||||
let new_st, res = f st in
|
||||
if A.compare_and_set self.st st new_st then raise_notrace (Return res)
|
||||
if A.compare_and_set self.st st new_st then raise_notrace (Return res);
|
||||
|
||||
(* poor man's backoff strategy *)
|
||||
Domain.relax_loop !backoff;
|
||||
backoff := min 128 (2 * !backoff)
|
||||
done
|
||||
with Return res -> res
|
||||
|
||||
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||
let rev_batch_opt =
|
||||
(* update state. When uncontended this runs only once. *)
|
||||
atomic_update_loop_ self @@ fun state ->
|
||||
let timeout = self.timeout in
|
||||
let batch = self.batch in
|
||||
if ready_to_pop_ ~force ~now ~batch ~timeout state then (
|
||||
(* *)
|
||||
|
||||
(* check if the batch is ready *)
|
||||
let ready_to_pop =
|
||||
state.size > 0
|
||||
&& (force
|
||||
|| is_full_ ~batch:self.batch state
|
||||
|| timeout_expired_ ~now ~timeout:self.timeout state)
|
||||
in
|
||||
|
||||
if ready_to_pop then (
|
||||
assert (state.q <> []);
|
||||
let batch = state.q in
|
||||
let new_st = { q = []; size = 0; start = _dummy_start } in
|
||||
let new_st = _empty_state in
|
||||
new_st, Some batch
|
||||
) else
|
||||
state, None
|
||||
|
|
@ -81,13 +94,6 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
|||
(* Reverse the list to retrieve the FIFO order. *)
|
||||
Some (List.rev batch)
|
||||
|
||||
let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state =
|
||||
{
|
||||
self with
|
||||
size = self.size + List.length elems;
|
||||
q = List.rev_append elems self.q;
|
||||
}
|
||||
|
||||
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||
if elems = [] then
|
||||
`Ok `Ok
|
||||
|
|
@ -98,16 +104,21 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
|||
(* drop this to prevent queue from growing too fast *)
|
||||
state, `Dropped
|
||||
else (
|
||||
let state =
|
||||
let start =
|
||||
if state.size = 0 && Option.is_some self.timeout then
|
||||
(* current batch starts now *)
|
||||
{ state with start = Lazy.force now }
|
||||
Lazy.force now
|
||||
else
|
||||
state
|
||||
state.start
|
||||
in
|
||||
|
||||
(* add to queue *)
|
||||
let state = push_unprotected_ state elems in
|
||||
let state =
|
||||
{
|
||||
size = state.size + List.length elems;
|
||||
q = List.rev_append elems state.q;
|
||||
start;
|
||||
}
|
||||
in
|
||||
|
||||
state, `Ok
|
||||
)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
opentelemetry.util
|
||||
opentelemetry.emitter
|
||||
opentelemetry.proto
|
||||
opentelemetry.domain
|
||||
pbrt
|
||||
saturn
|
||||
mtime
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue