From f23d22adb0e4f8b985549bb086fadb432063e857 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Wed, 28 May 2025 14:25:01 -0400 Subject: [PATCH] Mutex the batches --- .../opentelemetry_client_cohttp_eio.ml | 134 +++++++++++------- 1 file changed, 84 insertions(+), 50 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index a2941cd2..fbe293f5 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -179,9 +179,7 @@ end (** Batch of resources to be pushed later. - This type is thread-safe - - TODO: This is NOT safe accross domains. Need to wrap in mutex or re-architect. *) + This type is safe accross threads and domains. *) module Batch : sig type 'a t @@ -197,68 +195,104 @@ module Batch : sig val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t (** Create a new batch *) end = struct - type 'a t = { - mutable size: int; - mutable q: 'a list; - batch: int option; - high_watermark: int; - timeout: Mtime.span option; - mutable start: Mtime.t; - } - let make ?batch ?timeout () : _ t = - Option.iter (fun b -> assert (b > 0)) batch; - let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in - { - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; + module Q : sig + type _ t + val make : ?batch:int -> ?timeout:Mtime.span -> unit -> _ t + val timeout : _ t -> Mtime.span option + val size : _ t -> int + val start : _ t -> Mtime.t + val is_full : _ t -> bool + + (* Take all items queued for the batch, emptying the queue in the process *) + val drain_q : 'a t -> 'a list + + val push : 'a t -> 'a -> bool + end = struct + let mutex = Eio.Mutex.create () + let w_mutex f = Eio.Mutex.use_rw ~protect:true mutex f + let r_mutex f = Eio.Mutex.use_ro mutex f + + type 'a t = { + mutable size: int; + mutable q: 'a list; + batch: int option; + high_watermark: int; + timeout: Mtime.span option; + mutable start: Mtime.t; } + let make ?batch ?timeout () : _ t = + Option.iter (fun b -> assert (b > 0)) batch; + let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in + { + size = 0; + start = Mtime_clock.now (); + q = []; + batch; + timeout; + high_watermark; + } + + let drain_q t = w_mutex (fun () -> + let l = t.q in + t.q <- []; + t.size <- 0; + l) + + (* Immutable fields that don't need a mutex for safe access *) + let size t = t.size + + let is_full_ t : bool = + match t.batch with + | None -> t.size > 0 + | Some b -> t.size >= b + + let is_full t : bool = r_mutex (fun () -> is_full_ t) + + let push (t : _ t) x : bool = w_mutex (fun () -> + if t.size >= t.high_watermark then ( + (* drop this to prevent queue from growing too fast *) + Atomic.incr n_dropped; + true + ) else ( + if t.size = 0 && Option.is_some (t.timeout) then + (* current batch starts now *) + t.start <- Mtime_clock.now (); + + (* add to queue *) + t.size <- 1 + t.size; + t.q <- x :: t.q; + let ready = is_full_ t in + ready + )) + + (* Access to mutable fields, requiring a mutex for safe access *) + let timeout t = r_mutex (fun () -> t.timeout) + let start t = r_mutex (fun () -> t.start) + end + + type 'a t = 'a Q.t + + let make = Q.make + let timeout_expired_ ~now self : bool = - match self.timeout with + match Q.timeout self with | Some t -> - let elapsed = Mtime.span now self.start in + let elapsed = Mtime.span now (Q.start self) in Mtime.Span.compare elapsed t >= 0 | None -> false - let is_full_ self : bool = - match self.batch with - | None -> self.size > 0 - | Some b -> self.size >= b - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) + if Q.size self > 0 && (force || Q.is_full self || timeout_expired_ ~now self) then ( - let l = self.q in - self.q <- []; - self.size <- 0; + let l = Q.drain_q self in assert (l <> []); Some l ) else None - let push (self : _ t) x : bool = - if self.size >= self.high_watermark then ( - (* drop this to prevent queue from growing too fast *) - Atomic.incr n_dropped; - true - ) else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); - - (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready - ) - - let push' self x = ignore (push self x : bool) + let push' self x = ignore (Q.push self x : bool) end (** An emitter. This is used by {!Backend} below to forward traces/metrics/…