diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml index c1c2978..1a77aa3 100644 --- a/src/tef/b_queue.ml +++ b/src/tef/b_queue.ml @@ -38,12 +38,26 @@ let push (self : _ t) x : unit = let rec pop_all (self : 'a t) : 'a list = match Mpsc_bag.pop_all self.q with - | l -> List.rev l - | exception Mpsc_bag.Empty -> + | Some l -> l + | None -> if self.closed then raise Closed; Mutex.lock self.mutex; Atomic.set self.consumer_waiting true; - Condition.wait self.cond self.mutex; - Atomic.set self.consumer_waiting false; - Mutex.unlock self.mutex; - pop_all self + (* check again, a producer might have pushed an element since we + last checked. However if we still find + nothing, because this comes after [consumer_waiting:=true], + any producer arriving after that will know to wake us up. *) + (match Mpsc_bag.pop_all self.q with + | Some l -> + Atomic.set self.consumer_waiting false; + Mutex.unlock self.mutex; + l + | None -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + Atomic.set self.consumer_waiting false; + Mutex.unlock self.mutex; + pop_all self) diff --git a/src/tef/mpsc_bag.ml b/src/tef/mpsc_bag.ml index 7a4be78..004e8f5 100644 --- a/src/tef/mpsc_bag.ml +++ b/src/tef/mpsc_bag.ml @@ -24,9 +24,7 @@ let rec add backoff t x = let[@inline] add t x = add Backoff.default t x -exception Empty - -let[@inline] pop_all t : _ list = +let[@inline] pop_all t : _ list option = match Atomic.exchange t.bag [] with - | [] -> raise_notrace Empty - | l -> l + | [] -> None + | l -> Some (List.rev l) diff --git a/src/tef/mpsc_bag.mli b/src/tef/mpsc_bag.mli index f9b62b5..edb5dba 100644 --- a/src/tef/mpsc_bag.mli +++ b/src/tef/mpsc_bag.mli @@ -7,8 +7,5 @@ val create : unit -> 'a t val add : 'a t -> 'a -> unit (** [add q x] adds [x] in the bag. *) -exception Empty - -val pop_all : 'a t -> 'a list -(** Return all current items in an unspecified order. - @raise Empty if empty *) +val pop_all : 'a t -> 'a list option +(** Return all current items in the insertion order. *)