diff --git a/src/bb_queue.ml b/src/bb_queue.ml index 67390c17..3c46878e 100644 --- a/src/bb_queue.ml +++ b/src/bb_queue.ml @@ -94,6 +94,23 @@ let size (self : _ t) : int = Mutex.unlock self.mutex; n +let transfer (self : 'a t) q2 : unit = + Mutex.lock self.mutex; + let continue = ref true in + while !continue do + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex + ) else ( + Queue.transfer self.q q2; + Mutex.unlock self.mutex; + continue := false + ) + done + type 'a gen = unit -> 'a option type 'a iter = ('a -> unit) -> unit diff --git a/src/bb_queue.mli b/src/bb_queue.mli index 3d38456e..b2428d7a 100644 --- a/src/bb_queue.mli +++ b/src/bb_queue.mli @@ -32,6 +32,36 @@ val try_push : 'a t -> 'a -> bool @raise Closed if the locking succeeded but the queue is closed. *) +val transfer : 'a t -> 'a Queue.t -> unit +(** [transfer bq q2] transfers all items presently + in [bq] into [q2] in one atomic section, and clears [bq]. + It blocks if no element is in [bq]. + + This is useful to consume elements from the queue in batch. + Create a [Queue.t] locally: + + + {[ + let dowork (work_queue: job Bb_queue.t) = + (* local queue, not thread safe *) + let local_q = Queue.create() in + try + while true do + (* work on local events, already on this thread *) + while not (Queue.is_empty local_q) do + let job = Queue.pop local_q in + process_job job + done; + + (* get all the events in the incoming blocking queue, in + one single critical section. *) + Bb_queue.transfer work_queue local_q + done + with Bb_queue.Closed -> () + ]} + + @since NEXT_RELEASE *) + val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *)