diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index 89a0ff30..e6ad93ab 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -106,17 +106,20 @@ module Send = struct (** Turn the writing end of the queue into an emitter. @param close_queue_on_close if true, closing the emitter will close the queue *) - let to_emitter ~close_queue_on_close (self : 'a t) : + let to_emitter ~signal_name ~close_queue_on_close (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = let closed () = closed self in let enabled () = not (closed ()) in let emit x = if x <> [] then push self x in let tick ~mtime:_ = () in + (* the exporter will emit these, the queue is shared *) + let self_metrics ~now:_ () = [] in + (* NOTE: we cannot actually flush, only close. Emptying the queue is fundamentally asynchronous because it's done by consumers *) let flush_and_close () = if close_queue_on_close then close self in - { closed; enabled; emit; tick; flush_and_close } + { signal_name; closed; enabled; emit; tick; flush_and_close; self_metrics } end type 'a t = { diff --git a/src/client/emitter_batch.ml b/src/client/emitter_batch.ml index bd2106d0..6c8d0f36 100644 --- a/src/client/emitter_batch.ml +++ b/src/client/emitter_batch.ml @@ -12,8 +12,18 @@ let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t = then [e] itself will be closed. *) let closed_here = Atomic.make false in + let signal_name = e.signal_name in let enabled () = (not (Atomic.get closed_here)) && e.enabled () in let closed () = Atomic.get closed_here || e.closed () in + + let dropped_name = Printf.sprintf "otel.sdk.%s.batch.dropped" signal_name in + let self_metrics ~now () = + let m = + Opentelemetry_core.Metrics.( + sum ~name:dropped_name [ int ~now (Batch.n_dropped self) ]) + in + m :: e.self_metrics ~now () + in let flush_and_close () = if not (Atomic.exchange closed_here true) then ( (* NOTE: we need to close this wrapping emitter first, to prevent @@ -52,7 +62,15 @@ let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t = ) in - { Emitter.closed; enabled; flush_and_close; tick; emit } + { + Emitter.closed; + signal_name; + self_metrics; + enabled; + flush_and_close; + tick; + emit; + } let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t = let b = Batch.make ~batch:batch_size ~timeout () in diff --git a/src/client/emitter_combine.ml b/src/client/emitter_combine.ml index 938c55fd..d6078bd3 100644 --- a/src/client/emitter_combine.ml +++ b/src/client/emitter_combine.ml @@ -18,16 +18,26 @@ type closing_behavior = when is this emitter closing. Default [`Close_when_all_closed]. *) let combine_l ?(closing : closing_behavior = `Close_when_all_closed) (es : 'a t list) : 'a t = + assert (es <> []); + let signal_name = (List.hd es).signal_name in let closed = fun () -> match closing with | `Close_when_all_closed -> List.for_all closed es | `Close_when_one_closed -> List.exists closed es in + let self_metrics ~now () = + List.flatten @@ List.map (fun e -> e.self_metrics ~now ()) es + in let enabled () = not (closed ()) in let emit x = if x <> [] then List.iter (fun e -> emit e x) es in let tick ~mtime = List.iter (tick ~mtime) es in let flush_and_close () = List.iter flush_and_close es in - { closed; enabled; emit; tick; flush_and_close } + { signal_name; self_metrics; closed; enabled; emit; tick; flush_and_close } + +let combine_l ?closing es : _ t = + match es with + | [] -> dummy + | _ -> combine_l ?closing es let combine e1 e2 : _ t = combine_l [ e1; e2 ] diff --git a/src/client/emitter_sample.ml b/src/client/emitter_sample.ml index a655d03b..cd15ce9a 100644 --- a/src/client/emitter_sample.ml +++ b/src/client/emitter_sample.ml @@ -1,11 +1,19 @@ open Opentelemetry_emitter let add_sampler (self : Sampler.t) (e : _ Emitter.t) : _ Emitter.t = + let signal_name = e.signal_name in let enabled () = e.enabled () in let closed () = Emitter.closed e in let flush_and_close () = Emitter.flush_and_close e in let tick ~mtime = Emitter.tick e ~mtime in + let m_rate = Printf.sprintf "otel.sdk.%s.sampler.actual-rate" signal_name in + let self_metrics ~now () = + Opentelemetry_core.Metrics.( + gauge ~name:m_rate [ float ~now (Sampler.actual_rate self) ]) + :: e.self_metrics ~now () + in + let emit l = if l <> [] && e.enabled () then ( let accepted = List.filter (fun _x -> Sampler.accept self) l in @@ -13,6 +21,14 @@ let add_sampler (self : Sampler.t) (e : _ Emitter.t) : _ Emitter.t = ) in - { Emitter.closed; enabled; flush_and_close; tick; emit } + { + Emitter.closed; + self_metrics; + signal_name; + enabled; + flush_and_close; + tick; + emit; + } let sample ~proba_accept e = add_sampler (Sampler.create ~proba_accept ()) e diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index d9701295..875faba3 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -17,15 +17,15 @@ let debug ?(clock = OTEL.Clock.ptime_clock) ?(out = Format.err_formatter) () : active = (fun () -> active); clock; emit_spans = - Emitter.make_simple () ~emit:(fun sp -> + Emitter.make ~signal_name:"spans" () ~emit:(fun sp -> List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); emit_logs = - Emitter.make_simple () ~emit:(fun log -> + Emitter.make ~signal_name:"logs" () ~emit:(fun log -> List.iter (Format.fprintf out "LOG: %a@." Proto.Logs.pp_log_record) log); emit_metrics = - Emitter.make_simple () ~emit:(fun m -> + Emitter.make ~signal_name:"metrics" () ~emit:(fun m -> List.iter (Format.fprintf out "METRIC: %a@." Metrics.pp_metric) m); on_tick = Cb_set.register ticker; tick = (fun () -> Cb_set.trigger ticker); diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 31a90993..791d74c0 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -17,18 +17,21 @@ module BQ_emitters = struct let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~close_queue_on_close:false + Bounded_queue.Send.to_emitter q ~signal_name:"logs" + ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~close_queue_on_close:false + Bounded_queue.Send.to_emitter q ~signal_name:"spans" + ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_spans_or_empty let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : _ OTEL.Emitter.t = - Bounded_queue.Send.to_emitter q ~close_queue_on_close:false + Bounded_queue.Send.to_emitter q ~signal_name:"metrics" + ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_metrics_or_empty end @@ -64,7 +67,7 @@ let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t) [ OTEL.Metrics.int ~now (Bounded_queue.Recv.high_watermark q.recv) ] and m_discarded = OTEL.Metrics.sum ~is_monotonic:true - ~name:"otel_ocaml.exporter_queue.discarded" + ~name:"otel.sdk.exporter_queue.discarded" [ OTEL.Metrics.int ~now (Bounded_queue.Recv.num_discarded q.recv) ] in m_size :: m_cap :: m_discarded :: Consumer.self_metrics consumer ~clock diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index ee1bc0b8..71b2e449 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -47,12 +47,13 @@ let stdout ?(clock = OTEL.Clock.ptime_clock) () : OTEL.Exporter.t = let active, trigger = Aswitch.create () in let tick () = Cb_set.trigger ticker in - let mk_emitter pp_signal = + let mk_emitter ~signal_name pp_signal = let emit l = if Aswitch.is_off active then raise Emitter.Closed; pp_vlist mutex pp_signal out l in let enabled () = Aswitch.is_on active in + let self_metrics ~now:_ () = [] in let tick ~mtime:_ = () in let flush_and_close () = if Aswitch.is_on active then @@ -60,12 +61,20 @@ let stdout ?(clock = OTEL.Clock.ptime_clock) () : OTEL.Exporter.t = Format.pp_print_flush out () in let closed () = Aswitch.is_off active in - { Emitter.emit; closed; enabled; tick; flush_and_close } + { + Emitter.emit; + signal_name; + self_metrics; + closed; + enabled; + tick; + flush_and_close; + } in - let emit_spans = mk_emitter pp_span in - let emit_logs = mk_emitter pp_log in - let emit_metrics = mk_emitter pp_metric in + let emit_spans = mk_emitter ~signal_name:"spans" pp_span in + let emit_logs = mk_emitter ~signal_name:"logs" pp_log in + let emit_metrics = mk_emitter ~signal_name:"metrics" pp_metric in let self_metrics () = [] in let shutdown () = diff --git a/src/client/http_config.mli b/src/client/http_config.mli index 01524b71..95132d87 100644 --- a/src/client/http_config.mli +++ b/src/client/http_config.mli @@ -1,4 +1,5 @@ (** @deprecated Use {!Exporter_config} instead *) +[@@@deprecated "use Exporter_config instead"] + include module type of Exporter_config -[@@deprecated "use Exporter_config instead"] diff --git a/src/core/exporter.ml b/src/core/exporter.ml index 622604f1..4a67847a 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -77,4 +77,12 @@ let[@inline] shutdown (self : t) : unit = self.shutdown () let (cleanup [@deprecated "use shutdown instead"]) = shutdown -let[@inline] self_metrics (self : t) : _ list = self.self_metrics () +let self_metrics (self : t) : _ list = + let now = Clock.now self.clock in + List.flatten + [ + self.self_metrics (); + self.emit_spans.self_metrics ~now (); + self.emit_logs.self_metrics ~now (); + self.emit_metrics.self_metrics ~now (); + ] diff --git a/src/emitter/dune b/src/emitter/dune index 744c7762..d6d6f567 100644 --- a/src/emitter/dune +++ b/src/emitter/dune @@ -2,6 +2,8 @@ (name opentelemetry_emitter) (public_name opentelemetry.emitter) (libraries + (re_export opentelemetry.proto) + (re_export opentelemetry.util) (re_export opentelemetry.atomic) (re_export mtime)) (flags :standard -open Opentelemetry_atomic) diff --git a/src/emitter/emitter.ml b/src/emitter/emitter.ml index a3fc0f67..d5c0393a 100644 --- a/src/emitter/emitter.ml +++ b/src/emitter/emitter.ml @@ -8,6 +8,7 @@ exception Closed type -'a t = { + signal_name: string; (** Description of what signal is emitted *) enabled: unit -> bool; (** Return [true] if [emit] has a chance of doing something with the signals it's given. *) @@ -20,6 +21,11 @@ type -'a t = { (** True if the emitter is already closed. Beware TOCTOU bugs. *) flush_and_close: unit -> unit; (** Flush internally buffered signals, then close. *) + self_metrics: + now:Opentelemetry_util.Timestamp_ns.t -> + unit -> + Opentelemetry_proto.Metrics.metric list; + (** metrics about the emitter itself. *) } (** An emitter for values of type ['a]. *) @@ -33,6 +39,8 @@ let[@inline] closed self : bool = self.closed () let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close () +let[@inline] self_metrics self ~now : _ list = self.self_metrics ~now () + (** [map f emitter] returns a new emitter that applies [f] to signals item-wise before passing them to [emitter] *) let map (f : 'a -> 'b) (self : 'b t) : 'a t = @@ -56,9 +64,9 @@ let tap (f : 'a -> unit) (self : 'a t) : 'a t = in { self with emit } -(** [make_simple ~emit ()] is an emitter that calls [emit]. *) -let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () : - _ t = +(** [make ~emit ()] is an emitter that calls [emit]. *) +let make ?tick ?closed ?enabled ?(flush_and_close = ignore) + ?(self_metrics = fun ~now:_ () -> []) ~signal_name ~emit () : _ t = let tick = match tick with | None -> fun ~mtime:_ -> () @@ -71,14 +79,16 @@ let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () : | None, Some f -> (fun () -> not (f ())), f | Some f1, Some f2 -> f1, f2 in - { tick; emit; flush_and_close; closed; enabled } + { signal_name; tick; emit; flush_and_close; closed; enabled; self_metrics } (** Dummy emitter, doesn't accept or emit anything. *) let dummy : _ t = { + signal_name = "dummy"; enabled = (fun () -> false); emit = ignore; tick = (fun ~mtime:_ -> ()); closed = (fun () -> true); flush_and_close = ignore; + self_metrics = (fun ~now:_ () -> []); } diff --git a/src/emitter/to_list.ml b/src/emitter/to_list.ml index e1601132..260e5c10 100644 --- a/src/emitter/to_list.ml +++ b/src/emitter/to_list.ml @@ -1,14 +1,13 @@ (** Emitter that stores signals into a list, in reverse order (most recent signals first). *) -let to_list (l : 'a list ref) : 'a Emitter.t = - let closed = Atomic.make false in - { - enabled = (fun () -> not (Atomic.get closed)); - emit = - (fun sigs -> - if Atomic.get closed then raise Emitter.Closed; - l := List.rev_append sigs !l); - tick = (fun ~mtime:_ -> ()); - closed = (fun () -> Atomic.get closed); - flush_and_close = (fun () -> Atomic.set closed true); - } +let to_list ~signal_name (l : 'a list ref) : 'a Emitter.t = + let closed_ = Atomic.make false in + let enabled = fun () -> not (Atomic.get closed_) in + let emit = + fun sigs -> + if Atomic.get closed_ then raise Emitter.Closed; + l := List.rev_append sigs !l + in + let closed () = Atomic.get closed_ in + let flush_and_close = fun () -> Atomic.set closed_ true in + Emitter.make ~signal_name ~emit ~enabled ~closed ~flush_and_close () diff --git a/src/lib/main_exporter.ml b/src/lib/main_exporter.ml index 76f076f6..38bc98cd 100644 --- a/src/lib/main_exporter.ml +++ b/src/lib/main_exporter.ml @@ -44,7 +44,8 @@ module Util = struct exporter. When this emitter is used to [emit signals], the current exporter is looked up, [get_emitter exporter] is then used to locate the relevant emitter [e'], and [signals] is in turn emitted in [e']. *) - let dynamic_forward_emitter_to_main_exporter ~get_emitter () : _ Emitter.t = + let dynamic_forward_emitter_to_main_exporter ~signal_name + ~(get_emitter : Exporter.t -> _ Emitter.t) () : _ Emitter.t = let enabled () = present () in let closed () = not (enabled ()) in let flush_and_close () = () in @@ -53,6 +54,11 @@ module Util = struct | None -> () | Some exp -> Exporter.tick exp ~mtime in + let self_metrics ~now () = + match get () with + | None -> [] + | Some exp -> (get_emitter exp).self_metrics ~now () + in let emit signals = if signals <> [] then ( match get () with @@ -62,7 +68,8 @@ module Util = struct Emitter.emit emitter signals ) in - { Emitter.enabled; closed; emit; tick; flush_and_close } + Emitter.make ~signal_name ~enabled ~closed ~self_metrics ~flush_and_close + ~tick ~emit () end (** Aswitch of the current exporter, or {!Aswitch.dummy} *) @@ -77,15 +84,15 @@ let[@inline] active () : Aswitch.t = @since NEXT_RELEASE *) let dynamic_forward_to_main_exporter : Exporter.t = let emit_logs = - Util.dynamic_forward_emitter_to_main_exporter () + Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"logs" ~get_emitter:Exporter.(fun e -> e.emit_logs) in let emit_metrics = - Util.dynamic_forward_emitter_to_main_exporter () + Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"metrics" ~get_emitter:Exporter.(fun e -> e.emit_metrics) in let emit_spans = - Util.dynamic_forward_emitter_to_main_exporter () + Util.dynamic_forward_emitter_to_main_exporter () ~signal_name:"spans" ~get_emitter:Exporter.(fun e -> e.emit_spans) in let on_tick f = diff --git a/tests/implicit_scope/sync/test_implicit_scope_sync.ml b/tests/implicit_scope/sync/test_implicit_scope_sync.ml index d397d72b..61e73a66 100644 --- a/tests/implicit_scope/sync/test_implicit_scope_sync.ml +++ b/tests/implicit_scope/sync/test_implicit_scope_sync.ml @@ -7,7 +7,8 @@ let test_exporter : Otel.Exporter.t = let open Otel.Exporter in { (dummy ()) with - emit_spans = Opentelemetry_emitter.To_list.to_list spans_emitted; + emit_spans = + Opentelemetry_emitter.To_list.to_list ~signal_name:"spans" spans_emitted; } let with_test_exporter f =