From 078e7414bc4ca8a059858093175d24305f434863 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 12 Oct 2023 20:19:10 -0400 Subject: [PATCH] fixes for blocking queue --- src/bounded_queue.ml | 22 +++++++++++++++++----- src/bounded_queue.mli | 19 +++++++++++-------- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/bounded_queue.ml b/src/bounded_queue.ml index 0f63847e..550c119c 100644 --- a/src/bounded_queue.ml +++ b/src/bounded_queue.ml @@ -70,9 +70,9 @@ let pop (self : 'a t) : 'a = ) else ( let was_full = is_full_ self in let x = Queue.pop self.q in - Mutex.unlock self.mutex; - (* wakeup pushers *) + (* wakeup pushers that were blocked *) if was_full then Condition.broadcast self.cond_push; + Mutex.unlock self.mutex; x ) in @@ -87,11 +87,16 @@ let try_pop ~force_lock (self : _ t) : _ option = Mutex.try_lock self.mutex in if has_lock then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); let was_full_before_pop = is_full_ self in match Queue.pop self.q with | x -> - Mutex.unlock self.mutex; + (* wakeup pushers that are blocked *) if was_full_before_pop then Condition.broadcast self.cond_push; + Mutex.unlock self.mutex; Some x | exception Queue.Empty -> Mutex.unlock self.mutex; @@ -99,8 +104,15 @@ let try_pop ~force_lock (self : _ t) : _ option = ) else None -let try_push (self : _ t) x : bool = - if Mutex.try_lock self.mutex then ( +let try_push ~force_lock (self : _ t) x : bool = + let has_lock = + if force_lock then ( + Mutex.lock self.mutex; + true + ) else + Mutex.try_lock self.mutex + in + if has_lock then ( if self.closed then ( Mutex.unlock self.mutex; raise Closed diff --git a/src/bounded_queue.mli b/src/bounded_queue.mli index 0be0b692..85aea884 100644 --- a/src/bounded_queue.mli +++ b/src/bounded_queue.mli @@ -31,9 +31,15 @@ val push : 'a t -> 'a -> unit room for [x]. @raise Closed if [q] is closed. *) -val try_push : 'a t -> 'a -> bool +val try_push : force_lock:bool -> 'a t -> 'a -> bool (** [try_push q x] attempts to push [x] into [q], but abandons if it cannot acquire [q] or if [q] is full. + + @param force_lock if true, use {!Mutex.lock} (which can block + under contention); + if false, use {!Mutex.try_lock}, which might return [false] even + if there's room in the queue. + @raise Closed if [q] is closed. *) val pop : 'a t -> 'a @@ -42,7 +48,7 @@ val pop : 'a t -> 'a @raise Closed if [q] is empty and closed. *) val try_pop : force_lock:bool -> 'a t -> 'a option -(** [try_pop q] tries to pop the first element, or returns [None] +(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if no element is available or if it failed to acquire [q]. @param force_lock if true, use {!Mutex.lock} (which can block @@ -71,13 +77,10 @@ type 'a iter = ('a -> unit) -> unit val to_iter : 'a t -> 'a iter (** [to_iter q] returns an iterator over all items in the queue. - This might not terminate if [q] is never closed. - @since NEXT_RELEASE *) + This might not terminate if [q] is never closed. *) val to_gen : 'a t -> 'a gen -(** [to_gen q] returns a generator from the queue. - @since NEXT_RELEASE *) +(** [to_gen q] returns a generator from the queue. *) val to_seq : 'a t -> 'a Seq.t -(** [to_gen q] returns a (transient) sequence from the queue. - @since NEXT_RELEASE *) +(** [to_gen q] returns a (transient) sequence from the queue. *)