mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-09 12:45:46 -05:00
s_queue: improve try_pop, add try_push
This commit is contained in:
parent
d3bb7652ba
commit
e618739442
2 changed files with 43 additions and 10 deletions
|
|
@ -51,12 +51,35 @@ let pop (self : 'a t) : 'a =
|
||||||
in
|
in
|
||||||
loop ()
|
loop ()
|
||||||
|
|
||||||
let try_pop (self : _ t) : _ option =
|
let try_pop ~force_lock (self : _ t) : _ option =
|
||||||
Mutex.lock self.mutex;
|
let has_lock =
|
||||||
match Queue.pop self.q with
|
if force_lock then (
|
||||||
| x ->
|
Mutex.lock self.mutex;
|
||||||
Mutex.unlock self.mutex;
|
true
|
||||||
Some x
|
) else
|
||||||
| exception Queue.Empty ->
|
Mutex.try_lock self.mutex
|
||||||
Mutex.unlock self.mutex;
|
in
|
||||||
|
if has_lock then (
|
||||||
|
match Queue.pop self.q with
|
||||||
|
| x ->
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
Some x
|
||||||
|
| exception Queue.Empty ->
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
None
|
||||||
|
) else
|
||||||
None
|
None
|
||||||
|
|
||||||
|
let try_push (self : _ t) x : bool =
|
||||||
|
if Mutex.try_lock self.mutex then (
|
||||||
|
if self.closed then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
|
||||||
|
Queue.push x self.q;
|
||||||
|
Condition.signal self.cond;
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
true
|
||||||
|
) else
|
||||||
|
false
|
||||||
|
|
|
||||||
|
|
@ -14,9 +14,19 @@ val pop : 'a t -> 'a
|
||||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
val try_pop : 'a t -> 'a option
|
val try_pop : force_lock:bool -> 'a t -> 'a option
|
||||||
(** [try_pop q] immediately pops the first element of [q], if any,
|
(** [try_pop q] immediately pops the first element of [q], if any,
|
||||||
or returns [None] without blocking. *)
|
or returns [None] without blocking.
|
||||||
|
@param force_lock if true, use {!Mutex.lock} (which can block under contention);
|
||||||
|
if false, use {!Mutex.try_lock}, which might return [None] even in
|
||||||
|
presence of an element if there's contention *)
|
||||||
|
|
||||||
|
val try_push : 'a t -> 'a -> bool
|
||||||
|
(** [try_push q x] tries to push into [q], in which case
|
||||||
|
it returns [true]; or it fails to push and returns [false]
|
||||||
|
without blocking.
|
||||||
|
@raise Closed if the locking succeeded but the queue is closed.
|
||||||
|
*)
|
||||||
|
|
||||||
val close : _ t -> unit
|
val close : _ t -> unit
|
||||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue