feat batch: proper closing of wrap_emitter; default batch=100

This commit is contained in:
Simon Cruanes 2025-12-08 11:24:55 -05:00
parent eeae5bf41c
commit 25afa2085c
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 36 additions and 28 deletions

View file

@ -1,6 +1,4 @@
open Opentelemetry_util open Opentelemetry_atomic
module Otel = Opentelemetry
module A = Opentelemetry_atomic.Atomic
module Domain = Opentelemetry_domain module Domain = Opentelemetry_domain
type 'a state = { type 'a state = {
@ -10,23 +8,19 @@ type 'a state = {
} }
type 'a t = { type 'a t = {
st: 'a state A.t; st: 'a state Atomic.t;
batch: int; (** Minimum size to batch before popping *) batch: int; (** Minimum size to batch before popping *)
high_watermark: int; (** Size above which we start dropping signals *) high_watermark: int; (** Size above which we start dropping signals *)
timeout: Mtime.span option; timeout: Mtime.span option;
} }
let default_high_watermark batch_size = let default_high_watermark batch_size = min 10 (max (batch_size * 10) 1_000_000)
if batch_size = 1 then
100
else
batch_size * 10
let _dummy_start = Mtime.min_stamp let _dummy_start = Mtime.min_stamp
let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } let _empty_state : _ state = { q = []; size = 0; start = _dummy_start }
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t =
let high_watermark = let high_watermark =
match high_watermark with match high_watermark with
| Some x -> x | Some x -> x
@ -38,7 +32,12 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
| None -> _dummy_start | None -> _dummy_start
in in
assert (batch > 0); assert (batch > 0);
{ st = A.make { size = 0; q = []; start }; batch; timeout; high_watermark } {
st = Atomic.make @@ { size = 0; q = []; start };
batch;
timeout;
high_watermark;
}
let timeout_expired_ ~now ~timeout (self : _ state) : bool = let timeout_expired_ ~now ~timeout (self : _ state) : bool =
match timeout with match timeout with
@ -47,7 +46,7 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool =
Mtime.Span.compare elapsed t >= 0 Mtime.Span.compare elapsed t >= 0
| None -> false | None -> false
(* Big enough to send a batch *) (** Big enough to send? *)
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let[@inline] atomic_update_loop_ (type res) (self : _ t) let[@inline] atomic_update_loop_ (type res) (self : _ t)
@ -56,9 +55,10 @@ let[@inline] atomic_update_loop_ (type res) (self : _ t)
try try
let backoff = ref 1 in let backoff = ref 1 in
while true do while true do
let st = A.get self.st in let st = Atomic.get self.st in
let new_st, res = f st in let new_st, res = f st in
if A.compare_and_set self.st st new_st then raise_notrace (Return res); if Atomic.compare_and_set self.st st new_st then
raise_notrace (Return res);
(* poor man's backoff strategy *) (* poor man's backoff strategy *)
Domain.relax_loop !backoff; Domain.relax_loop !backoff;
@ -96,7 +96,7 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
if elems = [] then if elems = [] then
`Ok `Ok `Ok
else ( else (
let now = lazy (Mtime_clock.now ()) in let now = lazy (Mtime_clock.now ()) in
atomic_update_loop_ self @@ fun state -> atomic_update_loop_ self @@ fun state ->
@ -129,23 +129,32 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter open Opentelemetry_emitter
let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
(* we need to be able to close this emitter before we close [e]. This
will become [true] when we close, then we call [Emitter.flush_and_close e],
then [e] itself will be closed. *)
let closed_here = Atomic.make false in
let enabled () = e.enabled () in let enabled () = e.enabled () in
let closed () = e.closed () in let closed () = e.closed () in
let flush_and_close () = let flush_and_close () =
(* FIXME: we need to close the batch first, to prevent if not (Atomic.exchange closed_here true) then (
further pushes; then write the content to [e]; then (* NOTE: we need to close this wrapping emitter first, to prevent
further pushes; then write the content to [e]; then
flush and close [e]. In this order. *) flush and close [e]. In this order. *)
(match pop_if_ready self ~force:true ~now:Mtime.max_stamp with (match pop_if_ready self ~force:true ~now:Mtime.max_stamp with
| None -> () | None -> ()
| Some l -> Emitter.emit e l); | Some l -> Emitter.emit e l);
Emitter.flush_and_close e Emitter.flush_and_close e
)
in in
let maybe_emit ~now = let maybe_emit ~now =
match pop_if_ready self ~force:false ~now with if not (Atomic.get closed_here) then (
| None -> () match pop_if_ready self ~force:false ~now with
| Some l -> Emitter.emit e l | None -> ()
| Some l -> Emitter.emit e l
)
in in
let tick ~now = let tick ~now =
@ -157,7 +166,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
in in
let emit l = let emit l =
if l <> [] && e.enabled () then ( if l <> [] && (not (Atomic.get closed_here)) && e.enabled () then (
push' self l; push' self l;
(* TODO: it'd be nice if we checked only for size here, not (* TODO: it'd be nice if we checked only for size here, not

View file

@ -1,4 +1,4 @@
(** A thread-safe batch of resources to be popped when ready. *) (** A thread-safe batch of resources, to be sent together when ready. *)
type 'a t type 'a t
@ -13,8 +13,7 @@ val make :
@param batch @param batch
the number of elements after which the batch will be considered {b full}, the number of elements after which the batch will be considered {b full},
and ready to pop. Set to [0] to disable batching. It is required that and ready to pop. It is required that [batch > 0]. Default [100].
[batch >= 0]. Default [1].
@param high_watermark @param high_watermark
the batch size limit after which new elements will be [`Dropped] by the batch size limit after which new elements will be [`Dropped] by