feat batch: get rid of Mutex

this should result in lower overhead for single threaded situations such
as lwt or eio.
This commit is contained in:
Simon Cruanes 2025-12-01 21:02:42 -05:00
parent 3a52b1642f
commit abe022dbc0
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -1,14 +1,18 @@
open Opentelemetry_util open Opentelemetry_util
module Otel = Opentelemetry
module A = Opentelemetry_atomic.Atomic
type 'a state = {
start: Mtime.t;
size: int;
q: 'a list; (** The queue is a FIFO represented as a list in reverse order *)
}
type 'a t = { type 'a t = {
mutable size: int; st: 'a state A.t;
mutable q: 'a list;
(** The queue is a FIFO represented as a list in reverse order *)
batch: int; (** Minimum size to batch before popping *) batch: int; (** Minimum size to batch before popping *)
high_watermark: int; (** Size above which we start dropping signals *) high_watermark: int; (** Size above which we start dropping signals *)
timeout: Mtime.span option; timeout: Mtime.span option;
mutable start: Mtime.t;
mutex: Mutex.t;
} }
let default_high_watermark batch_size = let default_high_watermark batch_size =
@ -17,6 +21,8 @@ let default_high_watermark batch_size =
else else
batch_size * 10 batch_size * 10
let _dummy_start = Mtime.min_stamp
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
let high_watermark = let high_watermark =
match high_watermark with match high_watermark with
@ -26,36 +32,48 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
let start = let start =
match now with match now with
| Some x -> x | Some x -> x
| None -> Mtime_clock.now () | None -> _dummy_start
in in
let mutex = Mutex.create () in
assert (batch > 0); assert (batch > 0);
{ size = 0; q = []; start; batch; timeout; high_watermark; mutex } { st = A.make { size = 0; q = []; start }; batch; timeout; high_watermark }
let timeout_expired_ ~now self : bool = let timeout_expired_ ~now ~timeout (self : _ state) : bool =
match self.timeout with match timeout with
| Some t -> | Some t ->
let elapsed = Mtime.span now self.start in let elapsed = Mtime.span now self.start in
Mtime.Span.compare elapsed t >= 0 Mtime.Span.compare elapsed t >= 0
| None -> false | None -> false
(* Big enough to send a batch *) (* Big enough to send a batch *)
let is_full_ self : bool = self.size >= self.batch let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
let ready_to_pop ~force ~now self = let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) =
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) self.size > 0
&& (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self)
let[@inline] atomic_update_loop_ (type res) (self : _ t)
(f : 'a state -> 'a state * res) : res =
let exception Return of res in
try
while true do
let st = A.get self.st in
let new_st, res = f st in
if A.compare_and_set self.st st new_st then raise_notrace (Return res)
done
with Return res -> res
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let rev_batch_opt = let rev_batch_opt =
Util_mutex.protect self.mutex @@ fun () -> atomic_update_loop_ self @@ fun state ->
if ready_to_pop ~force ~now self then ( let timeout = self.timeout in
assert (self.q <> []); let batch = self.batch in
let batch = self.q in if ready_to_pop_ ~force ~now ~batch ~timeout state then (
self.q <- []; assert (state.q <> []);
self.size <- 0; let batch = state.q in
Some batch let new_st = { q = []; size = 0; start = _dummy_start } in
new_st, Some batch
) else ) else
None state, None
in in
match rev_batch_opt with match rev_batch_opt with
| None -> None | None -> None
@ -63,27 +81,36 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
(* Reverse the list to retrieve the FIFO order. *) (* Reverse the list to retrieve the FIFO order. *)
Some (List.rev batch) Some (List.rev batch)
let rec push_unprotected (self : _ t) ~(elems : _ list) : unit = let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state =
match elems with {
| [] -> () self with
| x :: xs -> size = self.size + List.length elems;
self.q <- x :: self.q; q = List.rev_append elems self.q;
self.size <- 1 + self.size; }
push_unprotected self ~elems:xs
let push (self : _ t) elems : [ `Dropped | `Ok ] = let push (self : _ t) elems : [ `Dropped | `Ok ] =
Util_mutex.protect self.mutex @@ fun () -> if elems = [] then
if self.size >= self.high_watermark then `Ok `Ok
(* drop this to prevent queue from growing too fast *)
`Dropped
else ( else (
if self.size = 0 && Option.is_some self.timeout then let now = lazy (Mtime_clock.now ()) in
(* current batch starts now *) atomic_update_loop_ self @@ fun state ->
self.start <- Mtime_clock.now (); if state.size >= self.high_watermark then
(* drop this to prevent queue from growing too fast *)
state, `Dropped
else (
let state =
if state.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
{ state with start = Lazy.force now }
else
state
in
(* add to queue *) (* add to queue *)
push_unprotected self ~elems; let state = push_unprotected_ state elems in
`Ok
state, `Ok
)
) )
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])