diff --git a/src/core/chan.ml b/src/core/chan.ml index 099e7074..80613d89 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,9 +1,5 @@ module A = Atomic_ -type 'a or_error = 'a Fut.or_error -type 'a pop_waiter = 'a Fut.promise -type 'a push_waiter = 'a * unit Fut.promise - let[@inline] list_is_empty_ = function | [] -> true | _ :: _ -> false @@ -13,7 +9,6 @@ module Q : sig type 'a t val empty : 'a t - val return : 'a -> 'a t val is_empty : _ t -> bool exception Empty @@ -31,7 +26,6 @@ end = struct invariant: if hd=[], then tl=[] *) let empty = { hd = []; tl = [] } - let[@inline] return x : _ t = { hd = [ x ]; tl = [] } let[@inline] make_ hd tl = match hd with @@ -58,178 +52,149 @@ end = struct end exception Closed +exception Full + +module State = struct + type 'a t = { + q: 'a Q.t; + size: int; + pop_waiters: Trigger.t Q.t; + push_waiters: Trigger.t Q.t; + } + + (** @raise Q.Empty *) + let[@inline] pop_one_ ~max_size (st : 'a t) : 'a * 'a t * Trigger.t Q.t = + let x, new_q = Q.pop_exn st.q in + let new_st = { st with q = new_q; size = st.size - 1 } in + if st.size = max_size then + (* we signal all the push waiters, the channel isn't full anymore *) + x, { new_st with push_waiters = Q.empty }, st.push_waiters + else + x, new_st, Q.empty + + (** @raise Full *) + let[@inline] push_one_ ~max_size (st : 'a t) (x : 'a) : 'a t * Trigger.t Q.t = + if st.size >= max_size then raise_notrace Full; + let new_q = Q.push st.q x in + let new_st = { st with q = new_q; size = st.size + 1 } in + if st.size = 0 then + (* we signal all the pop waiters, the channel isn't empty anymore *) + { new_st with pop_waiters = Q.empty }, st.pop_waiters + else + new_st, Q.empty +end type 'a t = { - q: 'a Queue.t; - mutex: Mutex.t; (** protects critical section *) - mutable closed: bool; + st: 'a State.t A.t; + closed: bool A.t; max_size: int; - push_waiters: Trigger.t Queue.t; - pop_waiters: Trigger.t Queue.t; } let create ~max_size () : _ t = if max_size < 0 then invalid_arg "Chan: max_size < 0"; { max_size; - mutex = Mutex.create (); - closed = false; - q = Queue.create (); - push_waiters = Queue.create (); - pop_waiters = Queue.create (); + closed = A.make false; + st = + A.make + { + State.q = Q.empty; + size = 0; + pop_waiters = Q.empty; + push_waiters = Q.empty; + }; } -let try_push (self : _ t) x : bool = - let res = ref false in - if Mutex.try_lock self.mutex then ( - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); +let try_pop (self : 'a t) : 'a option = + let old_st = A.get self.st in + match State.pop_one_ ~max_size:self.max_size old_st with + | exception Q.Empty -> + if A.get self.closed then raise Closed; + None + | x, new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + Some x + ) else + None - match Queue.length self.q with - | 0 -> - let to_awake = Queue.create () in - Queue.push x self.q; - Queue.transfer self.pop_waiters to_awake; - res := true; - Mutex.unlock self.mutex; - (* wake up pop triggers if needed. Be careful to do that - outside the critical section*) - Queue.iter Trigger.signal to_awake - | n when n < self.max_size -> - Queue.push x self.q; - Mutex.unlock self.mutex - | _ -> Mutex.unlock self.mutex - ); - !res - -let try_pop (type elt) self : elt option = - let res = ref None in - if Mutex.try_lock self.mutex then ( - (match Queue.pop self.q with - | exception Queue.Empty -> - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ) - | x -> res := Some x); - Mutex.unlock self.mutex - ); - !res +let try_push (self : 'a t) (x : 'a) : bool = + if A.get self.closed then raise Closed; + let old_st = A.get self.st in + match State.push_one_ ~max_size:self.max_size old_st x with + | exception Full -> false + | new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + true + ) else + false let close (self : _ t) : unit = - let q = Queue.create () in - Mutex.lock self.mutex; - if not self.closed then ( - self.closed <- true; - Queue.transfer self.pop_waiters q; - Queue.transfer self.push_waiters q - ); - Mutex.unlock self.mutex; - Queue.iter Trigger.signal q + if not (A.exchange self.closed true) then + while + let old_st = A.get self.st in + if + A.compare_and_set self.st old_st + { old_st with push_waiters = Q.empty; pop_waiters = Q.empty } + then ( + (* signal all waiters *) + Q.iter Trigger.signal old_st.push_waiters; + Q.iter Trigger.signal old_st.pop_waiters; + + false + ) else + true + do + Domain_.relax () + done [@@@ifge 5.0] -let rec push (self : _ t) x : unit = - Mutex.lock self.mutex; - - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); - - match Queue.length self.q with - | 0 -> - Queue.push x self.q; - let to_wakeup = Queue.create () in - Queue.transfer self.pop_waiters to_wakeup; - Mutex.unlock self.mutex; - Queue.iter Trigger.signal to_wakeup - | n when n < self.max_size -> - Queue.push x self.q; - Mutex.unlock self.mutex - | _ -> - let tr = Trigger.create () in - Queue.push tr self.push_waiters; - Mutex.unlock self.mutex; - Trigger.await_exn tr; - push self x - let rec pop (self : 'a t) : 'a = - Mutex.lock self.mutex; - match Queue.pop self.q with - | x -> - if Queue.is_empty self.q then ( - let to_wakeup = Queue.create () in - Queue.transfer self.push_waiters to_wakeup; - Mutex.unlock self.mutex; - Queue.iter Trigger.signal to_wakeup - ) else - Mutex.unlock self.mutex; - x - | exception Queue.Empty -> - if self.closed then ( - Mutex.unlock self.mutex; - raise Closed - ); + let old_st = A.get self.st in + match State.pop_one_ ~max_size:self.max_size old_st with + | exception Q.Empty -> + if A.get self.closed then raise Closed; let tr = Trigger.create () in - Queue.push tr self.pop_waiters; - Mutex.unlock self.mutex; - Trigger.await_exn tr; - pop self + if + A.compare_and_set self.st old_st + { old_st with pop_waiters = Q.push old_st.pop_waiters tr } + then ( + Trigger.await_exn tr; + pop self + ) else + pop self + | x, new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + x + ) else + pop self + +let push (self : _ t) x : unit = + while + if A.get self.closed then raise Closed; + + let old_st = A.get self.st in + match State.push_one_ ~max_size:self.max_size old_st x with + | exception Full -> + let tr = Trigger.create () in + if + A.compare_and_set self.st old_st + { old_st with push_waiters = Q.push old_st.push_waiters tr } + then + Trigger.await_exn tr; + true + | new_st, to_broadcast -> + if A.compare_and_set self.st old_st new_st then ( + Q.iter Trigger.signal to_broadcast; + false + ) else + true + do + Domain_.relax () + done [@@@endif] - -(* TODO: remove - (** A waiter queue, somewhat similar to a condition. *) - module Waiters_ = struct - type t = { waiters: Trigger.t Q.t A.t } [@@unboxed] - - let create () : t = { waiters = A.make Q.empty } - - let add_waiter self (tr : Trigger.t) : unit = - while - let q = A.get self.waiters in - not (A.compare_and_set self.waiters q (Q.push q tr)) - do - Domain_.relax () - done - - let wait_await (self : t) : unit = - let tr = Trigger.create () in - add_waiter self tr; - Trigger.await_exn tr - - exception Empty = Q.Empty - - let rec pop_waiter (self : t) : Trigger.t = - let q = A.get self.waiters in - let x, q' = Q.pop_exn q in - if A.compare_and_set self.waiters q q' then - x - else ( - Domain_.relax (); - pop_waiter self - ) - - let rec pop_all (self : t) : Trigger.t Q.t = - let q = A.get self.waiters in - if A.compare_and_set self.waiters q Q.empty then - q - else ( - Domain_.relax (); - pop_all self - ) - - let signal (self : t) : unit = - match pop_waiter self with - | exception Empty -> () - | tr -> Trigger.signal tr - - let broadcast (self : t) : unit = - let waiters = pop_all self in - Q.iter Trigger.signal waiters - end -*) diff --git a/src/core/chan.mli b/src/core/chan.mli index 142f43ce..7ec1163d 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -6,8 +6,6 @@ The channels became bounded since @NEXT_RELEASE . *) -type 'a or_error = 'a Fut.or_error - type 'a t (** Channel carrying values of type ['a]. *) @@ -23,7 +21,9 @@ val try_push : 'a t -> 'a -> bool val try_pop : 'a t -> 'a option (** [try_pop chan] pops and return an element if one is available - immediately. Otherwise it returns [None]. *) + immediately. Otherwise it returns [None]. + @raise Closed if the channel is closed and empty. + *) val close : _ t -> unit (** Close the channel. Further push and pop calls will fail.