feat client: add bounded queue interface and sync-queue based implem

This commit is contained in:
Simon Cruanes 2025-12-04 21:12:28 -05:00
parent d1a451550b
commit 53cb32308a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 116 additions and 0 deletions

View file

@ -0,0 +1,61 @@
(** Interface for a thread-safe, bounded queue.
After the high watermark is reached, pushing items into the queue will
instead discard them. *)
open Common_
exception Closed
(** Raised when pushing into a closed queue *)
type 'a pop_result =
[ `Empty
| `Closed
| `Item of 'a
]
type 'a t = {
push: 'a list -> unit;
(** Push items. This might discard some of them.
@raise Closed if the queue is closed. *)
num_discarded: unit -> int; (** How many items were discarded? *)
on_non_empty: (unit -> unit) -> unit;
(** [on_non_empty f] registers [f] to be called whenever the queue
transitions from empty to non-empty. *)
try_pop: unit -> 'a pop_result;
(** Try to pop an item right now. @raise Closed if the *)
close: unit -> unit;
closed: unit -> bool;
}
let[@inline] push (self : _ t) x : unit = self.push x
let[@inline] num_discarded self = self.num_discarded ()
let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop ()
let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f
let[@inline] close (self : _ t) : unit = self.close ()
let[@inline] closed (self : _ t) : bool = self.closed ()
let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
let closed () = self.closed () in
let enabled () = not (closed ()) in
let emit x = if x <> [] then push self x in
let tick ~now:_ = () in
let flush_and_close () = close self in
{ closed; enabled; emit; tick; flush_and_close }
let logs_emitter (self : Any_resource.t t) : OTEL.Logger.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_logs_or_empty
let spans_emitter (self : Any_resource.t t) : OTEL.Tracer.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_spans_or_empty
let metrics_emitter (self : Any_resource.t t) : OTEL.Metrics_emitter.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_metrics_or_empty

View file

@ -0,0 +1,50 @@
module BQ = Bounded_queue
type 'a state = {
n_discarded: int Atomic.t;
high_watermark: int;
q: 'a Sync_queue.t;
on_non_empty: Cb_set.t;
}
let push (self : _ state) x =
let discarded, old_size =
try
Sync_queue.push_while_not_full self.q ~high_watermark:self.high_watermark
x
with Sync_queue.Closed -> raise BQ.Closed
in
if discarded > 0 then
ignore (Atomic.fetch_and_add self.n_discarded discarded : int);
(* wake up lagards if the queue was empty *)
if old_size = 0 then Cb_set.trigger self.on_non_empty;
()
let try_pop (self : _ state) : _ BQ.pop_result =
match Sync_queue.try_pop self.q with
| Some x -> `Item x
| None -> `Empty
| exception Sync_queue.Closed -> `Closed
let to_bounded_queue (self : 'a state) : 'a BQ.t =
let closed () = Sync_queue.closed self.q in
let num_discarded () = Atomic.get self.n_discarded in
let push x = push self x in
let on_non_empty = Cb_set.register self.on_non_empty in
let try_pop () = try_pop self in
let close () = Sync_queue.close self.q in
{ BQ.push; num_discarded; try_pop; on_non_empty; close; closed }
let create ~high_watermark () : _ BQ.t =
let st =
{
high_watermark;
q = Sync_queue.create ();
n_discarded = Atomic.make 0;
on_non_empty = Cb_set.create ();
}
in
to_bounded_queue st

View file

@ -0,0 +1,5 @@
(** Bounded queue based on {!Sync_queue} *)
val create : high_watermark:int -> unit -> 'a Bounded_queue.t
(** [create ~high_watermark ()] creates a new bounded queue based on
{!Sync_queue} *)