diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index c2daa83..136183b 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -71,3 +71,18 @@ let transfer (self : 'a t) q2 : unit = do () done + +let transfer_into q (self : _ t) : unit = + if not (Queue.is_empty q) then ( + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + let was_empty = Queue.is_empty self.q in + Queue.transfer q self.q; + if was_empty then Condition.broadcast self.cond; + Mutex.unlock self.mutex + ) diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli index b0125b8..7c99261 100644 --- a/src/tef/b_queue.mli +++ b/src/tef/b_queue.mli @@ -19,5 +19,8 @@ val transfer : 'a t -> 'a Queue.t -> unit in [bq] into [q2], and clears [bq]. It blocks if no element is in [bq]. *) +val transfer_into : 'a Queue.t -> 'a t -> unit +(** [transfer q bq] transfers all items from [q] to [bq]. *) + val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *)