mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
refactor chan; fix bug in Chan.try_push
we could return `false` even though we succeeded in pushing a value into the chan.
This commit is contained in:
parent
92300ad698
commit
a26503df0b
2 changed files with 26 additions and 22 deletions
|
|
@ -21,7 +21,6 @@ let create ~max_size () : _ t =
|
||||||
}
|
}
|
||||||
|
|
||||||
let try_push (self : _ t) x : bool =
|
let try_push (self : _ t) x : bool =
|
||||||
let res = ref false in
|
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
if self.closed then (
|
if self.closed then (
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
|
|
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
|
||||||
let to_awake = Queue.create () in
|
let to_awake = Queue.create () in
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Queue.transfer self.pop_waiters to_awake;
|
Queue.transfer self.pop_waiters to_awake;
|
||||||
res := true;
|
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
(* wake up pop triggers if needed. Be careful to do that
|
(* wake up pop triggers if needed. Be careful to do that
|
||||||
outside the critical section*)
|
outside the critical section*)
|
||||||
Queue.iter Trigger.signal to_awake
|
Queue.iter Trigger.signal to_awake;
|
||||||
|
true
|
||||||
| n when n < self.max_size ->
|
| n when n < self.max_size ->
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Mutex.unlock self.mutex
|
Mutex.unlock self.mutex;
|
||||||
| _ -> Mutex.unlock self.mutex
|
true
|
||||||
);
|
| _ ->
|
||||||
!res
|
Mutex.unlock self.mutex;
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
false
|
||||||
|
|
||||||
let try_pop (type elt) self : elt option =
|
let try_pop (type elt) self : elt option =
|
||||||
let res = ref None in
|
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
(match Queue.pop self.q with
|
match Queue.pop self.q with
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
if self.closed then (
|
Mutex.unlock self.mutex;
|
||||||
Mutex.unlock self.mutex;
|
if self.closed then
|
||||||
raise Closed
|
raise Closed
|
||||||
)
|
else
|
||||||
| x -> res := Some x);
|
None
|
||||||
Mutex.unlock self.mutex
|
| x ->
|
||||||
);
|
Mutex.unlock self.mutex;
|
||||||
!res
|
Some x
|
||||||
|
) else
|
||||||
|
None
|
||||||
|
|
||||||
let close (self : _ t) : unit =
|
let close (self : _ t) : unit =
|
||||||
let q = Queue.create () in
|
let triggers_to_signal = Queue.create () in
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
if not self.closed then (
|
if not self.closed then (
|
||||||
self.closed <- true;
|
self.closed <- true;
|
||||||
Queue.transfer self.pop_waiters q;
|
Queue.transfer self.pop_waiters triggers_to_signal;
|
||||||
Queue.transfer self.push_waiters q
|
Queue.transfer self.push_waiters triggers_to_signal
|
||||||
);
|
);
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
Queue.iter Trigger.signal q
|
Queue.iter Trigger.signal triggers_to_signal
|
||||||
|
|
||||||
let rec push (self : _ t) x : unit =
|
let rec push (self : _ t) x : unit =
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
(** Channels.
|
(** Channels.
|
||||||
|
|
||||||
The channels have bounded size. Push/pop return futures or can use effects
|
The channels have bounded size. They use effects/await to provide
|
||||||
to provide an [await]-friendly version.
|
a direct style implementation. Pushing into a full channel,
|
||||||
|
or popping from an empty one, will suspend the current task.
|
||||||
|
|
||||||
The channels became bounded since @0.7 .
|
The channels became bounded since @0.7 .
|
||||||
*)
|
*)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue