better exporter/emitter combinators; better cleanup (now shutdown)

This commit is contained in:
Simon Cruanes 2025-12-08 11:25:50 -05:00
parent 25afa2085c
commit 61f17fa6ce
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
13 changed files with 129 additions and 50 deletions

View file

@ -180,13 +180,16 @@ let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () =
let create_backend = create_exporter let create_backend = create_exporter
let setup_ ~sw ?stop ?config env : unit = let setup_ ~sw ?stop ?config env : unit =
let backend = create_backend ?stop ?config ~sw ~env () in let exp = create_exporter ?stop ?config ~sw ~env () in
Main_exporter.set backend Main_exporter.set exp
let setup ?stop ?config ?(enable = true) ~sw env = let setup ?stop ?config ?(enable = true) ~sw env =
if enable then setup_ ~sw ?stop ?config env if enable then setup_ ~sw ?stop ?config env
let remove_exporter () = Main_exporter.remove ~on_done:ignore () let remove_exporter () =
let p, waker = Eio.Promise.create () in
Main_exporter.remove () ~on_done:(fun () -> Eio.Promise.resolve waker ());
Eio.Promise.await p
let remove_backend = remove_exporter let remove_backend = remove_exporter

View file

@ -98,11 +98,13 @@ let setup_ ?stop ?config () : unit =
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then setup_ ?stop ?config () if enable then setup_ ?stop ?config ()
let remove_backend () : unit Lwt.t = let remove_exporter () : unit Lwt.t =
let done_fut, done_u = Lwt.wait () in let done_fut, done_u = Lwt.wait () in
Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) ();
done_fut done_fut
let remove_backend = remove_exporter
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
= =
if enable then ( if enable then (

View file

@ -120,10 +120,12 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
: Thread.t) : Thread.t)
) )
let remove_backend () : unit = let remove_exporter () : unit =
(* we don't need the callback, this runs in the same thread *) (* we don't need the callback, this runs in the same thread *)
OTEL.Main_exporter.remove () ~on_done:ignore OTEL.Main_exporter.remove () ~on_done:ignore
let remove_backend = remove_exporter
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then setup_ ?stop ?config () if enable then setup_ ?stop ?config ()

View file

@ -24,6 +24,7 @@ val consumer :
val create_exporter : val create_exporter :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
(** @since NEXT_RELEASE *)
val create_backend : val create_backend :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
@ -40,7 +41,11 @@ val setup :
an atomic boolean. When it becomes true, background threads will all stop an atomic boolean. When it becomes true, background threads will all stop
after a little while. *) after a little while. *)
val remove_exporter : unit -> unit
(** @since NEXT_RELEASE *)
val remove_backend : unit -> unit val remove_backend : unit -> unit
[@@deprecated "use remove_exporter"]
(** @since 0.12 *) (** @since 0.12 *)
val with_setup : val with_setup :

View file

@ -1,4 +1,4 @@
(** Add batching to emitter based on client config *) (** Add batching to emitters *)
open Common_ open Common_
@ -9,6 +9,7 @@ open struct
Batch.wrap_emitter b emitter Batch.wrap_emitter b emitter
end end
(** Given an exporter, add batches for each emitter according to [config]. *)
let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) : let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) :
OTEL.Exporter.t = OTEL.Exporter.t =
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in

View file

@ -0,0 +1,35 @@
open Common_
open Opentelemetry_atomic
open struct
let shutdown_l ~on_done:on_done_real (es : OTEL.Exporter.t list) : unit =
let missing = Atomic.make (List.length es) in
let on_done () =
if Atomic.fetch_and_add missing (-1) = 1 then
(* we were the last exporter to shutdown, [missing] is now 0 *)
on_done_real ()
in
List.iter (OTEL.Exporter.shutdown ~on_done) es
end
let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t =
let open OTEL.Exporter in
if es = [] then
OTEL.Exporter.dummy ()
else
{
emit_spans =
Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es);
emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es);
emit_metrics =
Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es);
on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es);
tick = (fun () -> List.iter tick es);
shutdown = (fun ~on_done () -> shutdown_l ~on_done es);
}
(** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and
[exp2]. *)
let combine exp1 exp2 : OTEL.Exporter.t = combine_l [ exp1; exp2 ]

View file

@ -1,32 +1,27 @@
open Common_ open Common_
open Opentelemetry_emitter open Opentelemetry_emitter
(** [debug exporter] behaves like [exporter], but will print signals on [stderr] (** [debug ?out ()] is an exporter that pretty-prints signals on [out].
before passing them to [exporter] *) @param out the formatter into which to print, default [stderr]. *)
let debug ?(out = Format.err_formatter) (exp : OTEL.Exporter.t) : let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t =
OTEL.Exporter.t =
let open Proto in let open Proto in
let ticker = Cb_set.create () in
{ {
emit_spans = emit_spans =
Emitter.tap Emitter.make_simple () ~emit:(fun sp ->
(fun sp -> Format.fprintf out "SPAN: %a@." Trace.pp_span sp) List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp);
exp.emit_spans;
emit_logs = emit_logs =
Emitter.tap Emitter.make_simple () ~emit:(fun log ->
(fun log -> Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record log) List.iter
exp.emit_logs; (Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record)
log);
emit_metrics = emit_metrics =
Emitter.tap Emitter.make_simple () ~emit:(fun m ->
(fun m -> Format.fprintf out "METRIC: %a@." Metrics.pp_metric m) List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m);
exp.emit_metrics; on_tick = Cb_set.register ticker;
on_tick = exp.on_tick; tick = (fun () -> Cb_set.trigger ticker);
tick = exp.tick; shutdown =
cleanup =
(fun ~on_done () -> (fun ~on_done () ->
Format.fprintf out "CLEANUP@."; Format.fprintf out "CLEANUP@.";
exp.cleanup ~on_done ()); on_done ());
} }
(** Exporter that simply debugs on [stderr] *)
let debug_only : OTEL.Exporter.t =
debug ~out:Format.err_formatter @@ OTEL.Exporter.dummy ()

View file

@ -47,11 +47,11 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
let consumer = consumer.start_consuming q in let consumer = consumer.start_consuming q in
let cleanup ~on_done () = let shutdown ~on_done () =
if not (Atomic.exchange closed true) then ( if not (Atomic.exchange closed true) then (
Bounded_queue.close q; Bounded_queue.close q;
Consumer.shutdown consumer ~on_done Consumer.shutdown consumer ~on_done
) else ) else
on_done () on_done ()
in in
{ emit_logs; emit_metrics; emit_spans; tick; on_tick; cleanup } { emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown }

View file

@ -51,15 +51,25 @@ let stdout : OTEL.Exporter.t =
Format.pp_print_flush out () Format.pp_print_flush out ()
in in
let closed () = Atomic.get closed in let closed () = Atomic.get closed in
{ Emitter.emit; closed; enabled; tick; flush_and_close } { Emitter.emit; closed; enabled; tick; flush_and_close }
in in
let emit_spans = mk_emitter pp_span in
let emit_logs = mk_emitter Proto.Logs.pp_log_record in
let emit_metrics = mk_emitter Proto.Metrics.pp_metric in
let shutdown ~on_done () =
Emitter.flush_and_close emit_spans;
Emitter.flush_and_close emit_logs;
Emitter.flush_and_close emit_metrics;
on_done ()
in
{ {
emit_spans = mk_emitter pp_span; emit_spans;
emit_logs = mk_emitter Proto.Logs.pp_log_record; emit_logs;
emit_metrics = mk_emitter Proto.Metrics.pp_metric; emit_metrics;
on_tick = Cb_set.register ticker; on_tick = Cb_set.register ticker;
tick; tick;
cleanup = (fun ~on_done () -> on_done ()); shutdown;
} }

View file

