From 157957530a06e2852e0a441dcc6681fe5d278e11 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 29 Oct 2023 17:42:02 -0400 Subject: [PATCH] b_queue: add `transfer_into` for moving batches into the queue --- src/tef/b_queue.ml | 15 +++++++++++++++ src/tef/b_queue.mli | 3 +++ 2 files changed, 18 insertions(+) diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index c2daa83..136183b 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -71,3 +71,18 @@ let transfer (self : 'a t) q2 : unit = do () done + +let transfer_into q (self : _ t) : unit = + if not (Queue.is_empty q) then ( + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + let was_empty = Queue.is_empty self.q in + Queue.transfer q self.q; + if was_empty then Condition.broadcast self.cond; + Mutex.unlock self.mutex + ) diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli index b0125b8..7c99261 100644 --- a/src/tef/b_queue.mli +++ b/src/tef/b_queue.mli @@ -19,5 +19,8 @@ val transfer : 'a t -> 'a Queue.t -> unit in [bq] into [q2], and clears [bq]. It blocks if no element is in [bq]. *) +val transfer_into : 'a Queue.t -> 'a t -> unit +(** [transfer q bq] transfers all items from [q] to [bq]. *) + val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *)