mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
Compare commits
2 commits
92300ad698
...
d957f7b54e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d957f7b54e | ||
|
|
a26503df0b |
4 changed files with 29 additions and 26 deletions
|
|
@ -21,7 +21,6 @@ let create ~max_size () : _ t =
|
|||
}
|
||||
|
||||
let try_push (self : _ t) x : bool =
|
||||
let res = ref false in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
|
|
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
|
|||
let to_awake = Queue.create () in
|
||||
Queue.push x self.q;
|
||||
Queue.transfer self.pop_waiters to_awake;
|
||||
res := true;
|
||||
Mutex.unlock self.mutex;
|
||||
(* wake up pop triggers if needed. Be careful to do that
|
||||
outside the critical section*)
|
||||
Queue.iter Trigger.signal to_awake
|
||||
Queue.iter Trigger.signal to_awake;
|
||||
true
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ -> Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
Mutex.unlock self.mutex;
|
||||
true
|
||||
| _ ->
|
||||
Mutex.unlock self.mutex;
|
||||
false
|
||||
) else
|
||||
false
|
||||
|
||||
let try_pop (type elt) self : elt option =
|
||||
let res = ref None in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
(match Queue.pop self.q with
|
||||
match Queue.pop self.q with
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
if self.closed then
|
||||
raise Closed
|
||||
)
|
||||
| x -> res := Some x);
|
||||
Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
else
|
||||
None
|
||||
| x ->
|
||||
Mutex.unlock self.mutex;
|
||||
Some x
|
||||
) else
|
||||
None
|
||||
|
||||
let close (self : _ t) : unit =
|
||||
let q = Queue.create () in
|
||||
let triggers_to_signal = Queue.create () in
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
Queue.transfer self.pop_waiters q;
|
||||
Queue.transfer self.push_waiters q
|
||||
Queue.transfer self.pop_waiters triggers_to_signal;
|
||||
Queue.transfer self.push_waiters triggers_to_signal
|
||||
);
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal q
|
||||
Queue.iter Trigger.signal triggers_to_signal
|
||||
|
||||
let rec push (self : _ t) x : unit =
|
||||
Mutex.lock self.mutex;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
(** Channels.
|
||||
|
||||
The channels have bounded size. Push/pop return futures or can use effects
|
||||
to provide an [await]-friendly version.
|
||||
The channels have bounded size. They use effects/await to provide
|
||||
a direct style implementation. Pushing into a full channel,
|
||||
or popping from an empty one, will suspend the current task.
|
||||
|
||||
The channels became bounded since @0.7 .
|
||||
*)
|
||||
|
|
|
|||
|
|
@ -431,8 +431,7 @@ let await (self : 'a t) : 'a =
|
|||
| exception C.Running ->
|
||||
let trigger = Trigger.create () in
|
||||
(* suspend until the future is resolved *)
|
||||
if C.try_attach self trigger then
|
||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||
if C.try_attach self trigger then Trigger.await_exn trigger;
|
||||
|
||||
(* un-suspended: we should have a result! *)
|
||||
get_or_fail_exn self
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ module State_ = struct
|
|||
done;
|
||||
|
||||
(* wait for the other computation to be done *)
|
||||
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
|
||||
if !must_await then Trigger.await_exn trigger
|
||||
| Right_solved _ | Both_solved _ -> assert false
|
||||
end
|
||||
|
||||
|
|
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
|||
i := !i + len_range
|
||||
done;
|
||||
|
||||
Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||
Trigger.await_exn trigger;
|
||||
Option.iter Exn_bt.raise @@ A.get failure;
|
||||
()
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue