diff --git a/src/bb_queue.ml b/src/bb_queue.ml index c3d157e4..1cec05db 100644 --- a/src/bb_queue.ml +++ b/src/bb_queue.ml @@ -83,3 +83,9 @@ let try_push (self : _ t) x : bool = true ) else false + +let size (self : _ t) : int = + Mutex.lock self.mutex; + let n = Queue.length self.q in + Mutex.unlock self.mutex; + n diff --git a/src/bb_queue.mli b/src/bb_queue.mli index 4dee92b6..75ce1521 100644 --- a/src/bb_queue.mli +++ b/src/bb_queue.mli @@ -10,6 +10,10 @@ val push : 'a t -> 'a -> unit (** [push q x] pushes [x] into [q], and returns [()]. @raise Closed if [close q] was previously called.*) +val size : _ t -> int +(** Number of items currently in the queue. + @since 0.2 *) + val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) diff --git a/src/moonpool.mli b/src/moonpool.mli index ca809bf7..90362198 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -40,6 +40,12 @@ module Blocking_queue : sig val create : unit -> _ t (** Create a new unbounded queue. *) + val size : _ t -> int + (** Number of items currently in the queue. Note that [pop] + might still block if this returns a non-zero number, since another + thread might have consumed the items in the mean time. + @since 0.2 *) + exception Closed val push : 'a t -> 'a -> unit