expose a simple blocking queue, with a controlled API.

This commit is contained in:
Simon Cruanes 2023-06-10 00:02:50 -04:00
parent 0a4d87816d
commit b4f0c1572e
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 37 additions and 0 deletions

View file

@ -4,3 +4,4 @@ let start_thread_on_some_domain f x =
module Pool = Pool
module Fut = Fut
module Blocking_queue = Bb_queue

View file

@ -12,3 +12,39 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
to run all the various threads needed in an application (timers, event loops, etc.) *)
module Fut = Fut
(** A simple blocking queue.
This queue is quite basic and will not behave well under heavy
contention. However, it can be sufficient for many practical use cases.
{b NOTE}: this queue will typically block the caller thread
in case the operation (push/pop) cannot proceed.
Be wary of deadlocks when using the queue {i from} a pool
when you expect the other end to also be produced/consumed from
the same pool.
See discussion on {!Fut.wait_block} for more details on deadlocks
and how to mitigate the risk of running into them.
More scalable queues can be found in
Lockfree (https://github.com/ocaml-multicore/lockfree/)
*)
module Blocking_queue : sig
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)
end