add Any_signal_l; modify Exporter to use Aswitch

This commit is contained in:
Simon Cruanes 2025-12-08 15:35:36 -05:00
parent ee91fa4a45
commit 18b653a896
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 36 additions and 7 deletions

20
src/core/any_signal_l.ml Normal file
View file

@ -0,0 +1,20 @@
(** Any kind of lists of signals *)
open Common_
type t =
| Spans of Span.t list
| Metrics of Metrics.t list
| Logs of Log_record.t list
open struct
let pp_sep out () = Format.fprintf out ";@ "
let pp_list ppx out l =
Format.fprintf out "[@[%a@]]" (Format.pp_print_list ~pp_sep ppx) l
end
let pp out = function
| Spans s -> pp_list Proto.Trace.pp_span out s
| Metrics m -> pp_list Proto.Metrics.pp_metric out m
| Logs l -> pp_list Proto.Logs.pp_log_record out l

View file

@ -10,6 +10,8 @@ open Common_
open Opentelemetry_emitter
type t = {
active: unit -> Aswitch.t;
(** Is the exporer currently active? After shutdown this is turned off. *)
emit_spans: Proto.Trace.span Emitter.t;
emit_metrics: Proto.Metrics.metric Emitter.t;
emit_logs: Proto.Logs.log_record Emitter.t;
@ -17,24 +19,26 @@ type t = {
tick: unit -> unit;
(** Call all the callbacks registered with [on_tick]. Should be triggered
regularly for background processing, timeout checks, etc. *)
shutdown: on_done:(unit -> unit) -> unit -> unit;
(** [shutdown ~on_done ()] is called when the exporter is shut down, and is
responsible for sending remaining batches, flushing sockets, etc.
@param on_done
callback invoked after the shutdown is done. @since 0.12 *)
shutdown: unit -> unit;
(** [shutdown ()] is called when the exporter is shut down, and is
responsible for sending remaining batches, flushing sockets, etc. To
know when shutdown is complete, register callbacks on [active].
@since 0.12 *)
}
(** Main exporter interface. *)
(** Dummy exporter, does nothing *)
let dummy () : t =
let ticker = Cb_set.create () in
let active, trigger = Aswitch.create () in
{
active = (fun () -> active);
emit_spans = Emitter.dummy;
emit_metrics = Emitter.dummy;
emit_logs = Emitter.dummy;
on_tick = Cb_set.register ticker;
tick = (fun () -> Cb_set.trigger ticker);
shutdown = (fun ~on_done () -> on_done ());
shutdown = (fun () -> Aswitch.turn_off trigger);
}
let[@inline] send_trace (self : t) (l : Proto.Trace.span list) =
@ -61,6 +65,11 @@ let tick (self : t) =
self.tick ();
()
let[@inline] shutdown (self : t) ~on_done : unit = self.shutdown ~on_done ()
let[@inline] active self : Aswitch.t = self.active ()
(** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *)
let on_stop self f = Aswitch.on_turn_off (self.active ()) f
let[@inline] shutdown (self : t) : unit = self.shutdown ()
let (cleanup [@deprecated "use shutdown instead"]) = shutdown