wip: exporter_queued, a queue + a consumer

This commit is contained in:
Simon Cruanes 2025-12-04 21:33:33 -05:00
parent 60d355ea23
commit 6ce1ebf9c7
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 75 additions and 12 deletions

View file

@ -47,15 +47,3 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
let tick ~now:_ = () in let tick ~now:_ = () in
let flush_and_close () = close self in let flush_and_close () = close self in
{ closed; enabled; emit; tick; flush_and_close } { closed; enabled; emit; tick; flush_and_close }
let logs_emitter (self : Any_resource.t t) : OTEL.Logger.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_logs_or_empty
let spans_emitter (self : Any_resource.t t) : OTEL.Tracer.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_spans_or_empty
let metrics_emitter (self : Any_resource.t t) : OTEL.Metrics_emitter.t =
to_emitter self
|> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_metrics_or_empty

20
src/client/consumer.ml Normal file
View file

@ -0,0 +1,20 @@
(** Consumer that accepts items from a bounded queue *)
(** A consumer for signals of type ['a] *)
class type ['a] t = object
method register : 'a Bounded_queue.t -> unit
method active : unit -> bool
method start : on_done:(unit -> unit) -> unit
method shutdown : on_done:(unit -> unit) -> unit
end
let register (self : _ #t) q = self#register q
let active (self : _ #t) = self#active ()
let start (self : _ #t) ~on_done = self#start ~on_done
let shutdown (self : _ #t) ~on_done = self#shutdown ~on_done

View file

@ -0,0 +1,55 @@
(** Build an exporter from a queue and a consumer *)
open Common_
module BQ = Bounded_queue
module BQ_emitters = struct
let logs_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t =
Bounded_queue.to_emitter q
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_logs_or_empty ?service_name ?attrs)
let spans_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t =
Bounded_queue.to_emitter q
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_spans_or_empty ?service_name ?attrs)
let metrics_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t =
Bounded_queue.to_emitter q
|> Opentelemetry_emitter.Emitter.flat_map
(Any_resource.of_metrics_or_empty ?service_name ?attrs)
end
(** Pair a queue with a consumer to build an exporter.
The resulting exporter will emit logs, spans, and traces directly into the
bounded queue; while the consumer takes them from the queue to forward them
somewhere else, store them, etc.
@param resource_attributes attributes added to every "resource" batch *)
let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
~(consumer : Any_resource.t Consumer.t) () : OTEL.Exporter.t =
let emit_spans =
BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q
in
let emit_logs = BQ_emitters.logs_emitter_of_bq ~attrs:resource_attributes q in
let emit_metrics =
BQ_emitters.metrics_emitter_of_bq ~attrs:resource_attributes q
in
let tick_set = Cb_set.create () in
let tick () = Cb_set.trigger tick_set in
let on_tick f = Cb_set.register tick_set f in
let closed = Atomic.make false in
let cleanup ~on_done () =
if not (Atomic.exchange closed true) then (
Bounded_queue.close q;
Consumer.shutdown consumer ~on_done
) else
on_done ()
in
{ emit_logs; emit_metrics; emit_spans; tick; on_tick; cleanup }