lock free chan

This commit is contained in:
Simon Cruanes 2024-09-25 22:17:41 -04:00
parent 854c3b819b
commit 94998ea407
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 127 additions and 162 deletions

View file

@ -1,9 +1,5 @@
module A = Atomic_ module A = Atomic_
type 'a or_error = 'a Fut.or_error
type 'a pop_waiter = 'a Fut.promise
type 'a push_waiter = 'a * unit Fut.promise
let[@inline] list_is_empty_ = function let[@inline] list_is_empty_ = function
| [] -> true | [] -> true
| _ :: _ -> false | _ :: _ -> false
@ -13,7 +9,6 @@ module Q : sig
type 'a t type 'a t
val empty : 'a t val empty : 'a t
val return : 'a -> 'a t
val is_empty : _ t -> bool val is_empty : _ t -> bool
exception Empty exception Empty
@ -31,7 +26,6 @@ end = struct
invariant: if hd=[], then tl=[] *) invariant: if hd=[], then tl=[] *)
let empty = { hd = []; tl = [] } let empty = { hd = []; tl = [] }
let[@inline] return x : _ t = { hd = [ x ]; tl = [] }
let[@inline] make_ hd tl = let[@inline] make_ hd tl =
match hd with match hd with
@ -58,178 +52,149 @@ end = struct
end end
exception Closed exception Closed
exception Full
module State = struct
type 'a t = {
q: 'a Q.t;
size: int;
pop_waiters: Trigger.t Q.t;
push_waiters: Trigger.t Q.t;
}
(** @raise Q.Empty *)
let[@inline] pop_one_ ~max_size (st : 'a t) : 'a * 'a t * Trigger.t Q.t =
let x, new_q = Q.pop_exn st.q in
let new_st = { st with q = new_q; size = st.size - 1 } in
if st.size = max_size then
(* we signal all the push waiters, the channel isn't full anymore *)
x, { new_st with push_waiters = Q.empty }, st.push_waiters
else
x, new_st, Q.empty
(** @raise Full *)
let[@inline] push_one_ ~max_size (st : 'a t) (x : 'a) : 'a t * Trigger.t Q.t =
if st.size >= max_size then raise_notrace Full;
let new_q = Q.push st.q x in
let new_st = { st with q = new_q; size = st.size + 1 } in
if st.size = 0 then
(* we signal all the pop waiters, the channel isn't empty anymore *)
{ new_st with pop_waiters = Q.empty }, st.pop_waiters
else
new_st, Q.empty
end
type 'a t = { type 'a t = {
q: 'a Queue.t; st: 'a State.t A.t;
mutex: Mutex.t; (** protects critical section *) closed: bool A.t;
mutable closed: bool;
max_size: int; max_size: int;
push_waiters: Trigger.t Queue.t;
pop_waiters: Trigger.t Queue.t;
} }
let create ~max_size () : _ t = let create ~max_size () : _ t =
if max_size < 0 then invalid_arg "Chan: max_size < 0"; if max_size < 0 then invalid_arg "Chan: max_size < 0";
{ {
max_size; max_size;
mutex = Mutex.create (); closed = A.make false;
closed = false; st =
q = Queue.create (); A.make
push_waiters = Queue.create (); {
pop_waiters = Queue.create (); State.q = Q.empty;
size = 0;
pop_waiters = Q.empty;
push_waiters = Q.empty;
};
} }
let try_push (self : _ t) x : bool = let try_pop (self : 'a t) : 'a option =
let res = ref false in let old_st = A.get self.st in
if Mutex.try_lock self.mutex then ( match State.pop_one_ ~max_size:self.max_size old_st with
if self.closed then ( | exception Q.Empty ->
Mutex.unlock self.mutex; if A.get self.closed then raise Closed;
raise Closed None
); | x, new_st, to_broadcast ->
if A.compare_and_set self.st old_st new_st then (
Q.iter Trigger.signal to_broadcast;
Some x
) else
None
match Queue.length self.q with let try_push (self : 'a t) (x : 'a) : bool =
| 0 -> if A.get self.closed then raise Closed;
let to_awake = Queue.create () in let old_st = A.get self.st in
Queue.push x self.q; match State.push_one_ ~max_size:self.max_size old_st x with
Queue.transfer self.pop_waiters to_awake; | exception Full -> false
res := true; | new_st, to_broadcast ->
Mutex.unlock self.mutex; if A.compare_and_set self.st old_st new_st then (
(* wake up pop triggers if needed. Be careful to do that Q.iter Trigger.signal to_broadcast;
outside the critical section*) true
Queue.iter Trigger.signal to_awake ) else
| n when n < self.max_size -> false
Queue.push x self.q;
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res
let try_pop (type elt) self : elt option =
let res = ref None in
if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res
let close (self : _ t) : unit = let close (self : _ t) : unit =
let q = Queue.create () in if not (A.exchange self.closed true) then
Mutex.lock self.mutex; while
if not self.closed then ( let old_st = A.get self.st in
self.closed <- true; if
Queue.transfer self.pop_waiters q; A.compare_and_set self.st old_st
Queue.transfer self.push_waiters q { old_st with push_waiters = Q.empty; pop_waiters = Q.empty }
); then (
Mutex.unlock self.mutex; (* signal all waiters *)
Queue.iter Trigger.signal q Q.iter Trigger.signal old_st.push_waiters;
Q.iter Trigger.signal old_st.pop_waiters;
false
) else
true
do
Domain_.relax ()
done
[@@@ifge 5.0] [@@@ifge 5.0]
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
match Queue.length self.q with
| 0 ->
Queue.push x self.q;
let to_wakeup = Queue.create () in
Queue.transfer self.pop_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ ->
let tr = Trigger.create () in
Queue.push tr self.push_waiters;
Mutex.unlock self.mutex;
Trigger.await_exn tr;
push self x
let rec pop (self : 'a t) : 'a = let rec pop (self : 'a t) : 'a =
Mutex.lock self.mutex; let old_st = A.get self.st in
match Queue.pop self.q with match State.pop_one_ ~max_size:self.max_size old_st with
| x -> | exception Q.Empty ->
if Queue.is_empty self.q then ( if A.get self.closed then raise Closed;
let to_wakeup = Queue.create () in
Queue.transfer self.push_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
) else
Mutex.unlock self.mutex;
x
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let tr = Trigger.create () in let tr = Trigger.create () in
Queue.push tr self.pop_waiters; if
Mutex.unlock self.mutex; A.compare_and_set self.st old_st
Trigger.await_exn tr; { old_st with pop_waiters = Q.push old_st.pop_waiters tr }
pop self then (
Trigger.await_exn tr;
pop self
) else
pop self
| x, new_st, to_broadcast ->
if A.compare_and_set self.st old_st new_st then (
Q.iter Trigger.signal to_broadcast;
x
) else
pop self
let push (self : _ t) x : unit =
while
if A.get self.closed then raise Closed;
let old_st = A.get self.st in
match State.push_one_ ~max_size:self.max_size old_st x with
| exception Full ->
let tr = Trigger.create () in
if
A.compare_and_set self.st old_st
{ old_st with push_waiters = Q.push old_st.push_waiters tr }
then
Trigger.await_exn tr;
true
| new_st, to_broadcast ->
if A.compare_and_set self.st old_st new_st then (
Q.iter Trigger.signal to_broadcast;
false
) else
true
do
Domain_.relax ()
done
[@@@endif] [@@@endif]
(* TODO: remove
(** A waiter queue, somewhat similar to a condition. *)
module Waiters_ = struct
type t = { waiters: Trigger.t Q.t A.t } [@@unboxed]
let create () : t = { waiters = A.make Q.empty }
let add_waiter self (tr : Trigger.t) : unit =
while
let q = A.get self.waiters in
not (A.compare_and_set self.waiters q (Q.push q tr))
do
Domain_.relax ()
done
let wait_await (self : t) : unit =
let tr = Trigger.create () in
add_waiter self tr;
Trigger.await_exn tr
exception Empty = Q.Empty
let rec pop_waiter (self : t) : Trigger.t =
let q = A.get self.waiters in
let x, q' = Q.pop_exn q in
if A.compare_and_set self.waiters q q' then
x
else (
Domain_.relax ();
pop_waiter self
)
let rec pop_all (self : t) : Trigger.t Q.t =
let q = A.get self.waiters in
if A.compare_and_set self.waiters q Q.empty then
q
else (
Domain_.relax ();
pop_all self
)
let signal (self : t) : unit =
match pop_waiter self with
| exception Empty -> ()
| tr -> Trigger.signal tr
let broadcast (self : t) : unit =
let waiters = pop_all self in
Q.iter Trigger.signal waiters
end
*)

View file

@ -6,8 +6,6 @@
The channels became bounded since @NEXT_RELEASE . The channels became bounded since @NEXT_RELEASE .
*) *)
type 'a or_error = 'a Fut.or_error
type 'a t type 'a t
(** Channel carrying values of type ['a]. *) (** Channel carrying values of type ['a]. *)
@ -23,7 +21,9 @@ val try_push : 'a t -> 'a -> bool
val try_pop : 'a t -> 'a option val try_pop : 'a t -> 'a option
(** [try_pop chan] pops and return an element if one is available (** [try_pop chan] pops and return an element if one is available
immediately. Otherwise it returns [None]. *) immediately. Otherwise it returns [None].
@raise Closed if the channel is closed and empty.
*)
val close : _ t -> unit val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. (** Close the channel. Further push and pop calls will fail.