refactor consumer

This commit is contained in:
Simon Cruanes 2025-12-05 09:10:10 -05:00
parent 72851b8e34
commit bff2c4bcce
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -1,20 +1,27 @@
(** Consumer that accepts items from a bounded queue *)
type 'a t = {
active: unit -> bool; (** Still running? Must be fast and thread-safe *)
tick: unit -> unit;
(** Regularly called, eg to emit metrics, check timeouts, etc. Must be
thread safe. *)
shutdown: on_done:(unit -> unit) -> unit;
(** Shutdown the consumer as soon as possible, call [on_done()] once it's
done. *)
}
(** A consumer for signals of type ['a] *)
class type ['a] t = object
method register : 'a Bounded_queue.t -> unit
method active : unit -> bool
type 'a consumer = 'a t
method start : on_done:(unit -> unit) -> unit
let[@inline] active (self : _ t) = self.active ()
method shutdown : on_done:(unit -> unit) -> unit
let[@inline] shutdown (self : _ t) ~on_done = self.shutdown ~on_done
module Builder = struct
type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer }
(** A builder that will create a consumer for a given queue, start the
consumer so it starts consuming from the queue, and return the consumer.
*)
let start_consuming (self : _ t) bq = self.start_consuming bq
end
let register (self : _ #t) q = self#register q
let active (self : _ #t) = self#active ()
let start (self : _ #t) ~on_done = self#start ~on_done
let shutdown (self : _ #t) ~on_done = self#shutdown ~on_done