mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
add Bounded_queue
This commit is contained in:
parent
4059903e09
commit
8ade177b0e
4 changed files with 243 additions and 0 deletions
170
src/bounded_queue.ml
Normal file
170
src/bounded_queue.ml
Normal file
|
|
@ -0,0 +1,170 @@
|
||||||
|
type 'a t = {
|
||||||
|
max_size: int;
|
||||||
|
q: 'a Queue.t;
|
||||||
|
mutex: Mutex.t;
|
||||||
|
cond_push: Condition.t;
|
||||||
|
cond_pop: Condition.t;
|
||||||
|
mutable closed: bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
exception Closed
|
||||||
|
|
||||||
|
let create ~max_size () : _ t =
|
||||||
|
if max_size < 1 then invalid_arg "Bounded_queue.create";
|
||||||
|
{
|
||||||
|
max_size;
|
||||||
|
mutex = Mutex.create ();
|
||||||
|
cond_push = Condition.create ();
|
||||||
|
cond_pop = Condition.create ();
|
||||||
|
q = Queue.create ();
|
||||||
|
closed = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let close (self : _ t) =
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
if not self.closed then (
|
||||||
|
self.closed <- true;
|
||||||
|
(* awake waiters so they fail *)
|
||||||
|
Condition.broadcast self.cond_push;
|
||||||
|
Condition.broadcast self.cond_pop
|
||||||
|
);
|
||||||
|
Mutex.unlock self.mutex
|
||||||
|
|
||||||
|
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
|
||||||
|
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
|
||||||
|
|
||||||
|
let push (self : _ t) x : unit =
|
||||||
|
let continue = ref true in
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
while !continue do
|
||||||
|
if self.closed then (
|
||||||
|
(* push always fails on a closed queue *)
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
) else if is_full_ self then
|
||||||
|
Condition.wait self.cond_push self.mutex
|
||||||
|
else (
|
||||||
|
let was_empty = Queue.is_empty self.q in
|
||||||
|
Queue.push x self.q;
|
||||||
|
if was_empty then Condition.broadcast self.cond_pop;
|
||||||
|
|
||||||
|
(* exit loop *)
|
||||||
|
continue := false;
|
||||||
|
Mutex.unlock self.mutex
|
||||||
|
)
|
||||||
|
done
|
||||||
|
|
||||||
|
let pop (self : 'a t) : 'a =
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
let rec loop () =
|
||||||
|
if Queue.is_empty self.q then (
|
||||||
|
if self.closed then (
|
||||||
|
(* pop fails on a closed queue if it's also empty,
|
||||||
|
otherwise it still returns the remaining elements *)
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
|
||||||
|
Condition.wait self.cond_pop self.mutex;
|
||||||
|
(loop [@tailcall]) ()
|
||||||
|
) else (
|
||||||
|
let was_full = is_full_ self in
|
||||||
|
let x = Queue.pop self.q in
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
(* wakeup pushers *)
|
||||||
|
if was_full then Condition.broadcast self.cond_push;
|
||||||
|
x
|
||||||
|
)
|
||||||
|
in
|
||||||
|
loop ()
|
||||||
|
|
||||||
|
let try_pop ~force_lock (self : _ t) : _ option =
|
||||||
|
let has_lock =
|
||||||
|
if force_lock then (
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
true
|
||||||
|
) else
|
||||||
|
Mutex.try_lock self.mutex
|
||||||
|
in
|
||||||
|
if has_lock then (
|
||||||
|
let was_full_before_pop = is_full_ self in
|
||||||
|
match Queue.pop self.q with
|
||||||
|
| x ->
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
if was_full_before_pop then Condition.broadcast self.cond_push;
|
||||||
|
Some x
|
||||||
|
| exception Queue.Empty ->
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
None
|
||||||
|
) else
|
||||||
|
None
|
||||||
|
|
||||||
|
let try_push (self : _ t) x : bool =
|
||||||
|
if Mutex.try_lock self.mutex then (
|
||||||
|
if self.closed then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
|
||||||
|
if is_full_ self then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
false
|
||||||
|
) else (
|
||||||
|
let was_empty = Queue.is_empty self.q in
|
||||||
|
Queue.push x self.q;
|
||||||
|
if was_empty then Condition.broadcast self.cond_pop;
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
true
|
||||||
|
)
|
||||||
|
) else
|
||||||
|
false
|
||||||
|
|
||||||
|
let[@inline] max_size self = self.max_size
|
||||||
|
|
||||||
|
let size (self : _ t) : int =
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
let n = Queue.length self.q in
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
n
|
||||||
|
|
||||||
|
let transfer (self : 'a t) q2 : unit =
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
let continue = ref true in
|
||||||
|
while !continue do
|
||||||
|
if Queue.is_empty self.q then (
|
||||||
|
if self.closed then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
Condition.wait self.cond_pop self.mutex
|
||||||
|
) else (
|
||||||
|
let was_full = is_full_ self in
|
||||||
|
Queue.transfer self.q q2;
|
||||||
|
if was_full then Condition.broadcast self.cond_push;
|
||||||
|
continue := false;
|
||||||
|
Mutex.unlock self.mutex
|
||||||
|
)
|
||||||
|
done
|
||||||
|
|
||||||
|
type 'a gen = unit -> 'a option
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
|
||||||
|
let to_iter self k =
|
||||||
|
try
|
||||||
|
while true do
|
||||||
|
let x = pop self in
|
||||||
|
k x
|
||||||
|
done
|
||||||
|
with Closed -> ()
|
||||||
|
|
||||||
|
let to_gen self : _ gen =
|
||||||
|
fun () ->
|
||||||
|
match pop self with
|
||||||
|
| exception Closed -> None
|
||||||
|
| x -> Some x
|
||||||
|
|
||||||
|
let rec to_seq self : _ Seq.t =
|
||||||
|
fun () ->
|
||||||
|
match pop self with
|
||||||
|
| exception Closed -> Seq.Nil
|
||||||
|
| x -> Seq.Cons (x, to_seq self)
|
||||||
70
src/bounded_queue.mli
Normal file
70
src/bounded_queue.mli
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
(** A blocking queue of finite size. *)
|
||||||
|
|
||||||
|
type 'a t
|
||||||
|
(** A bounded queue. *)
|
||||||
|
|
||||||
|
val create : max_size:int -> unit -> 'a t
|
||||||
|
|
||||||
|
val close : _ t -> unit
|
||||||
|
(** [close q] closes [q]. No new elements can be pushed into [q],
|
||||||
|
and after all the elements still in [q] currently are [pop]'d,
|
||||||
|
{!pop} will also raise {!Closed}. *)
|
||||||
|
|
||||||
|
exception Closed
|
||||||
|
|
||||||
|
val push : 'a t -> 'a -> unit
|
||||||
|
(** [push q x] pushes [x] at the end of the queue.
|
||||||
|
If [q] is full, this will block until there is
|
||||||
|
room for [x].
|
||||||
|
@raise Closed if [q] is closed. *)
|
||||||
|
|
||||||
|
val try_push : 'a t -> 'a -> bool
|
||||||
|
(** [try_push q x] attempts to push [x] into [q], but abandons
|
||||||
|
if it cannot acquire [q] or if [q] is full.
|
||||||
|
@raise Closed if [q] is closed. *)
|
||||||
|
|
||||||
|
val pop : 'a t -> 'a
|
||||||
|
(** [pop q] pops the first element off [q]. It blocks if [q]
|
||||||
|
is empty, until some element becomes available.
|
||||||
|
@raise Closed if [q] is empty and closed. *)
|
||||||
|
|
||||||
|
val try_pop : force_lock:bool -> 'a t -> 'a option
|
||||||
|
(** [try_pop q] tries to pop the first element, or returns [None]
|
||||||
|
if no element is available or if it failed to acquire [q].
|
||||||
|
|
||||||
|
@param force_lock if true, use {!Mutex.lock} (which can block
|
||||||
|
under contention);
|
||||||
|
if false, use {!Mutex.try_lock}, which might return [None] even in
|
||||||
|
presence of an element if there's contention.
|
||||||
|
|
||||||
|
@raise Closed if [q] is empty and closed. *)
|
||||||
|
|
||||||
|
val size : _ t -> int
|
||||||
|
(** Number of elements currently in [q] *)
|
||||||
|
|
||||||
|
val max_size : _ t -> int
|
||||||
|
(** Maximum size of the queue. See {!create}. *)
|
||||||
|
|
||||||
|
val transfer : 'a t -> 'a Queue.t -> unit
|
||||||
|
(** [transfer bq q2] transfers all elements currently available
|
||||||
|
in [bq] into local queue [q2], and clears [bq], atomically.
|
||||||
|
It blocks if [bq] is empty.
|
||||||
|
|
||||||
|
See {!Bb_queue.transfer} for more details.
|
||||||
|
@raise Closed if [bq] is empty and closed. *)
|
||||||
|
|
||||||
|
type 'a gen = unit -> 'a option
|
||||||
|
type 'a iter = ('a -> unit) -> unit
|
||||||
|
|
||||||
|
val to_iter : 'a t -> 'a iter
|
||||||
|
(** [to_iter q] returns an iterator over all items in the queue.
|
||||||
|
This might not terminate if [q] is never closed.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val to_gen : 'a t -> 'a gen
|
||||||
|
(** [to_gen q] returns a generator from the queue.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
||||||
|
val to_seq : 'a t -> 'a Seq.t
|
||||||
|
(** [to_gen q] returns a (transient) sequence from the queue.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
|
|
@ -4,6 +4,7 @@ let start_thread_on_some_domain f x =
|
||||||
|
|
||||||
module Atomic = Atomic_
|
module Atomic = Atomic_
|
||||||
module Blocking_queue = Bb_queue
|
module Blocking_queue = Bb_queue
|
||||||
|
module Bounded_queue = Bounded_queue
|
||||||
module Chan = Chan
|
module Chan = Chan
|
||||||
module Fork_join = Fork_join
|
module Fork_join = Fork_join
|
||||||
module Fut = Fut
|
module Fut = Fut
|
||||||
|
|
|
||||||
|
|
@ -133,6 +133,8 @@ module Blocking_queue : sig
|
||||||
@since NEXT_RELEASE *)
|
@since NEXT_RELEASE *)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
module Bounded_queue = Bounded_queue
|
||||||
|
|
||||||
module Atomic = Atomic_
|
module Atomic = Atomic_
|
||||||
(** Atomic values.
|
(** Atomic values.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue