diff --git a/src/core/bounded_queue.ml b/src/core/bounded_queue.ml deleted file mode 100644 index 550c119c..00000000 --- a/src/core/bounded_queue.ml +++ /dev/null @@ -1,182 +0,0 @@ -type 'a t = { - max_size: int; - q: 'a Queue.t; - mutex: Mutex.t; - cond_push: Condition.t; - cond_pop: Condition.t; - mutable closed: bool; -} - -exception Closed - -let create ~max_size () : _ t = - if max_size < 1 then invalid_arg "Bounded_queue.create"; - { - max_size; - mutex = Mutex.create (); - cond_push = Condition.create (); - cond_pop = Condition.create (); - q = Queue.create (); - closed = false; - } - -let close (self : _ t) = - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - (* awake waiters so they fail *) - Condition.broadcast self.cond_push; - Condition.broadcast self.cond_pop - ); - Mutex.unlock self.mutex - -(** Check if the queue is full. Precondition: [self.mutex] is acquired. *) -let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size - -let push (self : _ t) x : unit = - let continue = ref true in - Mutex.lock self.mutex; - while !continue do - if self.closed then ( - (* push always fails on a closed queue *) - Mutex.unlock self.mutex; - raise Closed - ) else if is_full_ self then - Condition.wait self.cond_push self.mutex - else ( - let was_empty = Queue.is_empty self.q in - Queue.push x self.q; - if was_empty then Condition.broadcast self.cond_pop; - - (* exit loop *) - continue := false; - Mutex.unlock self.mutex - ) - done - -let pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - let rec loop () = - if Queue.is_empty self.q then ( - if self.closed then ( - (* pop fails on a closed queue if it's also empty, - otherwise it still returns the remaining elements *) - Mutex.unlock self.mutex; - raise Closed - ); - - Condition.wait self.cond_pop self.mutex; - (loop [@tailcall]) () - ) else ( - let was_full = is_full_ self in - let x = Queue.pop self.q in - (* wakeup pushers that were blocked *) - if was_full then Condition.broadcast self.cond_push; - Mutex.unlock self.mutex; - x - ) - in - loop () - -let try_pop ~force_lock (self : _ t) : _ option = - let has_lock = - if force_lock then ( - Mutex.lock self.mutex; - true - ) else - Mutex.try_lock self.mutex - in - if has_lock then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - let was_full_before_pop = is_full_ self in - match Queue.pop self.q with - | x -> - (* wakeup pushers that are blocked *) - if was_full_before_pop then Condition.broadcast self.cond_push; - Mutex.unlock self.mutex; - Some x - | exception Queue.Empty -> - Mutex.unlock self.mutex; - None - ) else - None - -let try_push ~force_lock (self : _ t) x : bool = - let has_lock = - if force_lock then ( - Mutex.lock self.mutex; - true - ) else - Mutex.try_lock self.mutex - in - if has_lock then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - - if is_full_ self then ( - Mutex.unlock self.mutex; - false - ) else ( - let was_empty = Queue.is_empty self.q in - Queue.push x self.q; - if was_empty then Condition.broadcast self.cond_pop; - Mutex.unlock self.mutex; - true - ) - ) else - false - -let[@inline] max_size self = self.max_size - -let size (self : _ t) : int = - Mutex.lock self.mutex; - let n = Queue.length self.q in - Mutex.unlock self.mutex; - n - -let transfer (self : 'a t) q2 : unit = - Mutex.lock self.mutex; - let continue = ref true in - while !continue do - if Queue.is_empty self.q then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - Condition.wait self.cond_pop self.mutex - ) else ( - let was_full = is_full_ self in - Queue.transfer self.q q2; - if was_full then Condition.broadcast self.cond_push; - continue := false; - Mutex.unlock self.mutex - ) - done - -type 'a gen = unit -> 'a option -type 'a iter = ('a -> unit) -> unit - -let to_iter self k = - try - while true do - let x = pop self in - k x - done - with Closed -> () - -let to_gen self : _ gen = - fun () -> - match pop self with - | exception Closed -> None - | x -> Some x - -let rec to_seq self : _ Seq.t = - fun () -> - match pop self with - | exception Closed -> Seq.Nil - | x -> Seq.Cons (x, to_seq self) diff --git a/src/core/bounded_queue.mli b/src/core/bounded_queue.mli deleted file mode 100644 index 165f7681..00000000 --- a/src/core/bounded_queue.mli +++ /dev/null @@ -1,82 +0,0 @@ -(** A blocking queue of finite size. - - This queue, while still using locks underneath (like the regular blocking - queue) should be enough for usage under reasonable contention. - - The bounded size is helpful whenever some form of backpressure is desirable: - if the queue is used to communicate between producer(s) and consumer(s), the - consumer(s) can limit the rate at which producer(s) send new work down their - way. Whenever the queue is full, means that producer(s) will have to wait - before pushing new work. - - @since 0.4 *) - -type 'a t -(** A bounded queue. *) - -val create : max_size:int -> unit -> 'a t - -val close : _ t -> unit -(** [close q] closes [q]. No new elements can be pushed into [q], and after all - the elements still in [q] currently are [pop]'d, {!pop} will also raise - {!Closed}. *) - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will - block until there is room for [x]. - @raise Closed if [q] is closed. *) - -val try_push : force_lock:bool -> 'a t -> 'a -> bool -(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot - acquire [q] or if [q] is full. - - @param force_lock - if true, use {!Mutex.lock} (which can block under contention); if false, - use {!Mutex.try_lock}, which might return [false] even if there's room in - the queue. - - @raise Closed if [q] is closed. *) - -val pop : 'a t -> 'a -(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until - some element becomes available. - @raise Closed if [q] is empty and closed. *) - -val try_pop : force_lock:bool -> 'a t -> 'a option -(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if - no element is available or if it failed to acquire [q]. - - @param force_lock - if true, use {!Mutex.lock} (which can block under contention); if false, - use {!Mutex.try_lock}, which might return [None] even in presence of an - element if there's contention. - - @raise Closed if [q] is empty and closed. *) - -val size : _ t -> int -(** Number of elements currently in [q] *) - -val max_size : _ t -> int -(** Maximum size of the queue. See {!create}. *) - -val transfer : 'a t -> 'a Queue.t -> unit -(** [transfer bq q2] transfers all elements currently available in [bq] into - local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty. - - See {!Bb_queue.transfer} for more details. - @raise Closed if [bq] is empty and closed. *) - -type 'a gen = unit -> 'a option -type 'a iter = ('a -> unit) -> unit - -val to_iter : 'a t -> 'a iter -(** [to_iter q] returns an iterator over all items in the queue. This might not - terminate if [q] is never closed. *) - -val to_gen : 'a t -> 'a gen -(** [to_gen q] returns a generator from the queue. *) - -val to_seq : 'a t -> 'a Seq.t -(** [to_gen q] returns a (transient) sequence from the queue. *) diff --git a/src/core/moonpool.ml b/src/core/moonpool.ml index 2d009065..d9f72b51 100644 --- a/src/core/moonpool.ml +++ b/src/core/moonpool.ml @@ -23,7 +23,6 @@ let yield = Picos.Fiber.yield module Atomic = Atomic_ module Blocking_queue = Bb_queue module Background_thread = Background_thread -module Bounded_queue = Bounded_queue module Chan = Chan module Exn_bt = Exn_bt module Fifo_pool = Fifo_pool diff --git a/src/core/moonpool.mli b/src/core/moonpool.mli index aa4548a5..3a5f5eef 100644 --- a/src/core/moonpool.mli +++ b/src/core/moonpool.mli @@ -203,8 +203,6 @@ module Blocking_queue : sig @since 0.4 *) end -module Bounded_queue = Bounded_queue - module Atomic = Atomic_ (** Atomic values. diff --git a/test/dune b/test/dune index af881591..38b6a9c8 100644 --- a/test/dune +++ b/test/dune @@ -10,8 +10,7 @@ t_resource t_unfair t_ws_deque - t_ws_wait - t_bounded_queue) + t_ws_wait) (package moonpool) (libraries moonpool diff --git a/test/t_bounded_queue.ml b/test/t_bounded_queue.ml deleted file mode 100644 index 25302896..00000000 --- a/test/t_bounded_queue.ml +++ /dev/null @@ -1,111 +0,0 @@ -module BQ = Moonpool.Bounded_queue -module Bb_queue = Moonpool.Blocking_queue -module A = Moonpool.Atomic - -let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t) - -let () = - let bq = BQ.create ~max_size:3 () in - BQ.push bq 1; - BQ.push bq 2; - assert (BQ.size bq = 2); - assert (BQ.pop bq = 1); - assert (BQ.pop bq = 2); - - assert (BQ.try_pop ~force_lock:true bq = None); - spawn (fun () -> BQ.push bq 3); - assert (BQ.pop bq = 3) - -let () = - (* cannot create with size 0 *) - assert ( - try - ignore (BQ.create ~max_size:0 ()); - false - with _ -> true) - -let () = - let bq = BQ.create ~max_size:3 () in - BQ.push bq 1; - BQ.push bq 2; - assert (BQ.size bq = 2); - assert (BQ.pop bq = 1); - - BQ.close bq; - assert (BQ.pop bq = 2); - assert ( - try - ignore (BQ.pop bq); - false - with BQ.Closed -> true); - assert ( - try - ignore (BQ.push bq 42); - false - with BQ.Closed -> true) - -let () = - let bq = BQ.create ~max_size:2 () in - let side_q = Bb_queue.create () in - BQ.push bq 1; - BQ.push bq 2; - - spawn (fun () -> - for i = 3 to 10 do - BQ.push bq i; - Bb_queue.push side_q (`Pushed i) - done); - - (* make space for new element *) - assert (BQ.pop bq = 1); - assert (Bb_queue.pop side_q = `Pushed 3); - assert (BQ.pop bq = 2); - assert (BQ.pop bq = 3); - for j = 4 to 10 do - assert (BQ.pop bq = j); - assert (Bb_queue.pop side_q = `Pushed j) - done; - assert (BQ.size bq = 0); - () - -let () = - let bq = BQ.create ~max_size:5 () in - - let bq1 = BQ.create ~max_size:10 () in - let bq2 = BQ.create ~max_size:10 () in - - let bq_res = BQ.create ~max_size:2 () in - - (* diamond: - bq -------> bq1 - | | - | | - v v - bq2 -----> bq_res *) - spawn (fun () -> - BQ.to_iter bq (BQ.push bq1); - BQ.close bq1); - spawn (fun () -> - BQ.to_iter bq (BQ.push bq2); - BQ.close bq2); - spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res)); - spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res)); - - let n = 100_000 in - - (* push into [bq] *) - let sum = A.make 0 in - spawn (fun () -> - for i = 1 to n do - ignore (A.fetch_and_add sum i : int); - BQ.push bq i - done; - BQ.close bq); - - let sum' = ref 0 in - for _j = 1 to n do - let x = BQ.pop bq_res in - sum' := x + !sum' - done; - assert (BQ.size bq_res = 0); - assert (A.get sum = !sum')