From 61f17fa6ce66c767c926af09ae24a5f1762e7b7d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Dec 2025 11:25:50 -0500 Subject: [PATCH] better exporter/emitter combinators; better cleanup (now `shutdown`) --- .../opentelemetry_client_cohttp_eio.ml | 9 +++-- .../opentelemetry_client_ocurl_lwt.ml | 4 +- .../opentelemetry_client_ocurl.ml | 4 +- .../opentelemetry_client_ocurl.mli | 5 +++ src/client/exporter_add_batching.ml | 3 +- src/client/exporter_combine.ml | 35 ++++++++++++++++++ src/client/exporter_debug.ml | 37 ++++++++----------- src/client/exporter_queued.ml | 4 +- src/client/exporter_stdout.ml | 20 +++++++--- src/client/signal.ml | 15 ++++++-- src/core/exporter.ml | 12 +++--- src/emitter/emitter.ml | 17 +++++++++ src/lib/main_exporter.ml | 14 +++---- 13 files changed, 129 insertions(+), 50 deletions(-) create mode 100644 src/client/exporter_combine.ml diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 33e05830..07e17b9b 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -180,13 +180,16 @@ let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () = let create_backend = create_exporter let setup_ ~sw ?stop ?config env : unit = - let backend = create_backend ?stop ?config ~sw ~env () in - Main_exporter.set backend + let exp = create_exporter ?stop ?config ~sw ~env () in + Main_exporter.set exp let setup ?stop ?config ?(enable = true) ~sw env = if enable then setup_ ~sw ?stop ?config env -let remove_exporter () = Main_exporter.remove ~on_done:ignore () +let remove_exporter () = + let p, waker = Eio.Promise.create () in + Main_exporter.remove () ~on_done:(fun () -> Eio.Promise.resolve waker ()); + Eio.Promise.await p let remove_backend = remove_exporter diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 50cd834b..8db204aa 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -98,11 +98,13 @@ let setup_ ?stop ?config () : unit = let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () -let remove_backend () : unit Lwt.t = +let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); done_fut +let remove_backend = remove_exporter + let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t = if enable then ( diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 8f06dbd0..de20c894 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -120,10 +120,12 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () : Thread.t) ) -let remove_backend () : unit = +let remove_exporter () : unit = (* we don't need the callback, this runs in the same thread *) OTEL.Main_exporter.remove () ~on_done:ignore +let remove_backend = remove_exporter + let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index cece89df..5ac4fdd9 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -24,6 +24,7 @@ val consumer : val create_exporter : ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +(** @since NEXT_RELEASE *) val create_backend : ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t @@ -40,7 +41,11 @@ val setup : an atomic boolean. When it becomes true, background threads will all stop after a little while. *) +val remove_exporter : unit -> unit +(** @since NEXT_RELEASE *) + val remove_backend : unit -> unit +[@@deprecated "use remove_exporter"] (** @since 0.12 *) val with_setup : diff --git a/src/client/exporter_add_batching.ml b/src/client/exporter_add_batching.ml index 0858d5b8..3f3adaf0 100644 --- a/src/client/exporter_add_batching.ml +++ b/src/client/exporter_add_batching.ml @@ -1,4 +1,4 @@ -(** Add batching to emitter based on client config *) +(** Add batching to emitters *) open Common_ @@ -9,6 +9,7 @@ open struct Batch.wrap_emitter b emitter end +(** Given an exporter, add batches for each emitter according to [config]. *) let add_batching ~(config : Client_config.t) (exp : OTEL.Exporter.t) : OTEL.Exporter.t = let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in diff --git a/src/client/exporter_combine.ml b/src/client/exporter_combine.ml new file mode 100644 index 00000000..86c5b100 --- /dev/null +++ b/src/client/exporter_combine.ml @@ -0,0 +1,35 @@ +open Common_ +open Opentelemetry_atomic + +open struct + let shutdown_l ~on_done:on_done_real (es : OTEL.Exporter.t list) : unit = + let missing = Atomic.make (List.length es) in + + let on_done () = + if Atomic.fetch_and_add missing (-1) = 1 then + (* we were the last exporter to shutdown, [missing] is now 0 *) + on_done_real () + in + + List.iter (OTEL.Exporter.shutdown ~on_done) es +end + +let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = + let open OTEL.Exporter in + if es = [] then + OTEL.Exporter.dummy () + else + { + emit_spans = + Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es); + emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es); + emit_metrics = + Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es); + on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); + tick = (fun () -> List.iter tick es); + shutdown = (fun ~on_done () -> shutdown_l ~on_done es); + } + +(** [combine exp1 exp2] is the exporter that emits signals to both [exp1] and + [exp2]. *) +let combine exp1 exp2 : OTEL.Exporter.t = combine_l [ exp1; exp2 ] diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index 295becac..f6346b36 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -1,32 +1,27 @@ open Common_ open Opentelemetry_emitter -(** [debug exporter] behaves like [exporter], but will print signals on [stderr] - before passing them to [exporter] *) -let debug ?(out = Format.err_formatter) (exp : OTEL.Exporter.t) : - OTEL.Exporter.t = +(** [debug ?out ()] is an exporter that pretty-prints signals on [out]. + @param out the formatter into which to print, default [stderr]. *) +let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = let open Proto in + let ticker = Cb_set.create () in { emit_spans = - Emitter.tap - (fun sp -> Format.fprintf out "SPAN: %a@." Trace.pp_span sp) - exp.emit_spans; + Emitter.make_simple () ~emit:(fun sp -> + List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); emit_logs = - Emitter.tap - (fun log -> Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record log) - exp.emit_logs; + Emitter.make_simple () ~emit:(fun log -> + List.iter + (Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record) + log); emit_metrics = - Emitter.tap - (fun m -> Format.fprintf out "METRIC: %a@." Metrics.pp_metric m) - exp.emit_metrics; - on_tick = exp.on_tick; - tick = exp.tick; - cleanup = + Emitter.make_simple () ~emit:(fun m -> + List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m); + on_tick = Cb_set.register ticker; + tick = (fun () -> Cb_set.trigger ticker); + shutdown = (fun ~on_done () -> Format.fprintf out "CLEANUP@."; - exp.cleanup ~on_done ()); + on_done ()); } - -(** Exporter that simply debugs on [stderr] *) -let debug_only : OTEL.Exporter.t = - debug ~out:Format.err_formatter @@ OTEL.Exporter.dummy () diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 55304959..7db97e09 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -47,11 +47,11 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) let consumer = consumer.start_consuming q in - let cleanup ~on_done () = + let shutdown ~on_done () = if not (Atomic.exchange closed true) then ( Bounded_queue.close q; Consumer.shutdown consumer ~on_done ) else on_done () in - { emit_logs; emit_metrics; emit_spans; tick; on_tick; cleanup } + { emit_logs; emit_metrics; emit_spans; tick; on_tick; shutdown } diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index 76dbf760..e3abee08 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -51,15 +51,25 @@ let stdout : OTEL.Exporter.t = Format.pp_print_flush out () in let closed () = Atomic.get closed in - { Emitter.emit; closed; enabled; tick; flush_and_close } in + let emit_spans = mk_emitter pp_span in + let emit_logs = mk_emitter Proto.Logs.pp_log_record in + let emit_metrics = mk_emitter Proto.Metrics.pp_metric in + + let shutdown ~on_done () = + Emitter.flush_and_close emit_spans; + Emitter.flush_and_close emit_logs; + Emitter.flush_and_close emit_metrics; + on_done () + in + { - emit_spans = mk_emitter pp_span; - emit_logs = mk_emitter Proto.Logs.pp_log_record; - emit_metrics = mk_emitter Proto.Metrics.pp_metric; + emit_spans; + emit_logs; + emit_metrics; on_tick = Cb_set.register ticker; tick; - cleanup = (fun ~on_done () -> on_done ()); + shutdown; } diff --git a/src/client/signal.ml b/src/client/signal.ml index cde963de..92dfce2c 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -46,10 +46,17 @@ module Encode = struct | None -> Pbrt.Encoder.create () in let x = ctor resource in - let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in - enc x encoder; - let data = Pbrt.Encoder.to_string encoder in - Pbrt.Encoder.reset encoder; + let data = + let@ _sc = + Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" + in + enc x encoder; + let data = Pbrt.Encoder.to_string encoder in + Span.add_attrs _sc [ "size", `Int (String.length data) ]; + Pbrt.Encoder.reset encoder; + data + in + data let logs ?encoder resource_logs = diff --git a/src/core/exporter.ml b/src/core/exporter.ml index 621ea5b9..e9c2395d 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -17,11 +17,11 @@ type t = { tick: unit -> unit; (** Call all the callbacks registered with [on_tick]. Should be triggered regularly for background processing, timeout checks, etc. *) - cleanup: on_done:(unit -> unit) -> unit -> unit; - (** [cleanup ~on_done ()] is called when the exporter is shut down, and is + shutdown: on_done:(unit -> unit) -> unit -> unit; + (** [shutdown ~on_done ()] is called when the exporter is shut down, and is responsible for sending remaining batches, flushing sockets, etc. @param on_done - callback invoked after the cleanup is done. @since 0.12 *) + callback invoked after the shutdown is done. @since 0.12 *) } (** Main exporter interface. *) @@ -34,7 +34,7 @@ let dummy () : t = emit_logs = Emitter.dummy; on_tick = Cb_set.register ticker; tick = (fun () -> Cb_set.trigger ticker); - cleanup = (fun ~on_done () -> on_done ()); + shutdown = (fun ~on_done () -> on_done ()); } let[@inline] send_trace (self : t) (l : Proto.Trace.span list) = @@ -61,4 +61,6 @@ let tick (self : t) = self.tick (); () -let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done () +let[@inline] shutdown (self : t) ~on_done : unit = self.shutdown ~on_done () + +let (cleanup [@deprecated "use shutdown instead"]) = shutdown diff --git a/src/emitter/emitter.ml b/src/emitter/emitter.ml index a940fc43..f0a23248 100644 --- a/src/emitter/emitter.ml +++ b/src/emitter/emitter.ml @@ -56,6 +56,23 @@ let tap (f : 'a -> unit) (self : 'a t) : 'a t = in { self with emit } +(** [make_simple ~emit ()] is an emitter that calls [emit]. *) +let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () : + _ t = + let tick = + match tick with + | None -> fun ~now:_ -> () + | Some f -> f + in + let closed, enabled = + match closed, enabled with + | None, None -> (fun () -> false), fun () -> true + | Some f, None -> f, fun () -> not (f ()) + | None, Some f -> (fun () -> not (f ())), f + | Some f1, Some f2 -> f1, f2 + in + { tick; emit; flush_and_close; closed; enabled } + (** Dummy emitter, doesn't accept or emit anything. *) let dummy : _ t = { diff --git a/src/lib/main_exporter.ml b/src/lib/main_exporter.ml index 6a5284bc..22477d3f 100644 --- a/src/lib/main_exporter.ml +++ b/src/lib/main_exporter.ml @@ -13,13 +13,13 @@ open struct end (** Remove current exporter, if any. - @param on_done see {!t#cleanup}, @since 0.12 *) + @param on_done see {!Main_exporter.shutdown}, @since 0.12 *) let remove ~on_done () : unit = match Atomic.exchange exporter None with - | None -> () + | None -> on_done () | Some exp -> tick exp; - cleanup exp ~on_done + shutdown exp ~on_done (** Is there a configured exporter? *) let present () : bool = Option.is_some (Atomic.get exporter) @@ -83,8 +83,8 @@ let dynamic_forward_to_main_exporter : Exporter.t = | None -> () | Some exp -> exp.tick () in - let cleanup ~on_done () = on_done () in - { Exporter.emit_metrics; emit_spans; emit_logs; on_tick; tick; cleanup } + let shutdown ~on_done () = on_done () in + { Exporter.emit_metrics; emit_spans; emit_logs; on_tick; tick; shutdown } (** Set the global exporter *) let set (exp : t) : unit = @@ -110,6 +110,6 @@ let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f = if enable then ( set exp; - Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f + Fun.protect f ~finally:(fun () -> shutdown exp ~on_done) ) else - f () + Fun.protect f ~finally:(fun () -> on_done ())