diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index 96324e7..c1c2978 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -1,9 +1,9 @@ type 'a t = { mutex: Mutex.t; cond: Condition.t; - q: 'a Mpsc_queue.t; + q: 'a Mpsc_bag.t; mutable closed: bool; - mutable consumer_waiting: bool; + consumer_waiting: bool Atomic.t; } exception Closed @@ -12,9 +12,9 @@ let create () : _ t = { mutex = Mutex.create (); cond = Condition.create (); - q = Mpsc_queue.create (); + q = Mpsc_bag.create (); closed = false; - consumer_waiting = false; + consumer_waiting = Atomic.make false; } let close (self : _ t) = @@ -27,35 +27,23 @@ let close (self : _ t) = let push (self : _ t) x : unit = if self.closed then raise Closed; - Mpsc_queue.enqueue self.q x; + Mpsc_bag.add self.q x; if self.closed then raise Closed; - if self.consumer_waiting then ( + if Atomic.get self.consumer_waiting then ( (* wakeup consumer *) Mutex.lock self.mutex; Condition.broadcast self.cond; Mutex.unlock self.mutex ) -let rec pop (self : 'a t) : 'a = - match Mpsc_queue.dequeue self.q with - | x -> x - | exception Mpsc_queue.Empty -> - if self.closed then raise Closed; - Mutex.lock self.mutex; - self.consumer_waiting <- true; - Condition.wait self.cond self.mutex; - self.consumer_waiting <- false; - Mutex.unlock self.mutex; - pop self - let rec pop_all (self : 'a t) : 'a list = - match Mpsc_queue.dequeue_all self.q with - | l -> l - | exception Mpsc_queue.Empty -> + match Mpsc_bag.pop_all self.q with + | l -> List.rev l + | exception Mpsc_bag.Empty -> if self.closed then raise Closed; Mutex.lock self.mutex; - self.consumer_waiting <- true; + Atomic.set self.consumer_waiting true; Condition.wait self.cond self.mutex; - self.consumer_waiting <- false; + Atomic.set self.consumer_waiting false; Mutex.unlock self.mutex; pop_all self diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli index 1db779f..cf3abb2 100644 --- a/src/tef/b_queue.mli +++ b/src/tef/b_queue.mli @@ -10,13 +10,9 @@ val push : 'a t -> 'a -> unit (** [push q x] pushes [x] into [q], and returns [()]. @raise Closed if [close q] was previously called.*) -val pop : 'a t -> 'a -(** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) - val pop_all : 'a t -> 'a list -(** [transfer bq q2] transfers all items presently - in [bq] into [q2], and clears [bq]. +(** [pop_all bq] returns all items presently + in [bq], in the same order, and clears [bq]. It blocks if no element is in [bq]. *) val close : _ t -> unit diff --git a/src/tef/mpsc_bag.ml b/src/tef/mpsc_bag.ml new file mode 100644 index 0000000..7a4be78 --- /dev/null +++ b/src/tef/mpsc_bag.ml @@ -0,0 +1,32 @@ +type 'a t = { bag: 'a list Atomic.t } [@@unboxed] + +let create () = + let bag = Atomic.make [] in + { bag } + +module Backoff = struct + type t = int + + let default = 2 + + let once (b : t) : t = + for _i = 1 to b do + Relax_.cpu_relax () + done; + min (b * 2) 256 +end + +let rec add backoff t x = + let before = Atomic.get t.bag in + let after = x :: before in + if not (Atomic.compare_and_set t.bag before after) then + add (Backoff.once backoff) t x + +let[@inline] add t x = add Backoff.default t x + +exception Empty + +let[@inline] pop_all t : _ list = + match Atomic.exchange t.bag [] with + | [] -> raise_notrace Empty + | l -> l diff --git a/src/tef/mpsc_bag.mli b/src/tef/mpsc_bag.mli new file mode 100644 index 0000000..f9b62b5 --- /dev/null +++ b/src/tef/mpsc_bag.mli @@ -0,0 +1,14 @@ +(** A multi-producer, single-consumer bag *) + +type 'a t + +val create : unit -> 'a t + +val add : 'a t -> 'a -> unit +(** [add q x] adds [x] in the bag. *) + +exception Empty + +val pop_all : 'a t -> 'a list +(** Return all current items in an unspecified order. + @raise Empty if empty *) diff --git a/src/tef/mpsc_queue.ml b/src/tef/mpsc_queue.ml deleted file mode 100644 index c40b62d..0000000 --- a/src/tef/mpsc_queue.ml +++ /dev/null @@ -1,60 +0,0 @@ -type 'a t = { - tail: 'a list Atomic.t; - head: 'a list ref; -} - -let create () = - let tail = Atomic.make [] in - (* padding *) - ignore (Sys.opaque_identity (Array.make 16 ())); - let head = ref [] in - { tail; head } - -module Backoff = struct - type t = int - - let default = 2 - - let once (b : t) : t = - let actual_b = b + Random.int 4 in - for _i = 1 to actual_b do - Relax_.cpu_relax () - done; - min (b * 2) 256 -end - -let rec enqueue backoff t x = - let before = Atomic.get t.tail in - let after = x :: before in - if not (Atomic.compare_and_set t.tail before after) then - enqueue (Backoff.once backoff) t x - -let enqueue t x = enqueue Backoff.default t x - -exception Empty - -let dequeue t = - match !(t.head) with - | x :: xs -> - t.head := xs; - x - | [] -> - (match Atomic.exchange t.tail [] with - | [] -> raise_notrace Empty - | [ x ] -> x - | x :: xs -> - (match List.rev_append [ x ] xs with - | x :: xs -> - t.head := xs; - x - | [] -> assert false)) - -let dequeue_all t : _ list = - match !(t.head) with - | _ :: _ as l -> - t.head := []; - l - | [] -> - (match Atomic.exchange t.tail [] with - | [] -> raise_notrace Empty - | l -> List.rev l) diff --git a/src/tef/mpsc_queue.mli b/src/tef/mpsc_queue.mli deleted file mode 100644 index 594c8dc..0000000 --- a/src/tef/mpsc_queue.mli +++ /dev/null @@ -1,14 +0,0 @@ -(** A multi-producer, single-consumer queue (from picos) *) - -type 'a t - -val create : unit -> 'a t -val enqueue : 'a t -> 'a -> unit - -exception Empty - -val dequeue : 'a t -> 'a -(** @raise Empty if empty *) - -val dequeue_all : 'a t -> 'a list -(** @raise Empty if empty *)