mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
49 lines
1.5 KiB
OCaml
49 lines
1.5 KiB
OCaml
(** Consumer that accepts items from a bounded queue *)
|
|
|
|
open Common_
|
|
|
|
type t = {
|
|
active: unit -> Aswitch.t;
|
|
shutdown: unit -> unit;
|
|
(** Shutdown the consumer as soon as possible. [active] will be turned off
|
|
once the consumer is fully shut down. *)
|
|
tick: unit -> unit;
|
|
(** Regularly called, eg to emit metrics, check timeouts, etc. Must be
|
|
thread safe. *)
|
|
self_metrics: clock:OTEL.Clock.t -> unit -> OTEL.Metrics.t list;
|
|
(** Self observing metrics *)
|
|
}
|
|
(** A consumer for signals of type ['a] *)
|
|
|
|
type consumer = t
|
|
|
|
let[@inline] active (self : t) : Aswitch.t = self.active ()
|
|
|
|
let[@inline] shutdown (self : t) : unit = self.shutdown ()
|
|
|
|
let[@inline] self_metrics ~clock self : _ list = self.self_metrics ~clock ()
|
|
|
|
(** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *)
|
|
let on_stop self f = Aswitch.on_turn_off (self.active ()) f
|
|
|
|
module Builder = struct
|
|
type 'a t = { start_consuming: 'a Bounded_queue.Recv.t -> consumer }
|
|
(** A builder that will create a consumer for a given queue, start the
|
|
consumer so it starts consuming from the queue, and return the consumer.
|
|
*)
|
|
|
|
let start_consuming (self : _ t) bq = self.start_consuming bq
|
|
|
|
let map (type a b) (f : a -> b) (self : b t) : a t =
|
|
{
|
|
start_consuming =
|
|
(fun q ->
|
|
let q = Bounded_queue.Recv.map f q in
|
|
self.start_consuming q);
|
|
}
|
|
end
|
|
|
|
type any_signal_l_builder = OTEL.Any_signal_l.t Builder.t
|
|
|
|
type resource_signal_builder = Resource_signal.t Builder.t
|
|
(** The type that's useful for HTTP backends *)
|