From 500c9a8ba8c24975cc1913a076343af95605eb2c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 4 Dec 2025 21:11:32 -0500 Subject: [PATCH] sync_queue: more operations, including a batch push --- src/client/sync_queue.ml | 32 ++++++++++++++++++++++++++++++++ src/client/sync_queue.mli | 14 ++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/client/sync_queue.ml b/src/client/sync_queue.ml index 46d46af1..44d71a06 100644 --- a/src/client/sync_queue.ml +++ b/src/client/sync_queue.ml @@ -17,6 +17,10 @@ let create () : _ t = closed = false; } +(* NOTE: the race condition here is benign, assuming no tearing of + a value of type [bool] which OCaml's memory model should guarantee. *) +let[@inline] closed self = self.closed + let close (self : _ t) = UM.protect self.mutex @@ fun () -> if not self.closed then ( @@ -47,6 +51,11 @@ let pop (self : 'a t) : 'a = in UM.protect self.mutex loop +let try_pop (self : 'a t) : 'a option = + UM.protect self.mutex @@ fun () -> + if self.closed then raise Closed; + try Some (Queue.pop self.q) with Queue.Empty -> None + let pop_all (self : 'a t) into : unit = let rec loop () = if Queue.is_empty self.q then ( @@ -57,3 +66,26 @@ let pop_all (self : 'a t) into : unit = Queue.transfer self.q into in UM.protect self.mutex loop + +let push_while_not_full ~high_watermark (self : 'a t) (xs : 'a list) : int * int + = + UM.protect self.mutex @@ fun () -> + if self.closed then raise Closed; + + let old_size = Queue.length self.q in + let xs = ref xs in + + let continue = ref true in + while !continue && Queue.length self.q < high_watermark do + match !xs with + | [] -> continue := false + | x :: tl_xs -> + xs := tl_xs; + Queue.push x self.q + done; + + (* pushed at least one item *) + if Queue.length self.q <> old_size then Condition.broadcast self.cond; + + let n_discarded = List.length !xs in + n_discarded, old_size diff --git a/src/client/sync_queue.mli b/src/client/sync_queue.mli index d64296d7..b1ebb345 100644 --- a/src/client/sync_queue.mli +++ b/src/client/sync_queue.mli @@ -14,11 +14,25 @@ val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) +val try_pop : 'a t -> 'a option + val pop_all : 'a t -> 'a Queue.t -> unit (** [pop_all q into] pops all the elements of [q] and moves them into [into]. if no element is available, it will block until it successfully transfers at least one item to [into]. @raise Closed if the queue was closed before a new element was available. *) +val closed : _ t -> bool + val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *) + +val push_while_not_full : high_watermark:int -> 'a t -> 'a list -> int * int +(** [push_while_not_full q ~high_watermark xs] tries to push each item of [x] + into [q]. + + An item is not pushed if the queue is "full" (size >= high_watermark). + + This returns a pair [num_discarded, old_size] where [num_discarded] is the + number of items that could not be pushed, and [old_size] is the size before + anything was pushed. *)