From 90d59b40d91924baf853f0de2017dedcdf9fe5b4 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 4 Dec 2025 21:12:28 -0500 Subject: [PATCH] feat client: add bounded queue interface and sync-queue based implem --- src/client/bounded_queue.ml | 61 +++++++++++++++++++++++++++++++ src/client/bounded_queue_sync.ml | 50 +++++++++++++++++++++++++ src/client/bounded_queue_sync.mli | 5 +++ 3 files changed, 116 insertions(+) create mode 100644 src/client/bounded_queue.ml create mode 100644 src/client/bounded_queue_sync.ml create mode 100644 src/client/bounded_queue_sync.mli diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml new file mode 100644 index 00000000..b69a60a4 --- /dev/null +++ b/src/client/bounded_queue.ml @@ -0,0 +1,61 @@ +(** Interface for a thread-safe, bounded queue. + + After the high watermark is reached, pushing items into the queue will + instead discard them. *) + +open Common_ + +exception Closed +(** Raised when pushing into a closed queue *) + +type 'a pop_result = + [ `Empty + | `Closed + | `Item of 'a + ] + +type 'a t = { + push: 'a list -> unit; + (** Push items. This might discard some of them. + @raise Closed if the queue is closed. *) + num_discarded: unit -> int; (** How many items were discarded? *) + on_non_empty: (unit -> unit) -> unit; + (** [on_non_empty f] registers [f] to be called whenever the queue + transitions from empty to non-empty. *) + try_pop: unit -> 'a pop_result; + (** Try to pop an item right now. @raise Closed if the *) + close: unit -> unit; + closed: unit -> bool; +} + +let[@inline] push (self : _ t) x : unit = self.push x + +let[@inline] num_discarded self = self.num_discarded () + +let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop () + +let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f + +let[@inline] close (self : _ t) : unit = self.close () + +let[@inline] closed (self : _ t) : bool = self.closed () + +let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = + let closed () = self.closed () in + let enabled () = not (closed ()) in + let emit x = if x <> [] then push self x in + let tick ~now:_ = () in + let flush_and_close () = close self in + { closed; enabled; emit; tick; flush_and_close } + +let logs_emitter (self : Any_resource.t t) : OTEL.Logger.t = + to_emitter self + |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_logs_or_empty + +let spans_emitter (self : Any_resource.t t) : OTEL.Tracer.t = + to_emitter self + |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_spans_or_empty + +let metrics_emitter (self : Any_resource.t t) : OTEL.Metrics_emitter.t = + to_emitter self + |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_metrics_or_empty diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml new file mode 100644 index 00000000..506bd214 --- /dev/null +++ b/src/client/bounded_queue_sync.ml @@ -0,0 +1,50 @@ +module BQ = Bounded_queue + +type 'a state = { + n_discarded: int Atomic.t; + high_watermark: int; + q: 'a Sync_queue.t; + on_non_empty: Cb_set.t; +} + +let push (self : _ state) x = + let discarded, old_size = + try + Sync_queue.push_while_not_full self.q ~high_watermark:self.high_watermark + x + with Sync_queue.Closed -> raise BQ.Closed + in + + if discarded > 0 then + ignore (Atomic.fetch_and_add self.n_discarded discarded : int); + + (* wake up lagards if the queue was empty *) + if old_size = 0 then Cb_set.trigger self.on_non_empty; + () + +let try_pop (self : _ state) : _ BQ.pop_result = + match Sync_queue.try_pop self.q with + | Some x -> `Item x + | None -> `Empty + | exception Sync_queue.Closed -> `Closed + +let to_bounded_queue (self : 'a state) : 'a BQ.t = + let closed () = Sync_queue.closed self.q in + let num_discarded () = Atomic.get self.n_discarded in + let push x = push self x in + let on_non_empty = Cb_set.register self.on_non_empty in + let try_pop () = try_pop self in + let close () = Sync_queue.close self.q in + { BQ.push; num_discarded; try_pop; on_non_empty; close; closed } + +let create ~high_watermark () : _ BQ.t = + let st = + { + high_watermark; + q = Sync_queue.create (); + n_discarded = Atomic.make 0; + on_non_empty = Cb_set.create (); + } + in + to_bounded_queue st + diff --git a/src/client/bounded_queue_sync.mli b/src/client/bounded_queue_sync.mli new file mode 100644 index 00000000..abdb710c --- /dev/null +++ b/src/client/bounded_queue_sync.mli @@ -0,0 +1,5 @@ +(** Bounded queue based on {!Sync_queue} *) + +val create : high_watermark:int -> unit -> 'a Bounded_queue.t +(** [create ~high_watermark ()] creates a new bounded queue based on + {!Sync_queue} *)