diff --git a/src/core/chan.ml b/src/core/chan.ml index 5ce82376..099e7074 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,7 +1,8 @@ module A = Atomic_ type 'a or_error = 'a Fut.or_error -type 'a waiter = 'a Fut.promise +type 'a pop_waiter = 'a Fut.promise +type 'a push_waiter = 'a * unit Fut.promise let[@inline] list_is_empty_ = function | [] -> true @@ -11,6 +12,7 @@ let[@inline] list_is_empty_ = function module Q : sig type 'a t + val empty : 'a t val return : 'a -> 'a t val is_empty : _ t -> bool @@ -28,6 +30,7 @@ end = struct invariant: if hd=[], then tl=[] *) + let empty = { hd = []; tl = [] } let[@inline] return x : _ t = { hd = [ x ]; tl = [] } let[@inline] make_ hd tl = @@ -56,138 +59,177 @@ end exception Closed -type 'a state = - | Empty - | St_closed - | Elems of 'a Q.t - | Waiters of 'a waiter Q.t +type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; (** protects critical section *) + mutable closed: bool; + max_size: int; + push_waiters: Trigger.t Queue.t; + pop_waiters: Trigger.t Queue.t; +} -type 'a t = { st: 'a state A.t } [@@unboxed] +let create ~max_size () : _ t = + if max_size < 0 then invalid_arg "Chan: max_size < 0"; + { + max_size; + mutex = Mutex.create (); + closed = false; + q = Queue.create (); + push_waiters = Queue.create (); + pop_waiters = Queue.create (); + } -let create () : _ t = { st = A.make Empty } +let try_push (self : _ t) x : bool = + let res = ref false in + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); -(** Produce a state from a queue of waiters *) -let[@inline] mk_st_waiters_ ws : _ state = - if Q.is_empty ws then - Empty - else - Waiters ws - -(** Produce a state from a queue of elements *) -let[@inline] mk_st_elems_ q : _ state = - if Q.is_empty q then - Empty - else - Elems q - -let push (self : _ t) x : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> raise Closed - | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) - | Waiters ws -> - (* awake first waiter and give it [x] *) - let w, ws' = Q.pop_exn ws in - let new_st = mk_st_waiters_ ws' in - if A.compare_and_set self.st old_st new_st then ( - Fut.fulfill w (Ok x); - false - ) else - true - | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) - do - Domain_.relax () - done + match Queue.length self.q with + | 0 -> + let to_awake = Queue.create () in + Queue.push x self.q; + Queue.transfer self.pop_waiters to_awake; + res := true; + Mutex.unlock self.mutex; + (* wake up pop triggers if needed. Be careful to do that + outside the critical section*) + Queue.iter Trigger.signal to_awake + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> Mutex.unlock self.mutex + ); + !res let try_pop (type elt) self : elt option = - let module M = struct - exception Found of elt - end in - try - (* a bit of spinning *) - for _retry = 1 to 10 do - let old_st = A.get self.st in - match old_st with - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Found x) - else - Domain_.relax () - | _ -> raise_notrace Exit - done; - None - with - | M.Found x -> Some x - | Exit -> None - -let pop (type elt) (self : _ t) : elt Fut.t = - let module M = struct - exception Ret of elt - exception Fut of elt Fut.t - end in - try - while - let old_st = A.get self.st in - (match old_st with - | St_closed -> - let bt = Printexc.get_callstack 10 in - raise_notrace (M.Fut (Fut.fail Closed bt)) - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) - | Empty -> - let fut, promise = Fut.make () in - let new_st = Waiters (Q.return promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut) - | Waiters ws -> - let fut, promise = Fut.make () in - (* add new promise at the end of the queue of waiters *) - let new_st = Waiters (Q.push ws promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut)); - true - do - Domain_.relax () - done; - (* never reached *) - assert false - with - | M.Ret x -> Fut.return x - | M.Fut f -> f - -let pop_block_exn (self : 'a t) : 'a = - match try_pop self with - | Some x -> x - | None -> Fut.wait_block_exn @@ pop self + let res = ref None in + if Mutex.try_lock self.mutex then ( + (match Queue.pop self.q with + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) + | x -> res := Some x); + Mutex.unlock self.mutex + ); + !res let close (self : _ t) : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> false (* exit *) - | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) - | Waiters ws -> - if A.compare_and_set self.st old_st St_closed then ( - (* fail all waiters with [Closed]. *) - let bt = Printexc.get_callstack 10 in - Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; - false - ) else - true - do - Domain_.relax () - done + let q = Queue.create () in + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Queue.transfer self.pop_waiters q; + Queue.transfer self.push_waiters q + ); + Mutex.unlock self.mutex; + Queue.iter Trigger.signal q [@@@ifge 5.0] -let pop_await self = - match try_pop self with - | Some x -> x - | None -> Fut.await @@ pop self +let rec push (self : _ t) x : unit = + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + match Queue.length self.q with + | 0 -> + Queue.push x self.q; + let to_wakeup = Queue.create () in + Queue.transfer self.pop_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> + let tr = Trigger.create () in + Queue.push tr self.push_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + push self x + +let rec pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + match Queue.pop self.q with + | x -> + if Queue.is_empty self.q then ( + let to_wakeup = Queue.create () in + Queue.transfer self.push_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + ) else + Mutex.unlock self.mutex; + x + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + let tr = Trigger.create () in + Queue.push tr self.pop_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + pop self [@@@endif] + +(* TODO: remove + (** A waiter queue, somewhat similar to a condition. *) + module Waiters_ = struct + type t = { waiters: Trigger.t Q.t A.t } [@@unboxed] + + let create () : t = { waiters = A.make Q.empty } + + let add_waiter self (tr : Trigger.t) : unit = + while + let q = A.get self.waiters in + not (A.compare_and_set self.waiters q (Q.push q tr)) + do + Domain_.relax () + done + + let wait_await (self : t) : unit = + let tr = Trigger.create () in + add_waiter self tr; + Trigger.await_exn tr + + exception Empty = Q.Empty + + let rec pop_waiter (self : t) : Trigger.t = + let q = A.get self.waiters in + let x, q' = Q.pop_exn q in + if A.compare_and_set self.waiters q q' then + x + else ( + Domain_.relax (); + pop_waiter self + ) + + let rec pop_all (self : t) : Trigger.t Q.t = + let q = A.get self.waiters in + if A.compare_and_set self.waiters q Q.empty then + q + else ( + Domain_.relax (); + pop_all self + ) + + let signal (self : t) : unit = + match pop_waiter self with + | exception Empty -> () + | tr -> Trigger.signal tr + + let broadcast (self : t) : unit = + let waiters = pop_all self in + Q.iter Trigger.signal waiters + end +*) diff --git a/src/core/chan.mli b/src/core/chan.mli index 083cf8d5..142f43ce 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -1,12 +1,9 @@ (** Channels. - Channels are pipelines of values where threads can push into - one end, and pull from the other end. + The channels have bounded size. Push/pop return futures or can use effects + to provide an [await]-friendly version. - Unlike {!Moonpool.Blocking_queue}, channels are designed so - that pushing never blocks, and pop'ing values returns a future. - - @since 0.3 + The channels became bounded since @NEXT_RELEASE . *) type 'a or_error = 'a Fut.or_error @@ -14,39 +11,43 @@ type 'a or_error = 'a Fut.or_error type 'a t (** Channel carrying values of type ['a]. *) -val create : unit -> 'a t +val create : max_size:int -> unit -> 'a t (** Create a channel. *) exception Closed -val push : 'a t -> 'a -> unit -(** [push chan x] pushes [x] into [chan]. This does not block. +val try_push : 'a t -> 'a -> bool +(** [try_push chan x] pushes [x] into [chan]. This does not block. + Returns [true] if it succeeded in pushing. @raise Closed if the channel is closed. *) -val pop : 'a t -> 'a Fut.t -(** Pop an element. This returns a future that will be - fulfilled when an element is available. - @raise Closed if the channel is closed, or fails the future - if the channel is closed before an element is available for it. *) - val try_pop : 'a t -> 'a option (** [try_pop chan] pops and return an element if one is available immediately. Otherwise it returns [None]. *) -val pop_block_exn : 'a t -> 'a -(** Like [pop], but blocks if an element is not available immediately. - The precautions around blocking from inside a thread pool - are the same as explained in {!Fut.wait_block}. *) - val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. This is idempotent. *) [@@@ifge 5.0] -val pop_await : 'a t -> 'a -(** Like {!pop} but suspends the current thread until an element is - available. See {!Fut.await} for more details. - @since 0.3 *) +val push : 'a t -> 'a -> unit +(** Push the value into the channel, suspending the current task + if the channel is currently full. + @raise Closed if the channel is closed + @since NEXT_RELEASE *) + +val pop : 'a t -> 'a +(** Pop an element. This might suspend the current task if the + channel is currently empty. + @raise Closed if the channel is empty and closed. + @since NEXT_RELEASE *) + +(* +val pop_block_exn : 'a t -> 'a +(** Like [pop], but blocks if an element is not available immediately. + The precautions around blocking from inside a thread pool + are the same as explained in {!Fut.wait_block}. *) +*) [@@@endif] diff --git a/src/sync/moonpool_sync.ml b/src/sync/moonpool_sync.ml index 99065305..f2c29ba7 100644 --- a/src/sync/moonpool_sync.ml +++ b/src/sync/moonpool_sync.ml @@ -1,4 +1,5 @@ module Mutex = Picos_std_sync.Mutex +module Chan = Chan module Condition = Picos_std_sync.Condition module Lock = Lock module Event = Event