From 6ce1ebf9c71305f4c6df9cb52103d1e73d8348a0 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 4 Dec 2025 21:33:33 -0500 Subject: [PATCH] wip: exporter_queued, a queue + a consumer --- src/client/bounded_queue.ml | 12 -------- src/client/consumer.ml | 20 +++++++++++++ src/client/exporter_queued.ml | 55 +++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 12 deletions(-) create mode 100644 src/client/consumer.ml create mode 100644 src/client/exporter_queued.ml diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index b69a60a4..a8616aaa 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -47,15 +47,3 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = let tick ~now:_ = () in let flush_and_close () = close self in { closed; enabled; emit; tick; flush_and_close } - -let logs_emitter (self : Any_resource.t t) : OTEL.Logger.t = - to_emitter self - |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_logs_or_empty - -let spans_emitter (self : Any_resource.t t) : OTEL.Tracer.t = - to_emitter self - |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_spans_or_empty - -let metrics_emitter (self : Any_resource.t t) : OTEL.Metrics_emitter.t = - to_emitter self - |> Opentelemetry_emitter.Emitter.flat_map Any_resource.of_metrics_or_empty diff --git a/src/client/consumer.ml b/src/client/consumer.ml new file mode 100644 index 00000000..3cf92658 --- /dev/null +++ b/src/client/consumer.ml @@ -0,0 +1,20 @@ +(** Consumer that accepts items from a bounded queue *) + +(** A consumer for signals of type ['a] *) +class type ['a] t = object + method register : 'a Bounded_queue.t -> unit + + method active : unit -> bool + + method start : on_done:(unit -> unit) -> unit + + method shutdown : on_done:(unit -> unit) -> unit +end + +let register (self : _ #t) q = self#register q + +let active (self : _ #t) = self#active () + +let start (self : _ #t) ~on_done = self#start ~on_done + +let shutdown (self : _ #t) ~on_done = self#shutdown ~on_done diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml new file mode 100644 index 00000000..6ee9483e --- /dev/null +++ b/src/client/exporter_queued.ml @@ -0,0 +1,55 @@ +(** Build an exporter from a queue and a consumer *) + +open Common_ +module BQ = Bounded_queue + +module BQ_emitters = struct + let logs_emitter_of_bq ?service_name ?attrs + (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = + Bounded_queue.to_emitter q + |> Opentelemetry_emitter.Emitter.flat_map + (Any_resource.of_logs_or_empty ?service_name ?attrs) + + let spans_emitter_of_bq ?service_name ?attrs + (q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t = + Bounded_queue.to_emitter q + |> Opentelemetry_emitter.Emitter.flat_map + (Any_resource.of_spans_or_empty ?service_name ?attrs) + + let metrics_emitter_of_bq ?service_name ?attrs + (q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t = + Bounded_queue.to_emitter q + |> Opentelemetry_emitter.Emitter.flat_map + (Any_resource.of_metrics_or_empty ?service_name ?attrs) +end + +(** Pair a queue with a consumer to build an exporter. + + The resulting exporter will emit logs, spans, and traces directly into the + bounded queue; while the consumer takes them from the queue to forward them + somewhere else, store them, etc. + @param resource_attributes attributes added to every "resource" batch *) +let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) + ~(consumer : Any_resource.t Consumer.t) () : OTEL.Exporter.t = + let emit_spans = + BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q + in + let emit_logs = BQ_emitters.logs_emitter_of_bq ~attrs:resource_attributes q in + let emit_metrics = + BQ_emitters.metrics_emitter_of_bq ~attrs:resource_attributes q + in + + let tick_set = Cb_set.create () in + let tick () = Cb_set.trigger tick_set in + let on_tick f = Cb_set.register tick_set f in + + let closed = Atomic.make false in + + let cleanup ~on_done () = + if not (Atomic.exchange closed true) then ( + Bounded_queue.close q; + Consumer.shutdown consumer ~on_done + ) else + on_done () + in + { emit_logs; emit_metrics; emit_spans; tick; on_tick; cleanup }