diff --git a/src/client/batch.ml b/src/client/batch.ml index 20343630..f57f34e6 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,14 +1,18 @@ open Opentelemetry_util +module Otel = Opentelemetry +module A = Opentelemetry_atomic.Atomic + +type 'a state = { + start: Mtime.t; + size: int; + q: 'a list; (** The queue is a FIFO represented as a list in reverse order *) +} type 'a t = { - mutable size: int; - mutable q: 'a list; - (** The queue is a FIFO represented as a list in reverse order *) + st: 'a state A.t; batch: int; (** Minimum size to batch before popping *) high_watermark: int; (** Size above which we start dropping signals *) timeout: Mtime.span option; - mutable start: Mtime.t; - mutex: Mutex.t; } let default_high_watermark batch_size = @@ -17,6 +21,8 @@ let default_high_watermark batch_size = else batch_size * 10 +let _dummy_start = Mtime.min_stamp + let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let high_watermark = match high_watermark with @@ -26,36 +32,48 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let start = match now with | Some x -> x - | None -> Mtime_clock.now () + | None -> _dummy_start in - let mutex = Mutex.create () in assert (batch > 0); - { size = 0; q = []; start; batch; timeout; high_watermark; mutex } + { st = A.make { size = 0; q = []; start }; batch; timeout; high_watermark } -let timeout_expired_ ~now self : bool = - match self.timeout with +let timeout_expired_ ~now ~timeout (self : _ state) : bool = + match timeout with | Some t -> let elapsed = Mtime.span now self.start in Mtime.Span.compare elapsed t >= 0 | None -> false (* Big enough to send a batch *) -let is_full_ self : bool = self.size >= self.batch +let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch -let ready_to_pop ~force ~now self = - self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) +let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) = + self.size > 0 + && (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self) + +let[@inline] atomic_update_loop_ (type res) (self : _ t) + (f : 'a state -> 'a state * res) : res = + let exception Return of res in + try + while true do + let st = A.get self.st in + let new_st, res = f st in + if A.compare_and_set self.st st new_st then raise_notrace (Return res) + done + with Return res -> res let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let rev_batch_opt = - Util_mutex.protect self.mutex @@ fun () -> - if ready_to_pop ~force ~now self then ( - assert (self.q <> []); - let batch = self.q in - self.q <- []; - self.size <- 0; - Some batch + atomic_update_loop_ self @@ fun state -> + let timeout = self.timeout in + let batch = self.batch in + if ready_to_pop_ ~force ~now ~batch ~timeout state then ( + assert (state.q <> []); + let batch = state.q in + let new_st = { q = []; size = 0; start = _dummy_start } in + new_st, Some batch ) else - None + state, None in match rev_batch_opt with | None -> None @@ -63,27 +81,36 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = (* Reverse the list to retrieve the FIFO order. *) Some (List.rev batch) -let rec push_unprotected (self : _ t) ~(elems : _ list) : unit = - match elems with - | [] -> () - | x :: xs -> - self.q <- x :: self.q; - self.size <- 1 + self.size; - push_unprotected self ~elems:xs +let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state = + { + self with + size = self.size + List.length elems; + q = List.rev_append elems self.q; + } let push (self : _ t) elems : [ `Dropped | `Ok ] = - Util_mutex.protect self.mutex @@ fun () -> - if self.size >= self.high_watermark then - (* drop this to prevent queue from growing too fast *) - `Dropped + if elems = [] then + `Ok `Ok else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); + let now = lazy (Mtime_clock.now ()) in + atomic_update_loop_ self @@ fun state -> + if state.size >= self.high_watermark then + (* drop this to prevent queue from growing too fast *) + state, `Dropped + else ( + let state = + if state.size = 0 && Option.is_some self.timeout then + (* current batch starts now *) + { state with start = Lazy.force now } + else + state + in - (* add to queue *) - push_unprotected self ~elems; - `Ok + (* add to queue *) + let state = push_unprotected_ state elems in + + state, `Ok + ) ) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])