mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
remove bounded_queue
This commit is contained in:
parent
867cbd2318
commit
5ea9a3f587
6 changed files with 1 additions and 380 deletions
|
|
@ -1,182 +0,0 @@
|
|||
type 'a t = {
|
||||
max_size: int;
|
||||
q: 'a Queue.t;
|
||||
mutex: Mutex.t;
|
||||
cond_push: Condition.t;
|
||||
cond_pop: Condition.t;
|
||||
mutable closed: bool;
|
||||
}
|
||||
|
||||
exception Closed
|
||||
|
||||
let create ~max_size () : _ t =
|
||||
if max_size < 1 then invalid_arg "Bounded_queue.create";
|
||||
{
|
||||
max_size;
|
||||
mutex = Mutex.create ();
|
||||
cond_push = Condition.create ();
|
||||
cond_pop = Condition.create ();
|
||||
q = Queue.create ();
|
||||
closed = false;
|
||||
}
|
||||
|
||||
let close (self : _ t) =
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
(* awake waiters so they fail *)
|
||||
Condition.broadcast self.cond_push;
|
||||
Condition.broadcast self.cond_pop
|
||||
);
|
||||
Mutex.unlock self.mutex
|
||||
|
||||
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
|
||||
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
|
||||
|
||||
let push (self : _ t) x : unit =
|
||||
let continue = ref true in
|
||||
Mutex.lock self.mutex;
|
||||
while !continue do
|
||||
if self.closed then (
|
||||
(* push always fails on a closed queue *)
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
) else if is_full_ self then
|
||||
Condition.wait self.cond_push self.mutex
|
||||
else (
|
||||
let was_empty = Queue.is_empty self.q in
|
||||
Queue.push x self.q;
|
||||
if was_empty then Condition.broadcast self.cond_pop;
|
||||
|
||||
(* exit loop *)
|
||||
continue := false;
|
||||
Mutex.unlock self.mutex
|
||||
)
|
||||
done
|
||||
|
||||
let pop (self : 'a t) : 'a =
|
||||
Mutex.lock self.mutex;
|
||||
let rec loop () =
|
||||
if Queue.is_empty self.q then (
|
||||
if self.closed then (
|
||||
(* pop fails on a closed queue if it's also empty,
|
||||
otherwise it still returns the remaining elements *)
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
Condition.wait self.cond_pop self.mutex;
|
||||
(loop [@tailcall]) ()
|
||||
) else (
|
||||
let was_full = is_full_ self in
|
||||
let x = Queue.pop self.q in
|
||||
(* wakeup pushers that were blocked *)
|
||||
if was_full then Condition.broadcast self.cond_push;
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
)
|
||||
in
|
||||
loop ()
|
||||
|
||||
let try_pop ~force_lock (self : _ t) : _ option =
|
||||
let has_lock =
|
||||
if force_lock then (
|
||||
Mutex.lock self.mutex;
|
||||
true
|
||||
) else
|
||||
Mutex.try_lock self.mutex
|
||||
in
|
||||
if has_lock then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
let was_full_before_pop = is_full_ self in
|
||||
match Queue.pop self.q with
|
||||
| x ->
|
||||
(* wakeup pushers that are blocked *)
|
||||
if was_full_before_pop then Condition.broadcast self.cond_push;
|
||||
Mutex.unlock self.mutex;
|
||||
Some x
|
||||
| exception Queue.Empty ->
|
||||
Mutex.unlock self.mutex;
|
||||
None
|
||||
) else
|
||||
None
|
||||
|
||||
let try_push ~force_lock (self : _ t) x : bool =
|
||||
let has_lock =
|
||||
if force_lock then (
|
||||
Mutex.lock self.mutex;
|
||||
true
|
||||
) else
|
||||
Mutex.try_lock self.mutex
|
||||
in
|
||||
if has_lock then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
if is_full_ self then (
|
||||
Mutex.unlock self.mutex;
|
||||
false
|
||||
) else (
|
||||
let was_empty = Queue.is_empty self.q in
|
||||
Queue.push x self.q;
|
||||
if was_empty then Condition.broadcast self.cond_pop;
|
||||
Mutex.unlock self.mutex;
|
||||
true
|
||||
)
|
||||
) else
|
||||
false
|
||||
|
||||
let[@inline] max_size self = self.max_size
|
||||
|
||||
let size (self : _ t) : int =
|
||||
Mutex.lock self.mutex;
|
||||
let n = Queue.length self.q in
|
||||
Mutex.unlock self.mutex;
|
||||
n
|
||||
|
||||
let transfer (self : 'a t) q2 : unit =
|
||||
Mutex.lock self.mutex;
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
if Queue.is_empty self.q then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
Condition.wait self.cond_pop self.mutex
|
||||
) else (
|
||||
let was_full = is_full_ self in
|
||||
Queue.transfer self.q q2;
|
||||
if was_full then Condition.broadcast self.cond_push;
|
||||
continue := false;
|
||||
Mutex.unlock self.mutex
|
||||
)
|
||||
done
|
||||
|
||||
type 'a gen = unit -> 'a option
|
||||
type 'a iter = ('a -> unit) -> unit
|
||||
|
||||
let to_iter self k =
|
||||
try
|
||||
while true do
|
||||
let x = pop self in
|
||||
k x
|
||||
done
|
||||
with Closed -> ()
|
||||
|
||||
let to_gen self : _ gen =
|
||||
fun () ->
|
||||
match pop self with
|
||||
| exception Closed -> None
|
||||
| x -> Some x
|
||||
|
||||
let rec to_seq self : _ Seq.t =
|
||||
fun () ->
|
||||
match pop self with
|
||||
| exception Closed -> Seq.Nil
|
||||
| x -> Seq.Cons (x, to_seq self)
|
||||
|
|
@ -1,82 +0,0 @@
|
|||
(** A blocking queue of finite size.
|
||||
|
||||
This queue, while still using locks underneath (like the regular blocking
|
||||
queue) should be enough for usage under reasonable contention.
|
||||
|
||||
The bounded size is helpful whenever some form of backpressure is desirable:
|
||||
if the queue is used to communicate between producer(s) and consumer(s), the
|
||||
consumer(s) can limit the rate at which producer(s) send new work down their
|
||||
way. Whenever the queue is full, means that producer(s) will have to wait
|
||||
before pushing new work.
|
||||
|
||||
@since 0.4 *)
|
||||
|
||||
type 'a t
|
||||
(** A bounded queue. *)
|
||||
|
||||
val create : max_size:int -> unit -> 'a t
|
||||
|
||||
val close : _ t -> unit
|
||||
(** [close q] closes [q]. No new elements can be pushed into [q], and after all
|
||||
the elements still in [q] currently are [pop]'d, {!pop} will also raise
|
||||
{!Closed}. *)
|
||||
|
||||
exception Closed
|
||||
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
|
||||
block until there is room for [x].
|
||||
@raise Closed if [q] is closed. *)
|
||||
|
||||
val try_push : force_lock:bool -> 'a t -> 'a -> bool
|
||||
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
|
||||
acquire [q] or if [q] is full.
|
||||
|
||||
@param force_lock
|
||||
if true, use {!Mutex.lock} (which can block under contention); if false,
|
||||
use {!Mutex.try_lock}, which might return [false] even if there's room in
|
||||
the queue.
|
||||
|
||||
@raise Closed if [q] is closed. *)
|
||||
|
||||
val pop : 'a t -> 'a
|
||||
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
|
||||
some element becomes available.
|
||||
@raise Closed if [q] is empty and closed. *)
|
||||
|
||||
val try_pop : force_lock:bool -> 'a t -> 'a option
|
||||
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
|
||||
no element is available or if it failed to acquire [q].
|
||||
|
||||
@param force_lock
|
||||
if true, use {!Mutex.lock} (which can block under contention); if false,
|
||||
use {!Mutex.try_lock}, which might return [None] even in presence of an
|
||||
element if there's contention.
|
||||
|
||||
@raise Closed if [q] is empty and closed. *)
|
||||
|
||||
val size : _ t -> int
|
||||
(** Number of elements currently in [q] *)
|
||||
|
||||
val max_size : _ t -> int
|
||||
(** Maximum size of the queue. See {!create}. *)
|
||||
|
||||
val transfer : 'a t -> 'a Queue.t -> unit
|
||||
(** [transfer bq q2] transfers all elements currently available in [bq] into
|
||||
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
|
||||
|
||||
See {!Bb_queue.transfer} for more details.
|
||||
@raise Closed if [bq] is empty and closed. *)
|
||||
|
||||
type 'a gen = unit -> 'a option
|
||||
type 'a iter = ('a -> unit) -> unit
|
||||
|
||||
val to_iter : 'a t -> 'a iter
|
||||
(** [to_iter q] returns an iterator over all items in the queue. This might not
|
||||
terminate if [q] is never closed. *)
|
||||
|
||||
val to_gen : 'a t -> 'a gen
|
||||
(** [to_gen q] returns a generator from the queue. *)
|
||||
|
||||
val to_seq : 'a t -> 'a Seq.t
|
||||
(** [to_gen q] returns a (transient) sequence from the queue. *)
|
||||
|
|
@ -23,7 +23,6 @@ let yield = Picos.Fiber.yield
|
|||
module Atomic = Atomic_
|
||||
module Blocking_queue = Bb_queue
|
||||
module Background_thread = Background_thread
|
||||
module Bounded_queue = Bounded_queue
|
||||
module Chan = Chan
|
||||
module Exn_bt = Exn_bt
|
||||
module Fifo_pool = Fifo_pool
|
||||
|
|
|
|||
|
|
@ -203,8 +203,6 @@ module Blocking_queue : sig
|
|||
@since 0.4 *)
|
||||
end
|
||||
|
||||
module Bounded_queue = Bounded_queue
|
||||
|
||||
module Atomic = Atomic_
|
||||
(** Atomic values.
|
||||
|
||||
|
|
|
|||
|
|
@ -10,8 +10,7 @@
|
|||
t_resource
|
||||
t_unfair
|
||||
t_ws_deque
|
||||
t_ws_wait
|
||||
t_bounded_queue)
|
||||
t_ws_wait)
|
||||
(package moonpool)
|
||||
(libraries
|
||||
moonpool
|
||||
|
|
|
|||
|
|
@ -1,111 +0,0 @@
|
|||
module BQ = Moonpool.Bounded_queue
|
||||
module Bb_queue = Moonpool.Blocking_queue
|
||||
module A = Moonpool.Atomic
|
||||
|
||||
let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t)
|
||||
|
||||
let () =
|
||||
let bq = BQ.create ~max_size:3 () in
|
||||
BQ.push bq 1;
|
||||
BQ.push bq 2;
|
||||
assert (BQ.size bq = 2);
|
||||
assert (BQ.pop bq = 1);
|
||||
assert (BQ.pop bq = 2);
|
||||
|
||||
assert (BQ.try_pop ~force_lock:true bq = None);
|
||||
spawn (fun () -> BQ.push bq 3);
|
||||
assert (BQ.pop bq = 3)
|
||||
|
||||
let () =
|
||||
(* cannot create with size 0 *)
|
||||
assert (
|
||||
try
|
||||
ignore (BQ.create ~max_size:0 ());
|
||||
false
|
||||
with _ -> true)
|
||||
|
||||
let () =
|
||||
let bq = BQ.create ~max_size:3 () in
|
||||
BQ.push bq 1;
|
||||
BQ.push bq 2;
|
||||
assert (BQ.size bq = 2);
|
||||
assert (BQ.pop bq = 1);
|
||||
|
||||
BQ.close bq;
|
||||
assert (BQ.pop bq = 2);
|
||||
assert (
|
||||
try
|
||||
ignore (BQ.pop bq);
|
||||
false
|
||||
with BQ.Closed -> true);
|
||||
assert (
|
||||
try
|
||||
ignore (BQ.push bq 42);
|
||||
false
|
||||
with BQ.Closed -> true)
|
||||
|
||||
let () =
|
||||
let bq = BQ.create ~max_size:2 () in
|
||||
let side_q = Bb_queue.create () in
|
||||
BQ.push bq 1;
|
||||
BQ.push bq 2;
|
||||
|
||||
spawn (fun () ->
|
||||
for i = 3 to 10 do
|
||||
BQ.push bq i;
|
||||
Bb_queue.push side_q (`Pushed i)
|
||||
done);
|
||||
|
||||
(* make space for new element *)
|
||||
assert (BQ.pop bq = 1);
|
||||
assert (Bb_queue.pop side_q = `Pushed 3);
|
||||
assert (BQ.pop bq = 2);
|
||||
assert (BQ.pop bq = 3);
|
||||
for j = 4 to 10 do
|
||||
assert (BQ.pop bq = j);
|
||||
assert (Bb_queue.pop side_q = `Pushed j)
|
||||
done;
|
||||
assert (BQ.size bq = 0);
|
||||
()
|
||||
|
||||
let () =
|
||||
let bq = BQ.create ~max_size:5 () in
|
||||
|
||||
let bq1 = BQ.create ~max_size:10 () in
|
||||
let bq2 = BQ.create ~max_size:10 () in
|
||||
|
||||
let bq_res = BQ.create ~max_size:2 () in
|
||||
|
||||
(* diamond:
|
||||
bq -------> bq1
|
||||
| |
|
||||
| |
|
||||
v v
|
||||
bq2 -----> bq_res *)
|
||||
spawn (fun () ->
|
||||
BQ.to_iter bq (BQ.push bq1);
|
||||
BQ.close bq1);
|
||||
spawn (fun () ->
|
||||
BQ.to_iter bq (BQ.push bq2);
|
||||
BQ.close bq2);
|
||||
spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res));
|
||||
spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res));
|
||||
|
||||
let n = 100_000 in
|
||||
|
||||
(* push into [bq] *)
|
||||
let sum = A.make 0 in
|
||||
spawn (fun () ->
|
||||
for i = 1 to n do
|
||||
ignore (A.fetch_and_add sum i : int);
|
||||
BQ.push bq i
|
||||
done;
|
||||
BQ.close bq);
|
||||
|
||||
let sum' = ref 0 in
|
||||
for _j = 1 to n do
|
||||
let x = BQ.pop bq_res in
|
||||
sum' := x + !sum'
|
||||
done;
|
||||
assert (BQ.size bq_res = 0);
|
||||
assert (A.get sum = !sum')
|
||||
Loading…
Add table
Reference in a new issue