mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 03:05:30 -05:00
chan: go back to the lock-full version
This commit is contained in:
parent
94998ea407
commit
e7ee012108
1 changed files with 101 additions and 175 deletions
270
src/core/chan.ml
270
src/core/chan.ml
|
|
@ -1,200 +1,126 @@
|
|||
module A = Atomic_
|
||||
|
||||
let[@inline] list_is_empty_ = function
|
||||
| [] -> true
|
||||
| _ :: _ -> false
|
||||
|
||||
(** Simple functional queue *)
|
||||
module Q : sig
|
||||
type 'a t
|
||||
|
||||
val empty : 'a t
|
||||
val is_empty : _ t -> bool
|
||||
|
||||
exception Empty
|
||||
|
||||
val pop_exn : 'a t -> 'a * 'a t
|
||||
val push : 'a t -> 'a -> 'a t
|
||||
val iter : ('a -> unit) -> 'a t -> unit
|
||||
end = struct
|
||||
type 'a t = {
|
||||
hd: 'a list;
|
||||
tl: 'a list;
|
||||
}
|
||||
(** Queue containing elements of type 'a.
|
||||
|
||||
invariant: if hd=[], then tl=[] *)
|
||||
|
||||
let empty = { hd = []; tl = [] }
|
||||
|
||||
let[@inline] make_ hd tl =
|
||||
match hd with
|
||||
| [] -> { hd = List.rev tl; tl = [] }
|
||||
| _ :: _ -> { hd; tl }
|
||||
|
||||
let[@inline] is_empty self = list_is_empty_ self.hd
|
||||
let[@inline] push self x : _ t = make_ self.hd (x :: self.tl)
|
||||
|
||||
let iter f (self : _ t) : unit =
|
||||
List.iter f self.hd;
|
||||
List.iter f self.tl
|
||||
|
||||
exception Empty
|
||||
|
||||
let pop_exn self =
|
||||
match self.hd with
|
||||
| [] ->
|
||||
assert (list_is_empty_ self.tl);
|
||||
raise Empty
|
||||
| x :: hd' ->
|
||||
let self' = make_ hd' self.tl in
|
||||
x, self'
|
||||
end
|
||||
|
||||
exception Closed
|
||||
exception Full
|
||||
|
||||
module State = struct
|
||||
type 'a t = {
|
||||
q: 'a Q.t;
|
||||
size: int;
|
||||
pop_waiters: Trigger.t Q.t;
|
||||
push_waiters: Trigger.t Q.t;
|
||||
}
|
||||
|
||||
(** @raise Q.Empty *)
|
||||
let[@inline] pop_one_ ~max_size (st : 'a t) : 'a * 'a t * Trigger.t Q.t =
|
||||
let x, new_q = Q.pop_exn st.q in
|
||||
let new_st = { st with q = new_q; size = st.size - 1 } in
|
||||
if st.size = max_size then
|
||||
(* we signal all the push waiters, the channel isn't full anymore *)
|
||||
x, { new_st with push_waiters = Q.empty }, st.push_waiters
|
||||
else
|
||||
x, new_st, Q.empty
|
||||
|
||||
(** @raise Full *)
|
||||
let[@inline] push_one_ ~max_size (st : 'a t) (x : 'a) : 'a t * Trigger.t Q.t =
|
||||
if st.size >= max_size then raise_notrace Full;
|
||||
let new_q = Q.push st.q x in
|
||||
let new_st = { st with q = new_q; size = st.size + 1 } in
|
||||
if st.size = 0 then
|
||||
(* we signal all the pop waiters, the channel isn't empty anymore *)
|
||||
{ new_st with pop_waiters = Q.empty }, st.pop_waiters
|
||||
else
|
||||
new_st, Q.empty
|
||||
end
|
||||
|
||||
type 'a t = {
|
||||
st: 'a State.t A.t;
|
||||
closed: bool A.t;
|
||||
q: 'a Queue.t;
|
||||
mutex: Mutex.t; (** protects critical section *)
|
||||
mutable closed: bool;
|
||||
max_size: int;
|
||||
push_waiters: Trigger.t Queue.t;
|
||||
pop_waiters: Trigger.t Queue.t;
|
||||
}
|
||||
|
||||
let create ~max_size () : _ t =
|
||||
if max_size < 0 then invalid_arg "Chan: max_size < 0";
|
||||
{
|
||||
max_size;
|
||||
closed = A.make false;
|
||||
st =
|
||||
A.make
|
||||
{
|
||||
State.q = Q.empty;
|
||||
size = 0;
|
||||
pop_waiters = Q.empty;
|
||||
push_waiters = Q.empty;
|
||||
};
|
||||
mutex = Mutex.create ();
|
||||
closed = false;
|
||||
q = Queue.create ();
|
||||
push_waiters = Queue.create ();
|
||||
pop_waiters = Queue.create ();
|
||||
}
|
||||
|
||||
let try_pop (self : 'a t) : 'a option =
|
||||
let old_st = A.get self.st in
|
||||
match State.pop_one_ ~max_size:self.max_size old_st with
|
||||
| exception Q.Empty ->
|
||||
if A.get self.closed then raise Closed;
|
||||
None
|
||||
| x, new_st, to_broadcast ->
|
||||
if A.compare_and_set self.st old_st new_st then (
|
||||
Q.iter Trigger.signal to_broadcast;
|
||||
Some x
|
||||
) else
|
||||
None
|
||||
let try_push (self : _ t) x : bool =
|
||||
let res = ref false in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
let try_push (self : 'a t) (x : 'a) : bool =
|
||||
if A.get self.closed then raise Closed;
|
||||
let old_st = A.get self.st in
|
||||
match State.push_one_ ~max_size:self.max_size old_st x with
|
||||
| exception Full -> false
|
||||
| new_st, to_broadcast ->
|
||||
if A.compare_and_set self.st old_st new_st then (
|
||||
Q.iter Trigger.signal to_broadcast;
|
||||
true
|
||||
) else
|
||||
false
|
||||
match Queue.length self.q with
|
||||
| 0 ->
|
||||
let to_awake = Queue.create () in
|
||||
Queue.push x self.q;
|
||||
Queue.transfer self.pop_waiters to_awake;
|
||||
res := true;
|
||||
Mutex.unlock self.mutex;
|
||||
(* wake up pop triggers if needed. Be careful to do that
|
||||
outside the critical section*)
|
||||
Queue.iter Trigger.signal to_awake
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ -> Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
|
||||
let try_pop (type elt) self : elt option =
|
||||
let res = ref None in
|
||||
if Mutex.try_lock self.mutex then (
|
||||
(match Queue.pop self.q with
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
)
|
||||
| x -> res := Some x);
|
||||
Mutex.unlock self.mutex
|
||||
);
|
||||
!res
|
||||
|
||||
let close (self : _ t) : unit =
|
||||
if not (A.exchange self.closed true) then
|
||||
while
|
||||
let old_st = A.get self.st in
|
||||
if
|
||||
A.compare_and_set self.st old_st
|
||||
{ old_st with push_waiters = Q.empty; pop_waiters = Q.empty }
|
||||
then (
|
||||
(* signal all waiters *)
|
||||
Q.iter Trigger.signal old_st.push_waiters;
|
||||
Q.iter Trigger.signal old_st.pop_waiters;
|
||||
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
let q = Queue.create () in
|
||||
Mutex.lock self.mutex;
|
||||
if not self.closed then (
|
||||
self.closed <- true;
|
||||
Queue.transfer self.pop_waiters q;
|
||||
Queue.transfer self.push_waiters q
|
||||
);
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal q
|
||||
|
||||
[@@@ifge 5.0]
|
||||
|
||||
let rec push (self : _ t) x : unit =
|
||||
Mutex.lock self.mutex;
|
||||
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
match Queue.length self.q with
|
||||
| 0 ->
|
||||
Queue.push x self.q;
|
||||
let to_wakeup = Queue.create () in
|
||||
Queue.transfer self.pop_waiters to_wakeup;
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal to_wakeup
|
||||
| n when n < self.max_size ->
|
||||
Queue.push x self.q;
|
||||
Mutex.unlock self.mutex
|
||||
| _ ->
|
||||
let tr = Trigger.create () in
|
||||
Queue.push tr self.push_waiters;
|
||||
Mutex.unlock self.mutex;
|
||||
Trigger.await_exn tr;
|
||||
push self x
|
||||
|
||||
let rec pop (self : 'a t) : 'a =
|
||||
let old_st = A.get self.st in
|
||||
match State.pop_one_ ~max_size:self.max_size old_st with
|
||||
| exception Q.Empty ->
|
||||
if A.get self.closed then raise Closed;
|
||||
|
||||
let tr = Trigger.create () in
|
||||
if
|
||||
A.compare_and_set self.st old_st
|
||||
{ old_st with pop_waiters = Q.push old_st.pop_waiters tr }
|
||||
then (
|
||||
Trigger.await_exn tr;
|
||||
pop self
|
||||
Mutex.lock self.mutex;
|
||||
match Queue.pop self.q with
|
||||
| x ->
|
||||
if Queue.is_empty self.q then (
|
||||
let to_wakeup = Queue.create () in
|
||||
Queue.transfer self.push_waiters to_wakeup;
|
||||
Mutex.unlock self.mutex;
|
||||
Queue.iter Trigger.signal to_wakeup
|
||||
) else
|
||||
pop self
|
||||
| x, new_st, to_broadcast ->
|
||||
if A.compare_and_set self.st old_st new_st then (
|
||||
Q.iter Trigger.signal to_broadcast;
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
) else
|
||||
pop self
|
||||
| exception Queue.Empty ->
|
||||
if self.closed then (
|
||||
Mutex.unlock self.mutex;
|
||||
raise Closed
|
||||
);
|
||||
|
||||
let push (self : _ t) x : unit =
|
||||
while
|
||||
if A.get self.closed then raise Closed;
|
||||
|
||||
let old_st = A.get self.st in
|
||||
match State.push_one_ ~max_size:self.max_size old_st x with
|
||||
| exception Full ->
|
||||
let tr = Trigger.create () in
|
||||
if
|
||||
A.compare_and_set self.st old_st
|
||||
{ old_st with push_waiters = Q.push old_st.push_waiters tr }
|
||||
then
|
||||
Queue.push tr self.pop_waiters;
|
||||
Mutex.unlock self.mutex;
|
||||
Trigger.await_exn tr;
|
||||
true
|
||||
| new_st, to_broadcast ->
|
||||
if A.compare_and_set self.st old_st new_st then (
|
||||
Q.iter Trigger.signal to_broadcast;
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
Domain_.relax ()
|
||||
done
|
||||
pop self
|
||||
|
||||
[@@@endif]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue