diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index a8616aaa..174c624e 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -22,11 +22,22 @@ type 'a t = { on_non_empty: (unit -> unit) -> unit; (** [on_non_empty f] registers [f] to be called whenever the queue transitions from empty to non-empty. *) - try_pop: unit -> 'a pop_result; - (** Try to pop an item right now. @raise Closed if the *) + try_pop: unit -> 'a pop_result; (** Try to pop an item right now. *) close: unit -> unit; + (** Close the queue. Items currently in the queue will still be accessible + to consumers until the queue is emptied out. Idempotent. *) closed: unit -> bool; + (** Is the queue closed {b for writing}. Consumers should only use + [try_pop] because a queue that's closed-for-writing might still + contain straggler items that need to be consumed. + + This should be as fast and cheap as possible. *) } +(** A bounded queue, with multiple producers and potentially multiple consumers. + + All functions must be thread-safe except for [try_pop] which might not have + to be depending on the context (e.g. a Lwt-specific queue implementation + will consume only from the Lwt thread). *) let[@inline] push (self : _ t) x : unit = self.push x @@ -40,10 +51,14 @@ let[@inline] close (self : _ t) : unit = self.close () let[@inline] closed (self : _ t) : bool = self.closed () +(** Turn the writing end of the queue into an emitter. *) let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = let closed () = self.closed () in let enabled () = not (closed ()) in let emit x = if x <> [] then push self x in let tick ~now:_ = () in + + (* NOTE: we cannot actually flush, only close. Emptying the queue is + fundamentally asynchronous because it's done by consumers *) let flush_and_close () = close self in { closed; enabled; emit; tick; flush_and_close } diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml index 506bd214..b4d80f7b 100644 --- a/src/client/bounded_queue_sync.ml +++ b/src/client/bounded_queue_sync.ml @@ -1,17 +1,89 @@ module BQ = Bounded_queue +exception Closed = Bounded_queue.Closed + +(* a variant of {!Sync_queue} with more bespoke pushing behavior *) +module Q : sig + type 'a t + + val create : unit -> 'a t + + val close : _ t -> unit + + val closed : _ t -> bool + + val try_pop : 'a t -> 'a option + + val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int + (** [push_while_not_full q ~high_watermark xs] tries to push each item of [x] + into [q]. + + An item is not pushed if the queue is "full" (size >= high_watermark). + + This returns a pair [num_discarded, old_size] where [num_discarded] is the + number of items that could not be pushed, and [old_size] is the size + before anything was pushed. *) +end = struct + module UM = Opentelemetry_util.Util_mutex + + type 'a t = { + mutex: Mutex.t; + q: 'a Queue.t; + mutable closed: bool; + } + + let create () : _ t = + { mutex = Mutex.create (); q = Queue.create (); closed = false } + + (* NOTE: the race condition here is benign, assuming no tearing of + a value of type [bool] which OCaml's memory model should guarantee. *) + let[@inline] closed self = self.closed + + let close (self : _ t) = + UM.protect self.mutex @@ fun () -> + if not self.closed then self.closed <- true + + let push (self : _ t) x : unit = + UM.protect self.mutex @@ fun () -> + if self.closed then raise Closed; + Queue.push x self.q + + let try_pop (self : 'a t) : 'a option = + UM.protect self.mutex @@ fun () -> + if self.closed then raise Closed; + try Some (Queue.pop self.q) with Queue.Empty -> None + + let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : + int * int = + UM.protect self.mutex @@ fun () -> + if self.closed then raise Closed; + + let old_size = Queue.length self.q in + let xs = ref xs in + + let continue = ref true in + while !continue && Queue.length self.q < high_watermark do + match !xs with + | [] -> continue := false + | x :: tl_xs -> + xs := tl_xs; + Queue.push x self.q + done; + + let n_discarded = List.length !xs in + n_discarded, old_size +end + type 'a state = { n_discarded: int Atomic.t; high_watermark: int; - q: 'a Sync_queue.t; + q: 'a Q.t; on_non_empty: Cb_set.t; } let push (self : _ state) x = let discarded, old_size = - try - Sync_queue.push_while_not_full self.q ~high_watermark:self.high_watermark - x + try Q.push_while_not_full self.q ~high_watermark:self.high_watermark x with Sync_queue.Closed -> raise BQ.Closed in @@ -23,28 +95,27 @@ let push (self : _ state) x = () let try_pop (self : _ state) : _ BQ.pop_result = - match Sync_queue.try_pop self.q with + match Q.try_pop self.q with | Some x -> `Item x | None -> `Empty | exception Sync_queue.Closed -> `Closed let to_bounded_queue (self : 'a state) : 'a BQ.t = - let closed () = Sync_queue.closed self.q in + let closed () = Q.closed self.q in let num_discarded () = Atomic.get self.n_discarded in let push x = push self x in let on_non_empty = Cb_set.register self.on_non_empty in let try_pop () = try_pop self in - let close () = Sync_queue.close self.q in + let close () = Q.close self.q in { BQ.push; num_discarded; try_pop; on_non_empty; close; closed } let create ~high_watermark () : _ BQ.t = let st = { high_watermark; - q = Sync_queue.create (); + q = Q.create (); n_discarded = Atomic.make 0; on_non_empty = Cb_set.create (); } in to_bounded_queue st - diff --git a/src/client/bounded_queue_sync.mli b/src/client/bounded_queue_sync.mli index abdb710c..d3cf6347 100644 --- a/src/client/bounded_queue_sync.mli +++ b/src/client/bounded_queue_sync.mli @@ -1,4 +1,6 @@ -(** Bounded queue based on {!Sync_queue} *) +(** Bounded queue based on simple synchronization primitives. + + This is not the fastest queue but it should be versatile. *) val create : high_watermark:int -> unit -> 'a Bounded_queue.t (** [create ~high_watermark ()] creates a new bounded queue based on diff --git a/src/client/sync_queue.ml b/src/client/sync_queue.ml index 44d71a06..46d46af1 100644 --- a/src/client/sync_queue.ml +++ b/src/client/sync_queue.ml @@ -17,10 +17,6 @@ let create () : _ t = closed = false; } -(* NOTE: the race condition here is benign, assuming no tearing of - a value of type [bool] which OCaml's memory model should guarantee. *) -let[@inline] closed self = self.closed - let close (self : _ t) = UM.protect self.mutex @@ fun () -> if not self.closed then ( @@ -51,11 +47,6 @@ let pop (self : 'a t) : 'a = in UM.protect self.mutex loop -let try_pop (self : 'a t) : 'a option = - UM.protect self.mutex @@ fun () -> - if self.closed then raise Closed; - try Some (Queue.pop self.q) with Queue.Empty -> None - let pop_all (self : 'a t) into : unit = let rec loop () = if Queue.is_empty self.q then ( @@ -66,26 +57,3 @@ let pop_all (self : 'a t) into : unit = Queue.transfer self.q into in UM.protect self.mutex loop - -let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : int * int - = - UM.protect self.mutex @@ fun () -> - if self.closed then raise Closed; - - let old_size = Queue.length self.q in - let xs = ref xs in - - let continue = ref true in - while !continue && Queue.length self.q < high_watermark do - match !xs with - | [] -> continue := false - | x :: tl_xs -> - xs := tl_xs; - Queue.push x self.q - done; - - (* pushed at least one item *) - if Queue.length self.q <> old_size then Condition.broadcast self.cond; - - let n_discarded = List.length !xs in - n_discarded, old_size diff --git a/src/client/sync_queue.mli b/src/client/sync_queue.mli index b1ebb345..d64296d7 100644 --- a/src/client/sync_queue.mli +++ b/src/client/sync_queue.mli @@ -14,25 +14,11 @@ val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) -val try_pop : 'a t -> 'a option - val pop_all : 'a t -> 'a Queue.t -> unit (** [pop_all q into] pops all the elements of [q] and moves them into [into]. if no element is available, it will block until it successfully transfers at least one item to [into]. @raise Closed if the queue was closed before a new element was available. *) -val closed : _ t -> bool - val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *) - -val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int -(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x] - into [q]. - - An item is not pushed if the queue is "full" (size >= high_watermark). - - This returns a pair [num_discarded, old_size] where [num_discarded] is the - number of items that could not be pushed, and [old_size] is the size before - anything was pushed. *)