mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-18 00:26:43 -05:00
Compare commits
6 commits
95de0e7e27
...
4b9e480013
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b9e480013 | ||
|
|
f8e553d473 | ||
|
|
7082447073 | ||
|
|
6715502fdd | ||
|
|
e95f0e421d | ||
|
|
3e2ce57669 |
4 changed files with 26 additions and 29 deletions
|
|
@ -21,6 +21,7 @@ let create ~max_size () : _ t =
|
||||||
}
|
}
|
||||||
|
|
||||||
let try_push (self : _ t) x : bool =
|
let try_push (self : _ t) x : bool =
|
||||||
|
let res = ref false in
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
if self.closed then (
|
if self.closed then (
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
|
|
@ -32,46 +33,42 @@ let try_push (self : _ t) x : bool =
|
||||||
let to_awake = Queue.create () in
|
let to_awake = Queue.create () in
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Queue.transfer self.pop_waiters to_awake;
|
Queue.transfer self.pop_waiters to_awake;
|
||||||
|
res := true;
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
(* wake up pop triggers if needed. Be careful to do that
|
(* wake up pop triggers if needed. Be careful to do that
|
||||||
outside the critical section*)
|
outside the critical section*)
|
||||||
Queue.iter Trigger.signal to_awake;
|
Queue.iter Trigger.signal to_awake
|
||||||
true
|
|
||||||
| n when n < self.max_size ->
|
| n when n < self.max_size ->
|
||||||
Queue.push x self.q;
|
Queue.push x self.q;
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex
|
||||||
true
|
| _ -> Mutex.unlock self.mutex
|
||||||
| _ ->
|
);
|
||||||
Mutex.unlock self.mutex;
|
!res
|
||||||
false
|
|
||||||
) else
|
|
||||||
false
|
|
||||||
|
|
||||||
let try_pop (type elt) self : elt option =
|
let try_pop (type elt) self : elt option =
|
||||||
|
let res = ref None in
|
||||||
if Mutex.try_lock self.mutex then (
|
if Mutex.try_lock self.mutex then (
|
||||||
match Queue.pop self.q with
|
(match Queue.pop self.q with
|
||||||
| exception Queue.Empty ->
|
| exception Queue.Empty ->
|
||||||
Mutex.unlock self.mutex;
|
if self.closed then (
|
||||||
if self.closed then
|
Mutex.unlock self.mutex;
|
||||||
raise Closed
|
raise Closed
|
||||||
else
|
)
|
||||||
None
|
| x -> res := Some x);
|
||||||
| x ->
|
Mutex.unlock self.mutex
|
||||||
Mutex.unlock self.mutex;
|
);
|
||||||
Some x
|
!res
|
||||||
) else
|
|
||||||
None
|
|
||||||
|
|
||||||
let close (self : _ t) : unit =
|
let close (self : _ t) : unit =
|
||||||
let triggers_to_signal = Queue.create () in
|
let q = Queue.create () in
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
if not self.closed then (
|
if not self.closed then (
|
||||||
self.closed <- true;
|
self.closed <- true;
|
||||||
Queue.transfer self.pop_waiters triggers_to_signal;
|
Queue.transfer self.pop_waiters q;
|
||||||
Queue.transfer self.push_waiters triggers_to_signal
|
Queue.transfer self.push_waiters q
|
||||||
);
|
);
|
||||||
Mutex.unlock self.mutex;
|
Mutex.unlock self.mutex;
|
||||||
Queue.iter Trigger.signal triggers_to_signal
|
Queue.iter Trigger.signal q
|
||||||
|
|
||||||
let rec push (self : _ t) x : unit =
|
let rec push (self : _ t) x : unit =
|
||||||
Mutex.lock self.mutex;
|
Mutex.lock self.mutex;
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
(** Channels.
|
(** Channels.
|
||||||
|
|
||||||
The channels have bounded size. They use effects/await to provide
|
The channels have bounded size. Push/pop return futures or can use effects
|
||||||
a direct style implementation. Pushing into a full channel,
|
to provide an [await]-friendly version.
|
||||||
or popping from an empty one, will suspend the current task.
|
|
||||||
|
|
||||||
The channels became bounded since @0.7 .
|
The channels became bounded since @0.7 .
|
||||||
*)
|
*)
|
||||||
|
|
|
||||||
|
|
@ -431,7 +431,8 @@ let await (self : 'a t) : 'a =
|
||||||
| exception C.Running ->
|
| exception C.Running ->
|
||||||
let trigger = Trigger.create () in
|
let trigger = Trigger.create () in
|
||||||
(* suspend until the future is resolved *)
|
(* suspend until the future is resolved *)
|
||||||
if C.try_attach self trigger then Trigger.await_exn trigger;
|
if C.try_attach self trigger then
|
||||||
|
Option.iter Exn_bt.raise @@ Trigger.await trigger;
|
||||||
|
|
||||||
(* un-suspended: we should have a result! *)
|
(* un-suspended: we should have a result! *)
|
||||||
get_or_fail_exn self
|
get_or_fail_exn self
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ module State_ = struct
|
||||||
done;
|
done;
|
||||||
|
|
||||||
(* wait for the other computation to be done *)
|
(* wait for the other computation to be done *)
|
||||||
if !must_await then Trigger.await_exn trigger
|
if !must_await then Trigger.await trigger |> Option.iter Exn_bt.raise
|
||||||
| Right_solved _ | Both_solved _ -> assert false
|
| Right_solved _ | Both_solved _ -> assert false
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -144,7 +144,7 @@ let for_ ?chunk_size n (f : int -> int -> unit) : unit =
|
||||||
i := !i + len_range
|
i := !i + len_range
|
||||||
done;
|
done;
|
||||||
|
|
||||||
Trigger.await_exn trigger;
|
Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||||
Option.iter Exn_bt.raise @@ A.get failure;
|
Option.iter Exn_bt.raise @@ A.get failure;
|
||||||
()
|
()
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue