feat lib: dynamic forward to main; improve Main_exporter

This commit is contained in:
Simon Cruanes 2025-12-04 14:16:32 -05:00
parent 35f8bbc67d
commit c4e8f8c39b
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 145 additions and 125 deletions

View file

@ -12,6 +12,15 @@ let dummy () : t = Emitter.dummy ()
let enabled = Emitter.enabled
let emit = Emitter.emit
let of_exporter (exp : Exporter.t) : t = exp.emit_logs
let emit ?attrs:_ (logs : Log_record.t list) : unit =
match Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.send_logs exp logs
[@@deprecated "use an explicit Logger.t"]
(** An emitter that uses the current {!Main_exporter} *)
let dynamic_forward_to_main_exporter : t =
Main_exporter.Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e ->
e.emit_logs)

115
src/lib/main_exporter.ml Normal file
View file

@ -0,0 +1,115 @@
(** Main exporter, used by the main tracing functions.
It is better to pass an explicit exporter when possible. *)
open Exporter
(* hidden *)
open struct
(* a list of callbacks automatically added to the main exporter *)
let on_tick_cbs_ = Alist.make ()
let exporter : t option Atomic.t = Atomic.make None
end
(** Remove current exporter, if any.
@param on_done see {!t#cleanup}, @since 0.12 *)
let remove ~on_done () : unit =
match Atomic.exchange exporter None with
| None -> ()
| Some exp ->
tick exp;
cleanup exp ~on_done
(** Is there a configured exporter? *)
let present () : bool = Option.is_some (Atomic.get exporter)
(** Current exporter, if any *)
let[@inline] get () : t option = Atomic.get exporter
let add_on_tick_callback f =
Alist.add on_tick_cbs_ f;
Option.iter (fun exp -> on_tick exp f) (get ())
module Util = struct
open Opentelemetry_emitter
(** An emitter that uses the current main *)
let dynamic_forward_to_main_exporter ~get_emitter () : _ Emitter.t =
let enabled () = present () in
let closed () = not (enabled ()) in
let flush_and_close () = () in
let tick ~now:_ =
match get () with
| None -> ()
| Some exp -> Exporter.tick exp
in
let emit signals =
if signals <> [] then (
match get () with
| None -> ()
| Some exp ->
let emitter = get_emitter exp in
Emitter.emit emitter signals
)
in
{ Emitter.enabled; closed; emit; tick; flush_and_close }
end
(** This exporter uses the current "main exporter" using [get()] at every
invocation. It is useful as a fallback or to port existing applications that
expect a global singleton backend^W exporter.
@since NEXT_RELEASE *)
let dynamic_forward_to_main_exporter : Exporter.t =
let open Exporter in
let emit_logs =
Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e -> e.emit_logs)
in
let emit_metrics =
Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e ->
e.emit_metrics)
in
let emit_spans =
Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e ->
e.emit_spans)
in
let on_tick f =
match get () with
| None -> ()
| Some exp -> Exporter.on_tick exp f
in
let tick () =
match get () with
| None -> ()
| Some exp -> exp.tick ()
in
let cleanup ~on_done () = on_done () in
{ Exporter.emit_metrics; emit_spans; emit_logs; on_tick; tick; cleanup }
(** Set the global exporter *)
let set (exp : t) : unit =
(* sanity check! this specific exporter would just call itself, leading to
stack overflow. *)
if exp == dynamic_forward_to_main_exporter then
failwith
"cannot set Main_exporter.dynamic_forward_to_main_exporter as main \
exporter!";
List.iter (on_tick exp) (Alist.get on_tick_cbs_);
Atomic.set exporter (Some exp)
let (set_backend [@deprecated "use `Main_exporter.set`"]) = set
let (remove_backend [@deprecated "use `Main_exporter.remove`"]) = remove
let (has_backend [@deprecated "use `Main_exporter.present`"]) = present
let (get_backend [@deprecated "use `Main_exporter.get"]) = get
let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f
=
if enable then (
set exp;
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
) else
f ()

View file

@ -6,14 +6,17 @@ let dummy () : t = Emitter.dummy ()
let enabled = Emitter.enabled
let emit = Emitter.emit
let of_exporter (exp : Exporter.t) : t = exp.emit_metrics
(** Emit some metrics to the collector (sync). This blocks until the backend has
pushed the metrics into some internal queue, or discarded them. *)
let emit ?attrs:_ (l : Metrics.t list) : unit =
match Exporter.Main_exporter.get () with
let (emit [@deprecated "use an explicit Metrics_emitter.t"]) =
fun ?attrs:_ (l : Metrics.t list) : unit ->
match Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.send_metrics exp l
[@@deprecated "use an explicit Metrics_emitter.t"]
(** An emitter that uses the current {!Main_exporter} *)
let dynamic_forward_to_main_exporter : t =
Main_exporter.Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e ->
e.emit_metrics)

View file

@ -21,7 +21,13 @@ module Timestamp_ns = Timestamp_ns
(** {2 Export signals to some external collector.} *)
module Exporter = Exporter
module Collector = Exporter [@@deprecated "Use 'Exporter' instead"]
module Main_exporter = Main_exporter
module Collector = struct
include Exporter
include Main_exporter
end
[@@deprecated "Use 'Exporter' instead"]
(** {2 Identifiers} *)

View file

@ -18,120 +18,7 @@ type t = Span.t Emitter.t
(** Dummy tracer, always disabled *)
let dummy () : t = Emitter.dummy ()
(** A tracer that uses {!Exporter.Main_exporter} *)
let simple_main_exporter : t =
let enabled () = Exporter.Main_exporter.present () in
let closed () = not (enabled ()) in
let flush_and_close () = () in
let tick ~now:_ =
match Exporter.Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.tick exp
in
let emit spans =
if spans <> [] then (
match Exporter.Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.send_trace exp spans
)
in
{ Emitter.enabled; closed; emit; tick; flush_and_close }
(** Directly emit to the main exporter.
{b NOTE} be careful not to call this inside a Gc alarm, as it can cause
deadlocks. *)
let (emit [@deprecated "use an explicit tracer"]) =
fun ?service_name:_ ?attrs:_ (spans : span list) : unit ->
match Exporter.Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.send_trace exp spans
let (add_event [@deprecated "use Span.add_event"]) = Span.add_event
let (add_attrs [@deprecated "use Span.add_attrs"]) = Span.add_attrs
let with_thunk_and_finally ?(tracer = simple_main_exporter)
?(force_new_trace_id = false) ?trace_state
?(attrs : (string * [< Value.t ]) list = []) ?kind ?trace_id ?parent ?links
name cb =
let parent =
match parent with
| Some _ -> parent
| None -> Ambient_span.get ()
in
let trace_id =
match trace_id, parent with
| _ when force_new_trace_id -> Trace_id.create ()
| Some trace_id, _ -> trace_id
| None, Some p -> Span.trace_id p
| None, None -> Trace_id.create ()
in
let start_time = Timestamp_ns.now_unix_ns () in
let span_id = Span_id.create () in
let parent_id = Option.map Span.id parent in
let span : Span.t =
Span.make ?trace_state ?kind ?parent:parent_id ~trace_id ~id:span_id ~attrs
?links ~start_time ~end_time:start_time name
in
(* called once we're done, to emit a span *)
let finally res =
let end_time = Timestamp_ns.now_unix_ns () in
Proto.Trace.span_set_end_time_unix_nano span end_time;
(match Span.status span with
| Some _ -> ()
| None ->
(match res with
| Ok () ->
(* By default, all spans are Unset, which means a span completed without error.
The Ok status is reserved for when you need to explicitly mark a span as successful
rather than stick with the default of Unset (i.e., without error).
https://opentelemetry.io/docs/languages/go/instrumentation/#set-span-status *)
()
| Error (e, bt) ->
Span.record_exception span e bt;
let status =
make_status ~code:Status_code_error ~message:(Printexc.to_string e) ()
in
Span.set_status span status));
Emitter.emit tracer [ span ]
in
let thunk () = Ambient_span.with_ambient span (fun () -> cb span) in
thunk, finally
(** Sync span guard.
Notably, this includes {e implicit} scope-tracking: if called without a
[~scope] argument (or [~parent]/[~trace_id]), it will check in the
{!Ambient_context} for a surrounding environment, and use that as the scope.
Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new scope in the
ambient context, so that any logically-nested calls to {!with_} will use
this span as their parent.
{b NOTE} be careful not to call this inside a Gc alarm, as it can cause
deadlocks.
@param force_new_trace_id
if true (default false), the span will not use a ambient scope, the
[~scope] argument, nor [~trace_id], but will instead always create fresh
identifiers for this span *)
let with_ ?tracer ?force_new_trace_id ?trace_state ?attrs ?kind ?trace_id
?parent ?links name (cb : Span.t -> 'a) : 'a =
let thunk, finally =
with_thunk_and_finally ?tracer ?force_new_trace_id ?trace_state ?attrs ?kind
?trace_id ?parent ?links name cb
in
try
let rv = thunk () in
finally (Ok ());
rv
with e ->
let bt = Printexc.get_raw_backtrace () in
finally (Error (e, bt));
raise e
(** A tracer that uses the current {!Main_exporter} *)
let dynamic_forward_to_main_exporter : t =
Main_exporter.Util.dynamic_forward_to_main_exporter () ~get_emitter:(fun e ->
e.emit_spans)