diff --git a/src/client/consumer.ml b/src/client/consumer.ml index 3cf92658..89288080 100644 --- a/src/client/consumer.ml +++ b/src/client/consumer.ml @@ -1,20 +1,27 @@ (** Consumer that accepts items from a bounded queue *) +type 'a t = { + active: unit -> bool; (** Still running? Must be fast and thread-safe *) + tick: unit -> unit; + (** Regularly called, eg to emit metrics, check timeouts, etc. Must be + thread safe. *) + shutdown: on_done:(unit -> unit) -> unit; + (** Shutdown the consumer as soon as possible, call [on_done()] once it's + done. *) +} (** A consumer for signals of type ['a] *) -class type ['a] t = object - method register : 'a Bounded_queue.t -> unit - method active : unit -> bool +type 'a consumer = 'a t - method start : on_done:(unit -> unit) -> unit +let[@inline] active (self : _ t) = self.active () - method shutdown : on_done:(unit -> unit) -> unit +let[@inline] shutdown (self : _ t) ~on_done = self.shutdown ~on_done + +module Builder = struct + type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer } + (** A builder that will create a consumer for a given queue, start the + consumer so it starts consuming from the queue, and return the consumer. + *) + + let start_consuming (self : _ t) bq = self.start_consuming bq end - -let register (self : _ #t) q = self#register q - -let active (self : _ #t) = self#active () - -let start (self : _ #t) ~on_done = self#start ~on_done - -let shutdown (self : _ #t) ~on_done = self#shutdown ~on_done