@ -46,10 +46,17 @@ module Encode = struct
| None -> Pbrt.Encoder.create () | None -> Pbrt.Encoder.create ()
in in
let x = ctor resource in let x = ctor resource in
let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in let data =
enc x encoder; let@ _sc =
let data = Pbrt.Encoder.to_string encoder in Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
Pbrt.Encoder.reset encoder; in
enc x encoder;
let data = Pbrt.Encoder.to_string encoder in
Span.add_attrs _sc [ "size", `Int (String.length data) ];
Pbrt.Encoder.reset encoder;
data
in
data data
let logs ?encoder resource_logs = let logs ?encoder resource_logs =

View file

@ -17,11 +17,11 @@ type t = {
tick: unit -> unit; tick: unit -> unit;
(** Call all the callbacks registered with [on_tick]. Should be triggered (** Call all the callbacks registered with [on_tick]. Should be triggered
regularly for background processing, timeout checks, etc. *) regularly for background processing, timeout checks, etc. *)
cleanup: on_done:(unit -> unit) -> unit -> unit; shutdown: on_done:(unit -> unit) -> unit -> unit;
(** [cleanup ~on_done ()] is called when the exporter is shut down, and is (** [shutdown ~on_done ()] is called when the exporter is shut down, and is
responsible for sending remaining batches, flushing sockets, etc. responsible for sending remaining batches, flushing sockets, etc.
@param on_done @param on_done
callback invoked after the cleanup is done. @since 0.12 *) callback invoked after the shutdown is done. @since 0.12 *)
} }
(** Main exporter interface. *) (** Main exporter interface. *)
@ -34,7 +34,7 @@ let dummy () : t =
emit_logs = Emitter.dummy; emit_logs = Emitter.dummy;
on_tick = Cb_set.register ticker; on_tick = Cb_set.register ticker;
tick = (fun () -> Cb_set.trigger ticker); tick = (fun () -> Cb_set.trigger ticker);
cleanup = (fun ~on_done () -> on_done ()); shutdown = (fun ~on_done () -> on_done ());
} }
let[@inline] send_trace (self : t) (l : Proto.Trace.span list) = let[@inline] send_trace (self : t) (l : Proto.Trace.span list) =
@ -61,4 +61,6 @@ let tick (self : t) =
self.tick (); self.tick ();
() ()
let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done () let[@inline] shutdown (self : t) ~on_done : unit = self.shutdown ~on_done ()
let (cleanup [@deprecated "use shutdown instead"]) = shutdown

View file

@ -56,6 +56,23 @@ let tap (f : 'a -> unit) (self : 'a t) : 'a t =
in in
{ self with emit } { self with emit }
(** [make_simple ~emit ()] is an emitter that calls [emit]. *)
let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () :
_ t =
let tick =
match tick with
| None -> fun ~now:_ -> ()
| Some f -> f
in
let closed, enabled =
match closed, enabled with
| None, None -> (fun () -> false), fun () -> true
| Some f, None -> f, fun () -> not (f ())
| None, Some f -> (fun () -> not (f ())), f
| Some f1, Some f2 -> f1, f2
in
{ tick; emit; flush_and_close; closed; enabled }
(** Dummy emitter, doesn't accept or emit anything. *) (** Dummy emitter, doesn't accept or emit anything. *)
let dummy : _ t = let dummy : _ t =
{ {

View file

@ -13,13 +13,13 @@ open struct
end end
(** Remove current exporter, if any. (** Remove current exporter, if any.
@param on_done see {!t#cleanup}, @since 0.12 *) @param on_done see {!Main_exporter.shutdown}, @since 0.12 *)
let remove ~on_done () : unit = let remove ~on_done () : unit =
match Atomic.exchange exporter None with match Atomic.exchange exporter None with
| None -> () | None -> on_done ()
| Some exp -> | Some exp ->
tick exp; tick exp;
cleanup exp ~on_done shutdown exp ~on_done
(** Is there a configured exporter? *) (** Is there a configured exporter? *)
let present () : bool = Option.is_some (Atomic.get exporter) let present () : bool = Option.is_some (Atomic.get exporter)
@ -83,8 +83,8 @@ let dynamic_forward_to_main_exporter : Exporter.t =
| None -> () | None -> ()
| Some exp -> exp.tick () | Some exp -> exp.tick ()
in in
let cleanup ~on_done () = on_done () in let shutdown ~on_done () = on_done () in
{ Exporter.emit_metrics; emit_spans; emit_logs; on_tick; tick; cleanup } { Exporter.emit_metrics; emit_spans; emit_logs; on_tick; tick; shutdown }
(** Set the global exporter *) (** Set the global exporter *)
let set (exp : t) : unit = let set (exp : t) : unit =
@ -110,6 +110,6 @@ let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f
= =
if enable then ( if enable then (
set exp; set exp;
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f Fun.protect f ~finally:(fun () -> shutdown exp ~on_done)
) else ) else
f () Fun.protect f ~finally:(fun () -> on_done ())