diff --git a/src/s_queue.ml b/src/s_queue.ml index fb639f85..c3d157e4 100644 --- a/src/s_queue.ml +++ b/src/s_queue.ml @@ -51,12 +51,35 @@ let pop (self : 'a t) : 'a = in loop () -let try_pop (self : _ t) : _ option = - Mutex.lock self.mutex; - match Queue.pop self.q with - | x -> - Mutex.unlock self.mutex; - Some x - | exception Queue.Empty -> - Mutex.unlock self.mutex; +let try_pop ~force_lock (self : _ t) : _ option = + let has_lock = + if force_lock then ( + Mutex.lock self.mutex; + true + ) else + Mutex.try_lock self.mutex + in + if has_lock then ( + match Queue.pop self.q with + | x -> + Mutex.unlock self.mutex; + Some x + | exception Queue.Empty -> + Mutex.unlock self.mutex; + None + ) else None + +let try_push (self : _ t) x : bool = + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex; + true + ) else + false diff --git a/src/s_queue.mli b/src/s_queue.mli index d09b80c6..644c1b86 100644 --- a/src/s_queue.mli +++ b/src/s_queue.mli @@ -14,9 +14,19 @@ val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. @raise Closed if the queue was closed before a new element was available. *) -val try_pop : 'a t -> 'a option +val try_pop : force_lock:bool -> 'a t -> 'a option (** [try_pop q] immediately pops the first element of [q], if any, - or returns [None] without blocking. *) + or returns [None] without blocking. + @param force_lock if true, use {!Mutex.lock} (which can block under contention); + if false, use {!Mutex.try_lock}, which might return [None] even in + presence of an element if there's contention *) + +val try_push : 'a t -> 'a -> bool +(** [try_push q x] tries to push into [q], in which case + it returns [true]; or it fails to push and returns [false] + without blocking. + @raise Closed if the locking succeeded but the queue is closed. +*) val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *)