fixes for blocking queue

This commit is contained in:
Simon Cruanes 2023-10-12 20:19:10 -04:00
parent 9f976f4092
commit 078e7414bc
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 28 additions and 13 deletions

View file

@ -70,9 +70,9 @@ let pop (self : 'a t) : 'a =
) else (
let was_full = is_full_ self in
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
(* wakeup pushers *)
(* wakeup pushers that were blocked *)
if was_full then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
x
)
in
@ -87,11 +87,16 @@ let try_pop ~force_lock (self : _ t) : _ option =
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let was_full_before_pop = is_full_ self in
match Queue.pop self.q with
| x ->
Mutex.unlock self.mutex;
(* wakeup pushers that are blocked *)
if was_full_before_pop then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
@ -99,8 +104,15 @@ let try_pop ~force_lock (self : _ t) : _ option =
) else
None
let try_push (self : _ t) x : bool =
if Mutex.try_lock self.mutex then (
let try_push ~force_lock (self : _ t) x : bool =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed

View file

@ -31,9 +31,15 @@ val push : 'a t -> 'a -> unit
room for [x].
@raise Closed if [q] is closed. *)
val try_push : 'a t -> 'a -> bool
val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons
if it cannot acquire [q] or if [q] is full.
@param force_lock if true, use {!Mutex.lock} (which can block
under contention);
if false, use {!Mutex.try_lock}, which might return [false] even
if there's room in the queue.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
@ -42,7 +48,7 @@ val pop : 'a t -> 'a
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop q] tries to pop the first element, or returns [None]
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None]
if no element is available or if it failed to acquire [q].
@param force_lock if true, use {!Mutex.lock} (which can block
@ -71,13 +77,10 @@ type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue.
This might not terminate if [q] is never closed.
@since NEXT_RELEASE *)
This might not terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue.
@since NEXT_RELEASE *)
(** [to_gen q] returns a generator from the queue. *)
val to_seq : 'a t -> 'a Seq.t
(** [to_gen q] returns a (transient) sequence from the queue.
@since NEXT_RELEASE *)
(** [to_gen q] returns a (transient) sequence from the queue. *)