mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
Compare commits
2 commits
92300ad698
...
d957f7b54e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d957f7b54e | ||
|
|
a26503df0b |
4 changed files with 29 additions and 26 deletions
|
|
@ -21,7 +21,6 @@ let create ~max_size () : _ t =
|
||||||
}
|
}
|
||||||
|
|
||||||
let try_push (self : _ t) x : bool =
|
let try_push (self : _ t) x : bool =
|
||||||
let res = ref false in
|
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
if self.closed then (
|
if self.closed then (
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
|
|
@ -33,42 +32,46 @@ let try_push (self : _ t) x : bool =
|
||||||
let to_awake = Queue.create () in
|
let to_awake = Queue.create () in
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Queue.transfer self.pop_waiters to_awake;
|
Queue.transfer self.pop_waiters to_awake;
|
||||||
res := true;
|
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
(* wake up pop triggers if needed. Be careful to do that
|
(* wake up pop triggers if needed. Be careful to do that
|
||||||
outside the critical section*)
|
outside the critical section*)
|
||||||
Queue.iter Trigger.signal to_awake
|
Queue.iter Trigger.signal to_awake;
|
||||||
|
true
|
||||||
| n when n < self.max_size ->
|
| n when n < self.max_size ->
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Mutex.unlock self.mutex
|
Mutex.unlock self.mutex;
|
||||||
| _ -> Mutex.unlock self.mutex
|
true
|
||||||
);
|
| _ ->
|
||||||
!res
|
Mutex.unlock self.mutex;
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
false
|
||||||
|
|
||||||
let try_pop (type elt) self : elt option =
|
let try_pop (type elt) self : elt option =
|
||||||
let res = ref None in
|
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
(match Queue.pop self.q with
|
match Queue.pop self.q with
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
if self.closed then (
|
Mutex.unlock self.mutex;
|
||||||
Mutex.unlock self.mutex;
|
if self.closed then
|
||||||
raise Closed
|
raise Closed
|
||||||
)
|
else
|
||||||
| x -> res := Some x);
|
None
|
||||||
Mutex.unlock self.mutex
|
| x ->
|
||||||
);
|
Mutex.unlock self.mutex;
|
||||||
!res
|
Some x
|
||||||
|
) else
|
||||||
|
None
|
||||||
|
|
||||||
let close (self : _ t) : unit =
|
let close (self : _ t) : unit =
|
||||||
let q = Queue.create () in
|
let triggers_to_signal = Queue.create () in
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
if not self.closed then (
|
if not self.closed then (
|
||||||
self.closed <- true;
|
self.closed <- true;
|
||||||
Queue.transfer self.pop_waiters q;
|
Queue.transfer self.pop_waiters triggers_to_signal;
|
||||||
Queue.transfer self.push_waiters q
|
Queue.transfer self.push_waiters triggers_to_signal
|
||||||
);
|
);
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
Queue.iter Trigger.signal q
|
Queue.iter Trigger.signal triggers_to_signal
|
||||||
|
|
||||||
let rec push (self : _ t) x : unit =
|
let rec push (self : _ t) x : unit =
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
(** Channels.
|
(** Channels.
|
||||||
|
|
||||||
The channels have bounded size. Push/pop return futures or can use effects
|
The channels have bounded size. They use effects/await to provide
|
||||||
to provide an [await]-friendly version.
|
a direct style implementation. Pushing into a full channel,
|
||||||
|
or popping from an empty one, will suspend the current task.
|
||||||
|
|
||||||
The channels became bounded since @0.7 .
|
The channels became bounded since @0.7 .
|
||||||
*)
|
*)
|
||||||
|
|
|
||||||
|
|
@ -431,8 +431,7 @@ let await (self : 'a t) : 'a =
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let trigger = Trigger.create () in
|
let trigger = Trigger.create () in
|
||||||
(* suspend until the future is resolved *)
|
(* suspend until the future is resolved *)
|
||||||
if C.try_attach self trigger then
|
if C.try_attach self trigger then Trigger.await_exn trigger;
|
||||||
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
|
||||||
|
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
get_or_fail_exn self
|
get_or_fail_exn self
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ module State_ = struct
|
||||||
done;
|
done;
|
||||||
|
|
||||||
(* wait for the other computation to be done *)
|
(* wait for the other computation to be done *)
|
||||||
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
|
if !must_await then Trigger.await_exn trigger
|
||||||
| Right_solved _ | Both_solved _ -> assert false
|
| Right_solved _ | Both_solved _ -> assert false
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
||||||
i := !i + len_range
|
i := !i + len_range
|
||||||
done;
|
done;
|
||||||
|
|
||||||
Trigger.await trigger |> Option.iter Exn_bt.raise;
|
Trigger.await_exn trigger;
|
||||||
Option.iter Exn_bt.raise @@ A.get failure;
|
Option.iter Exn_bt.raise @@ A.get failure;
|
||||||
()
|
()
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue