diff --git a/src/client/batch.ml b/src/client/batch.ml index ba22fe1f..ac74c3f7 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,6 +1,4 @@ -open Opentelemetry_util -module Otel = Opentelemetry -module A = Opentelemetry_atomic.Atomic +open Opentelemetry_atomic module Domain = Opentelemetry_domain type 'a state = { @@ -10,23 +8,19 @@ type 'a state = { } type 'a t = { - st: 'a state A.t; + st: 'a state Atomic.t; batch: int; (** Minimum size to batch before popping *) high_watermark: int; (** Size above which we start dropping signals *) timeout: Mtime.span option; } -let default_high_watermark batch_size = - if batch_size = 1 then - 100 - else - batch_size * 10 +let default_high_watermark batch_size = min 10 (max (batch_size * 10) 1_000_000) let _dummy_start = Mtime.min_stamp let _empty_state : _ state = { q = []; size = 0; start = _dummy_start } -let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = +let make ?(batch = 100) ?high_watermark ?now ?timeout () : _ t = let high_watermark = match high_watermark with | Some x -> x @@ -38,7 +32,12 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = | None -> _dummy_start in assert (batch > 0); - { st = A.make { size = 0; q = []; start }; batch; timeout; high_watermark } + { + st = Atomic.make @@ { size = 0; q = []; start }; + batch; + timeout; + high_watermark; + } let timeout_expired_ ~now ~timeout (self : _ state) : bool = match timeout with @@ -47,7 +46,7 @@ let timeout_expired_ ~now ~timeout (self : _ state) : bool = Mtime.Span.compare elapsed t >= 0 | None -> false -(* Big enough to send a batch *) +(** Big enough to send? *) let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch let[@inline] atomic_update_loop_ (type res) (self : _ t) @@ -56,9 +55,10 @@ let[@inline] atomic_update_loop_ (type res) (self : _ t) try let backoff = ref 1 in while true do - let st = A.get self.st in + let st = Atomic.get self.st in let new_st, res = f st in - if A.compare_and_set self.st st new_st then raise_notrace (Return res); + if Atomic.compare_and_set self.st st new_st then + raise_notrace (Return res); (* poor man's backoff strategy *) Domain.relax_loop !backoff; @@ -96,7 +96,7 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let push (self : _ t) elems : [ `Dropped | `Ok ] = if elems = [] then - `Ok `Ok + `Ok else ( let now = lazy (Mtime_clock.now ()) in atomic_update_loop_ self @@ fun state -> @@ -129,23 +129,32 @@ let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) open Opentelemetry_emitter let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = + (* we need to be able to close this emitter before we close [e]. This + will become [true] when we close, then we call [Emitter.flush_and_close e], + then [e] itself will be closed. *) + let closed_here = Atomic.make false in + let enabled () = e.enabled () in let closed () = e.closed () in let flush_and_close () = - (* FIXME: we need to close the batch first, to prevent - further pushes; then write the content to [e]; then + if not (Atomic.exchange closed_here true) then ( + (* NOTE: we need to close this wrapping emitter first, to prevent + further pushes; then write the content to [e]; then flusn and close [e]. In this order. *) - (match pop_if_ready self ~force:true ~now:Mtime.max_stamp with - | None -> () - | Some l -> Emitter.emit e l); + (match pop_if_ready self ~force:true ~now:Mtime.max_stamp with + | None -> () + | Some l -> Emitter.emit e l); - Emitter.flush_and_close e + Emitter.flush_and_close e + ) in let maybe_emit ~now = - match pop_if_ready self ~force:false ~now with - | None -> () - | Some l -> Emitter.emit e l + if not (Atomic.get closed_here) then ( + match pop_if_ready self ~force:false ~now with + | None -> () + | Some l -> Emitter.emit e l + ) in let tick ~now = @@ -157,7 +166,7 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = in let emit l = - if l <> [] && e.enabled () then ( + if l <> [] && (not (Atomic.get closed_here)) && e.enabled () then ( push' self l; (* TODO: it'd be nice if we checked only for size here, not diff --git a/src/client/batch.mli b/src/client/batch.mli index fa64083b..56256851 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -1,4 +1,4 @@ -(** A thread-safe batch of resources to be popper when ready . *) +(** A thread-safe batch of resources, to be sent together when ready . *) type 'a t @@ -13,8 +13,7 @@ val make : @param batch the number of elements after which the batch will be considered {b full}, - and ready to pop. Set to [0] to disable batching. It is required that - [batch >= 0]. Default [1]. + and ready to pop. It is required that [batch >= 0]. Default [100]. @param high_watermark the batch size limit after which new elements will be [`Dropped] by