From 8ade177b0eb29248dedef2e3336a81f1a9271b75 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 14 Sep 2023 22:33:44 -0400 Subject: [PATCH] add Bounded_queue --- src/bounded_queue.ml | 170 ++++++++++++++++++++++++++++++++++++++++++ src/bounded_queue.mli | 70 +++++++++++++++++ src/moonpool.ml | 1 + src/moonpool.mli | 2 + 4 files changed, 243 insertions(+) create mode 100644 src/bounded_queue.ml create mode 100644 src/bounded_queue.mli diff --git a/src/bounded_queue.ml b/src/bounded_queue.ml new file mode 100644 index 00000000..0f63847e --- /dev/null +++ b/src/bounded_queue.ml @@ -0,0 +1,170 @@ +type 'a t = { + max_size: int; + q: 'a Queue.t; + mutex: Mutex.t; + cond_push: Condition.t; + cond_pop: Condition.t; + mutable closed: bool; +} + +exception Closed + +let create ~max_size () : _ t = + if max_size < 1 then invalid_arg "Bounded_queue.create"; + { + max_size; + mutex = Mutex.create (); + cond_push = Condition.create (); + cond_pop = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + (* awake waiters so they fail *) + Condition.broadcast self.cond_push; + Condition.broadcast self.cond_pop + ); + Mutex.unlock self.mutex + +(** Check if the queue is full. Precondition: [self.mutex] is acquired. *) +let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size + +let push (self : _ t) x : unit = + let continue = ref true in + Mutex.lock self.mutex; + while !continue do + if self.closed then ( + (* push always fails on a closed queue *) + Mutex.unlock self.mutex; + raise Closed + ) else if is_full_ self then + Condition.wait self.cond_push self.mutex + else ( + let was_empty = Queue.is_empty self.q in + Queue.push x self.q; + if was_empty then Condition.broadcast self.cond_pop; + + (* exit loop *) + continue := false; + Mutex.unlock self.mutex + ) + done + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + if self.closed then ( + (* pop fails on a closed queue if it's also empty, + otherwise it still returns the remaining elements *) + Mutex.unlock self.mutex; + raise Closed + ); + + Condition.wait self.cond_pop self.mutex; + (loop [@tailcall]) () + ) else ( + let was_full = is_full_ self in + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + (* wakeup pushers *) + if was_full then Condition.broadcast self.cond_push; + x + ) + in + loop () + +let try_pop ~force_lock (self : _ t) : _ option = + let has_lock = + if force_lock then ( + Mutex.lock self.mutex; + true + ) else + Mutex.try_lock self.mutex + in + if has_lock then ( + let was_full_before_pop = is_full_ self in + match Queue.pop self.q with + | x -> + Mutex.unlock self.mutex; + if was_full_before_pop then Condition.broadcast self.cond_push; + Some x + | exception Queue.Empty -> + Mutex.unlock self.mutex; + None + ) else + None + +let try_push (self : _ t) x : bool = + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + if is_full_ self then ( + Mutex.unlock self.mutex; + false + ) else ( + let was_empty = Queue.is_empty self.q in + Queue.push x self.q; + if was_empty then Condition.broadcast self.cond_pop; + Mutex.unlock self.mutex; + true + ) + ) else + false + +let[@inline] max_size self = self.max_size + +let size (self : _ t) : int = + Mutex.lock self.mutex; + let n = Queue.length self.q in + Mutex.unlock self.mutex; + n + +let transfer (self : 'a t) q2 : unit = + Mutex.lock self.mutex; + let continue = ref true in + while !continue do + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond_pop self.mutex + ) else ( + let was_full = is_full_ self in + Queue.transfer self.q q2; + if was_full then Condition.broadcast self.cond_push; + continue := false; + Mutex.unlock self.mutex + ) + done + +type 'a gen = unit -> 'a option +type 'a iter = ('a -> unit) -> unit + +let to_iter self k = + try + while true do + let x = pop self in + k x + done + with Closed -> () + +let to_gen self : _ gen = + fun () -> + match pop self with + | exception Closed -> None + | x -> Some x + +let rec to_seq self : _ Seq.t = + fun () -> + match pop self with + | exception Closed -> Seq.Nil + | x -> Seq.Cons (x, to_seq self) diff --git a/src/bounded_queue.mli b/src/bounded_queue.mli new file mode 100644 index 00000000..e764f920 --- /dev/null +++ b/src/bounded_queue.mli @@ -0,0 +1,70 @@ +(** A blocking queue of finite size. *) + +type 'a t +(** A bounded queue. *) + +val create : max_size:int -> unit -> 'a t + +val close : _ t -> unit +(** [close q] closes [q]. No new elements can be pushed into [q], + and after all the elements still in [q] currently are [pop]'d, + {!pop} will also raise {!Closed}. *) + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] at the end of the queue. + If [q] is full, this will block until there is + room for [x]. + @raise Closed if [q] is closed. *) + +val try_push : 'a t -> 'a -> bool +(** [try_push q x] attempts to push [x] into [q], but abandons + if it cannot acquire [q] or if [q] is full. + @raise Closed if [q] is closed. *) + +val pop : 'a t -> 'a +(** [pop q] pops the first element off [q]. It blocks if [q] + is empty, until some element becomes available. + @raise Closed if [q] is empty and closed. *) + +val try_pop : force_lock:bool -> 'a t -> 'a option +(** [try_pop q] tries to pop the first element, or returns [None] + if no element is available or if it failed to acquire [q]. + + @param force_lock if true, use {!Mutex.lock} (which can block + under contention); + if false, use {!Mutex.try_lock}, which might return [None] even in + presence of an element if there's contention. + + @raise Closed if [q] is empty and closed. *) + +val size : _ t -> int +(** Number of elements currently in [q] *) + +val max_size : _ t -> int +(** Maximum size of the queue. See {!create}. *) + +val transfer : 'a t -> 'a Queue.t -> unit +(** [transfer bq q2] transfers all elements currently available + in [bq] into local queue [q2], and clears [bq], atomically. + It blocks if [bq] is empty. + + See {!Bb_queue.transfer} for more details. + @raise Closed if [bq] is empty and closed. *) + +type 'a gen = unit -> 'a option +type 'a iter = ('a -> unit) -> unit + +val to_iter : 'a t -> 'a iter +(** [to_iter q] returns an iterator over all items in the queue. + This might not terminate if [q] is never closed. + @since NEXT_RELEASE *) + +val to_gen : 'a t -> 'a gen +(** [to_gen q] returns a generator from the queue. + @since NEXT_RELEASE *) + +val to_seq : 'a t -> 'a Seq.t +(** [to_gen q] returns a (transient) sequence from the queue. + @since NEXT_RELEASE *) diff --git a/src/moonpool.ml b/src/moonpool.ml index de17f461..83ae22a8 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -4,6 +4,7 @@ let start_thread_on_some_domain f x = module Atomic = Atomic_ module Blocking_queue = Bb_queue +module Bounded_queue = Bounded_queue module Chan = Chan module Fork_join = Fork_join module Fut = Fut diff --git a/src/moonpool.mli b/src/moonpool.mli index 3d890027..438f9727 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -133,6 +133,8 @@ module Blocking_queue : sig @since NEXT_RELEASE *) end +module Bounded_queue = Bounded_queue + module Atomic = Atomic_ (** Atomic values.