mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-09 12:23:32 -04:00
refactor consumer
This commit is contained in:
parent
741de6cece
commit
45c5860fe4
1 changed files with 20 additions and 13 deletions
|
|
@ -1,20 +1,27 @@
|
||||||
(** Consumer that accepts items from a bounded queue *)
|
(** Consumer that accepts items from a bounded queue *)
|
||||||
|
|
||||||
|
type 'a t = {
|
||||||
|
active: unit -> bool; (** Still running? Must be fast and thread-safe *)
|
||||||
|
tick: unit -> unit;
|
||||||
|
(** Regularly called, eg to emit metrics, check timeouts, etc. Must be
|
||||||
|
thread safe. *)
|
||||||
|
shutdown: on_done:(unit -> unit) -> unit;
|
||||||
|
(** Shutdown the consumer as soon as possible, call [on_done()] once it's
|
||||||
|
done. *)
|
||||||
|
}
|
||||||
(** A consumer for signals of type ['a] *)
|
(** A consumer for signals of type ['a] *)
|
||||||
class type ['a] t = object
|
|
||||||
method register : 'a Bounded_queue.t -> unit
|
|
||||||
|
|
||||||
method active : unit -> bool
|
type 'a consumer = 'a t
|
||||||
|
|
||||||
method start : on_done:(unit -> unit) -> unit
|
let[@inline] active (self : _ t) = self.active ()
|
||||||
|
|
||||||
method shutdown : on_done:(unit -> unit) -> unit
|
let[@inline] shutdown (self : _ t) ~on_done = self.shutdown ~on_done
|
||||||
|
|
||||||
|
module Builder = struct
|
||||||
|
type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer }
|
||||||
|
(** A builder that will create a consumer for a given queue, start the
|
||||||
|
consumer so it starts consuming from the queue, and return the consumer.
|
||||||
|
*)
|
||||||
|
|
||||||
|
let start_consuming (self : _ t) bq = self.start_consuming bq
|
||||||
end
|
end
|
||||||
|
|
||||||
let register (self : _ #t) q = self#register q
|
|
||||||
|
|
||||||
let active (self : _ #t) = self#active ()
|
|
||||||
|
|
||||||
let start (self : _ #t) ~on_done = self#start ~on_done
|
|
||||||
|
|
||||||
let shutdown (self : _ #t) ~on_done = self#shutdown ~on_done
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue