mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-05 19:00:33 -05:00
wip: change Moonpool.Chan so it's bounded
also push/pop require effects, the OCaml 4 version only allows for try_push/try_pop.
This commit is contained in:
parent
d8aa60558b
commit
35a69924d3
3 changed files with 192 additions and 148 deletions
290
src/core/chan.ml
290
src/core/chan.ml
|
|
@ -1,7 +1,8 @@
|
|||
module A = Atomic_
|
||||
|
||||
type 'a or_error = 'a Fut.or_error
|
||||
type 'a waiter = 'a Fut.promise
|
||||
type 'a pop_waiter = 'a Fut.promise
|
||||
type 'a push_waiter = 'a * unit Fut.promise
|
||||
|
||||
let[@inline] list_is_empty_ = function
|
||||
| [] -> true
|
||||
|
|
@ -11,6 +12,7 @@ let[@inline] list_is_empty_ = function
|
|||
module Q : sig
|
||||
type 'a t
|
||||
|
||||
val empty : 'a t
|
||||
val return : 'a -> 'a t
|
||||
val is_empty : _ t -> bool
|
||||
|
||||
|
|
@ -28,6 +30,7 @@ end = struct
|
|||
|
||||
invariant: if hd=[], then tl=[] *)
|
||||
|
||||
let empty = { hd = []; tl = [] }
|
||||
let[@inline] return x : _ t = { hd = [ x ]; tl = [] }
|
||||
|
||||
let[@inline] make_ hd tl =
|
||||
|
|
@ -56,138 +59,177 @@ end
|
|||
|
||||
exception Closed
|
||||
|
||||
type 'a state =
|
||||
| Empty
|
||||
| St_closed
|
||||
| Elems of 'a Q.t
|
||||
| Waiters of 'a waiter Q.t
|
||||
type 'a t = {
|
||||
q: 'a Queue.t;
|
||||
mutex: Mutex.t; (** protects critical section *)
|
||||
mutable closed: bool;
|
||||
max_size: int;
|
||||
push_waiters: Trigger.t Queue.t;
|
||||
pop_waiters: Trigger.t Queue.t;
|
||||
}
|
||||
|
||||
type 'a t = { st: 'a state A.t } [@@unboxed]
|
||||
let create ~max_size () : _ t =
|
||||
if max_size < 0 then invalid_arg "Chan: max_size < 0";
|
||||
{
|
||||
max_size;
|
||||
mutex = Mutex.create ();
|
||||
closed = false;
|
||||
q = Queue.create ();
|
||||
push_waiters = Queue.create ();
|
||||
pop_waiters = Queue.create ();
|
||||
}
|
||||
|
||||
let create () : _ t = { st = A.make Empty }
|
||||
let try_push (self : _ t) x : bool =
|
||||
let res = ref false in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
(** Produce a state from a queue of waiters *)
|
||||
let[@inline] mk_st_waiters_ ws : _ state =
|
||||
if Q.is_empty ws then
|
||||
Empty
|
||||
else
|
||||
Waiters ws
|
||||
|
||||
(** Produce a state from a queue of elements *)
|
||||
let[@inline] mk_st_elems_ q : _ state =
|
||||
if Q.is_empty q then
|
||||
Empty
|
||||
else
|
||||
Elems q
|
||||
|
||||
let push (self : _ t) x : unit =
|
||||
while
|
||||
let old_st = A.get self.st in
|
||||
match old_st with
|
||||
| St_closed -> raise Closed
|
||||
| Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x)))
|
||||
| Waiters ws ->
|
||||
(* awake first waiter and give it [x] *)
|
||||
let w, ws' = Q.pop_exn ws in
|
||||
let new_st = mk_st_waiters_ ws' in
|
||||
if A.compare_and_set self.st old_st new_st then (
|
||||
Fut.fulfill w (Ok x);
|
||||
false
|
||||
) else
|
||||
true
|
||||
| Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x)))
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
match Queue.length self.q with
|
||||
| 0 ->
|
||||
let to_awake = Queue.create () in
|
||||
Queue.push x self.q;
|
||||
Queue.transfer self.pop_waiters to_awake;
|
||||
res := true;
|
||||
Mutex.unlock self.mutex;
|
||||
(* wake up pop triggers if needed. Be careful to do that
|
||||
outside the critical section*)
|
||||
Queue.iter Trigger.signal to_awake
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ -> Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
|
||||
let try_pop (type elt) self : elt option =
|
||||
let module M = struct
|
||||
exception Found of elt
|
||||
end in
|
||||
try
|
||||
(* a bit of spinning *)
|
||||
for _retry = 1 to 10 do
|
||||
let old_st = A.get self.st in
|
||||
match old_st with
|
||||
| Elems q ->
|
||||
let x, q' = Q.pop_exn q in
|
||||
let new_st = mk_st_elems_ q' in
|
||||
if A.compare_and_set self.st old_st new_st then
|
||||
raise_notrace (M.Found x)
|
||||
else
|
||||
Domain_.relax ()
|
||||
| _ -> raise_notrace Exit
|
||||
done;
|
||||
None
|
||||
with
|
||||
| M.Found x -> Some x
|
||||
| Exit -> None
|
||||
|
||||
let pop (type elt) (self : _ t) : elt Fut.t =
|
||||
let module M = struct
|
||||
exception Ret of elt
|
||||
exception Fut of elt Fut.t
|
||||
end in
|
||||
try
|
||||
while
|
||||
let old_st = A.get self.st in
|
||||
(match old_st with
|
||||
| St_closed ->
|
||||
let bt = Printexc.get_callstack 10 in
|
||||
raise_notrace (M.Fut (Fut.fail Closed bt))
|
||||
| Elems q ->
|
||||
let x, q' = Q.pop_exn q in
|
||||
let new_st = mk_st_elems_ q' in
|
||||
if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x)
|
||||
| Empty ->
|
||||
let fut, promise = Fut.make () in
|
||||
let new_st = Waiters (Q.return promise) in
|
||||
if A.compare_and_set self.st old_st new_st then
|
||||
raise_notrace (M.Fut fut)
|
||||
| Waiters ws ->
|
||||
let fut, promise = Fut.make () in
|
||||
(* add new promise at the end of the queue of waiters *)
|
||||
let new_st = Waiters (Q.push ws promise) in
|
||||
if A.compare_and_set self.st old_st new_st then
|
||||
raise_notrace (M.Fut fut));
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done;
|
||||
(* never reached *)
|
||||
assert false
|
||||
with
|
||||
| M.Ret x -> Fut.return x
|
||||
| M.Fut f -> f
|
||||
|
||||
let pop_block_exn (self : 'a t) : 'a =
|
||||
match try_pop self with
|
||||
| Some x -> x
|
||||
| None -> Fut.wait_block_exn @@ pop self
|
||||
let res = ref None in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
(match Queue.pop self.q with
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
)
|
||||
| x -> res := Some x);
|
||||
Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
|
||||
let close (self : _ t) : unit =
|
||||
while
|
||||
let old_st = A.get self.st in
|
||||
match old_st with
|
||||
| St_closed -> false (* exit *)
|
||||
| Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed)
|
||||
| Waiters ws ->
|
||||
if A.compare_and_set self.st old_st St_closed then (
|
||||
(* fail all waiters with [Closed]. *)
|
||||
let bt = Printexc.get_callstack 10 in
|
||||
Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
let q = Queue.create () in
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
Queue.transfer self.pop_waiters q;
|
||||
Queue.transfer self.push_waiters q
|
||||
);
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal q
|
||||
|
||||
[@@@ifge 5.0]
|
||||
|
||||
let pop_await self =
|
||||
match try_pop self with
|
||||
| Some x -> x
|
||||
| None -> Fut.await @@ pop self
|
||||
let rec push (self : _ t) x : unit =
|
||||
Mutex.lock self.mutex;
|
||||
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
match Queue.length self.q with
|
||||
| 0 ->
|
||||
Queue.push x self.q;
|
||||
let to_wakeup = Queue.create () in
|
||||
Queue.transfer self.pop_waiters to_wakeup;
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal to_wakeup
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ ->
|
||||
let tr = Trigger.create () in
|
||||
Queue.push tr self.push_waiters;
|
||||
Mutex.unlock self.mutex;
|
||||
Trigger.await_exn tr;
|
||||
push self x
|
||||
|
||||
let rec pop (self : 'a t) : 'a =
|
||||
Mutex.lock self.mutex;
|
||||
match Queue.pop self.q with
|
||||
| x ->
|
||||
if Queue.is_empty self.q then (
|
||||
let to_wakeup = Queue.create () in
|
||||
Queue.transfer self.push_waiters to_wakeup;
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal to_wakeup
|
||||
) else
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
let tr = Trigger.create () in
|
||||
Queue.push tr self.pop_waiters;
|
||||
Mutex.unlock self.mutex;
|
||||
Trigger.await_exn tr;
|
||||
pop self
|
||||
|
||||
[@@@endif]
|
||||
|
||||
(* TODO: remove
|
||||
(** A waiter queue, somewhat similar to a condition. *)
|
||||
module Waiters_ = struct
|
||||
type t = { waiters: Trigger.t Q.t A.t } [@@unboxed]
|
||||
|
||||
let create () : t = { waiters = A.make Q.empty }
|
||||
|
||||
let add_waiter self (tr : Trigger.t) : unit =
|
||||
while
|
||||
let q = A.get self.waiters in
|
||||
not (A.compare_and_set self.waiters q (Q.push q tr))
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
|
||||
let wait_await (self : t) : unit =
|
||||
let tr = Trigger.create () in
|
||||
add_waiter self tr;
|
||||
Trigger.await_exn tr
|
||||
|
||||
exception Empty = Q.Empty
|
||||
|
||||
let rec pop_waiter (self : t) : Trigger.t =
|
||||
let q = A.get self.waiters in
|
||||
let x, q' = Q.pop_exn q in
|
||||
if A.compare_and_set self.waiters q q' then
|
||||
x
|
||||
else (
|
||||
Domain_.relax ();
|
||||
pop_waiter self
|
||||
)
|
||||
|
||||
let rec pop_all (self : t) : Trigger.t Q.t =
|
||||
let q = A.get self.waiters in
|
||||
if A.compare_and_set self.waiters q Q.empty then
|
||||
q
|
||||
else (
|
||||
Domain_.relax ();
|
||||
pop_all self
|
||||
)
|
||||
|
||||
let signal (self : t) : unit =
|
||||
match pop_waiter self with
|
||||
| exception Empty -> ()
|
||||
| tr -> Trigger.signal tr
|
||||
|
||||
let broadcast (self : t) : unit =
|
||||
let waiters = pop_all self in
|
||||
Q.iter Trigger.signal waiters
|
||||
end
|
||||
*)
|
||||
|
|
|
|||
|
|
@ -1,12 +1,9 @@
|
|||
(** Channels.
|
||||
|
||||
Channels are pipelines of values where threads can push into
|
||||
one end, and pull from the other end.
|
||||
The channels have bounded size. Push/pop return futures or can use effects
|
||||
to provide an [await]-friendly version.
|
||||
|
||||
Unlike {!Moonpool.Blocking_queue}, channels are designed so
|
||||
that pushing never blocks, and pop'ing values returns a future.
|
||||
|
||||
@since 0.3
|
||||
The channels became bounded since @NEXT_RELEASE .
|
||||
*)
|
||||
|
||||
type 'a or_error = 'a Fut.or_error
|
||||
|
|
@ -14,39 +11,43 @@ type 'a or_error = 'a Fut.or_error
|
|||
type 'a t
|
||||
(** Channel carrying values of type ['a]. *)
|
||||
|
||||
val create : unit -> 'a t
|
||||
val create : max_size:int -> unit -> 'a t
|
||||
(** Create a channel. *)
|
||||
|
||||
exception Closed
|
||||
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** [push chan x] pushes [x] into [chan]. This does not block.
|
||||
val try_push : 'a t -> 'a -> bool
|
||||
(** [try_push chan x] pushes [x] into [chan]. This does not block.
|
||||
Returns [true] if it succeeded in pushing.
|
||||
@raise Closed if the channel is closed. *)
|
||||
|
||||
val pop : 'a t -> 'a Fut.t
|
||||
(** Pop an element. This returns a future that will be
|
||||
fulfilled when an element is available.
|
||||
@raise Closed if the channel is closed, or fails the future
|
||||
if the channel is closed before an element is available for it. *)
|
||||
|
||||
val try_pop : 'a t -> 'a option
|
||||
(** [try_pop chan] pops and return an element if one is available
|
||||
immediately. Otherwise it returns [None]. *)
|
||||
|
||||
val pop_block_exn : 'a t -> 'a
|
||||
(** Like [pop], but blocks if an element is not available immediately.
|
||||
The precautions around blocking from inside a thread pool
|
||||
are the same as explained in {!Fut.wait_block}. *)
|
||||
|
||||
val close : _ t -> unit
|
||||
(** Close the channel. Further push and pop calls will fail.
|
||||
This is idempotent. *)
|
||||
|
||||
[@@@ifge 5.0]
|
||||
|
||||
val pop_await : 'a t -> 'a
|
||||
(** Like {!pop} but suspends the current thread until an element is
|
||||
available. See {!Fut.await} for more details.
|
||||
@since 0.3 *)
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** Push the value into the channel, suspending the current task
|
||||
if the channel is currently full.
|
||||
@raise Closed if the channel is closed
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
val pop : 'a t -> 'a
|
||||
(** Pop an element. This might suspend the current task if the
|
||||
channel is currently empty.
|
||||
@raise Closed if the channel is empty and closed.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
(*
|
||||
val pop_block_exn : 'a t -> 'a
|
||||
(** Like [pop], but blocks if an element is not available immediately.
|
||||
The precautions around blocking from inside a thread pool
|
||||
are the same as explained in {!Fut.wait_block}. *)
|
||||
*)
|
||||
|
||||
[@@@endif]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
module Mutex = Picos_std_sync.Mutex
|
||||
module Chan = Chan
|
||||
module Condition = Picos_std_sync.Condition
|
||||
module Lock = Lock
|
||||
module Event = Event
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue