diff --git a/src/moonpool.ml b/src/moonpool.ml index 109cc500..c44db86b 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -4,3 +4,4 @@ let start_thread_on_some_domain f x = module Pool = Pool module Fut = Fut +module Blocking_queue = Bb_queue diff --git a/src/moonpool.mli b/src/moonpool.mli index a96f07e8..1f90d2dc 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,3 +12,39 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) module Fut = Fut + +(** A simple blocking queue. + + This queue is quite basic and will not behave well under heavy + contention. However, it can be sufficient for many practical use cases. + + {b NOTE}: this queue will typically block the caller thread + in case the operation (push/pop) cannot proceed. + Be wary of deadlocks when using the queue {i from} a pool + when you expect the other end to also be produced/consumed from + the same pool. + + See discussion on {!Fut.wait_block} for more details on deadlocks + and how to mitigate the risk of running into them. + + More scalable queues can be found in + Lockfree (https://github.com/ocaml-multicore/lockfree/) +*) +module Blocking_queue : sig + type 'a t + + val create : unit -> _ t + + exception Closed + + val push : 'a t -> 'a -> unit + (** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + + val pop : 'a t -> 'a + (** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + + val close : _ t -> unit + (** Close the queue, meaning there won't be any more [push] allowed. *) +end