add Bb_queue.transfer

This commit is contained in:
Simon Cruanes 2023-09-14 22:30:44 -04:00
parent f90773a99a
commit ace43c0852
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 47 additions and 0 deletions

View file

@ -94,6 +94,23 @@ let size (self : _ t) : int =
Mutex.unlock self.mutex;
n
let transfer (self : 'a t) q2 : unit =
Mutex.lock self.mutex;
let continue = ref true in
while !continue do
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex
) else (
Queue.transfer self.q q2;
Mutex.unlock self.mutex;
continue := false
)
done
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit

View file

@ -32,6 +32,36 @@ val try_push : 'a t -> 'a -> bool
@raise Closed if the locking succeeded but the queue is closed.
*)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all items presently
in [bq] into [q2] in one atomic section, and clears [bq].
It blocks if no element is in [bq].
This is useful to consume elements from the queue in batch.
Create a [Queue.t] locally:
{[
let dowork (work_queue: job Bb_queue.t) =
(* local queue, not thread safe *)
let local_q = Queue.create() in
try
while true do
(* work on local events, already on this thread *)
while not (Queue.is_empty local_q) do
let job = Queue.pop local_q in
process_job job
done;
(* get all the events in the incoming blocking queue, in
one single critical section. *)
Bb_queue.transfer work_queue local_q
done
with Bb_queue.Closed -> ()
]}
@since NEXT_RELEASE *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)