add Bounded_queue

This commit is contained in:
Simon Cruanes 2023-09-14 22:33:44 -04:00
parent 4059903e09
commit 3834bf0796
4 changed files with 243 additions and 0 deletions

170
src/bounded_queue.ml Normal file
View file

@ -0,0 +1,170 @@
type 'a t = {
max_size: int;
q: 'a Queue.t;
mutex: Mutex.t;
cond_push: Condition.t;
cond_pop: Condition.t;
mutable closed: bool;
}
exception Closed
let create ~max_size () : _ t =
if max_size < 1 then invalid_arg "Bounded_queue.create";
{
max_size;
mutex = Mutex.create ();
cond_push = Condition.create ();
cond_pop = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
(* awake waiters so they fail *)
Condition.broadcast self.cond_push;
Condition.broadcast self.cond_pop
);
Mutex.unlock self.mutex
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
let push (self : _ t) x : unit =
let continue = ref true in
Mutex.lock self.mutex;
while !continue do
if self.closed then (
(* push always fails on a closed queue *)
Mutex.unlock self.mutex;
raise Closed
) else if is_full_ self then
Condition.wait self.cond_push self.mutex
else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
(* exit loop *)
continue := false;
Mutex.unlock self.mutex
)
done
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
(* pop fails on a closed queue if it's also empty,
otherwise it still returns the remaining elements *)
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex;
(loop [@tailcall]) ()
) else (
let was_full = is_full_ self in
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
(* wakeup pushers *)
if was_full then Condition.broadcast self.cond_push;
x
)
in
loop ()
let try_pop ~force_lock (self : _ t) : _ option =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
let was_full_before_pop = is_full_ self in
match Queue.pop self.q with
| x ->
Mutex.unlock self.mutex;
if was_full_before_pop then Condition.broadcast self.cond_push;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
None
) else
None
let try_push (self : _ t) x : bool =
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
if is_full_ self then (
Mutex.unlock self.mutex;
false
) else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
Mutex.unlock self.mutex;
true
)
) else
false
let[@inline] max_size self = self.max_size
let size (self : _ t) : int =
Mutex.lock self.mutex;
let n = Queue.length self.q in
Mutex.unlock self.mutex;
n
let transfer (self : 'a t) q2 : unit =
Mutex.lock self.mutex;
let continue = ref true in
while !continue do
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex
) else (
let was_full = is_full_ self in
Queue.transfer self.q q2;
if was_full then Condition.broadcast self.cond_push;
continue := false;
Mutex.unlock self.mutex
)
done
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
let to_iter self k =
try
while true do
let x = pop self in
k x
done
with Closed -> ()
let to_gen self : _ gen =
fun () ->
match pop self with
| exception Closed -> None
| x -> Some x
let rec to_seq self : _ Seq.t =
fun () ->
match pop self with
| exception Closed -> Seq.Nil
| x -> Seq.Cons (x, to_seq self)

70
src/bounded_queue.mli Normal file
View file

@ -0,0 +1,70 @@
(** A blocking queue of finite size. *)
type 'a t
(** A bounded queue. *)
val create : max_size:int -> unit -> 'a t
val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q],
and after all the elements still in [q] currently are [pop]'d,
{!pop} will also raise {!Closed}. *)
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue.
If [q] is full, this will block until there is
room for [x].
@raise Closed if [q] is closed. *)
val try_push : 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons
if it cannot acquire [q] or if [q] is full.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q]
is empty, until some element becomes available.
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] tries to pop the first element, or returns [None]
if no element is available or if it failed to acquire [q].
@param force_lock if true, use {!Mutex.lock} (which can block
under contention);
if false, use {!Mutex.try_lock}, which might return [None] even in
presence of an element if there's contention.
@raise Closed if [q] is empty and closed. *)
val size : _ t -> int
(** Number of elements currently in [q] *)
val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available
in [bq] into local queue [q2], and clears [bq], atomically.
It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *)
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue.
This might not terminate if [q] is never closed.
@since NEXT_RELEASE *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue.
@since NEXT_RELEASE *)
val to_seq : 'a t -> 'a Seq.t
(** [to_gen q] returns a (transient) sequence from the queue.
@since NEXT_RELEASE *)

View file

@ -4,6 +4,7 @@ let start_thread_on_some_domain f x =
module Atomic = Atomic_
module Blocking_queue = Bb_queue
module Bounded_queue = Bounded_queue
module Chan = Chan
module Fork_join = Fork_join
module Fut = Fut

View file

@ -133,6 +133,8 @@ module Blocking_queue : sig
@since NEXT_RELEASE *)
end
module Bounded_queue = Bounded_queue
module Atomic = Atomic_
(** Atomic values.