From 210b7991c92a58f04107ee32a7cffb22424e62bf Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 20 Feb 2026 12:31:15 -0500 Subject: [PATCH] rework tracer/meter/logger interfaces - Meter is new, and makes more sense than Metrics_callbacks/Metrics_emitter - Instrument in core, with some basic counters, gauges, and histograms, + the possibility to do one's own --- src/client/self_trace.ml | 2 +- src/core/instrument.ml | 221 ++++++++++++++++++ src/core/instrument.mli | 99 ++++++++ .../cohttp/opentelemetry_cohttp_lwt.ml | 8 +- src/integrations/logs/opentelemetry_logs.ml | 2 +- src/lib/logger.ml | 15 +- src/lib/meter.ml | 81 +++++++ src/lib/meter.mli | 64 +++++ src/lib/metrics_callbacks.ml | 66 ------ src/lib/metrics_callbacks.mli | 39 ---- src/lib/metrics_emitter.ml | 31 --- src/lib/opentelemetry.ml | 38 ++- src/lib/tracer.ml | 14 +- src/lwt/opentelemetry_lwt.ml | 5 +- tests/bin/emit1.ml | 18 +- tests/bin/emit1_cohttp.ml | 17 +- tests/bin/emit1_eio.ml | 20 +- tests/bin/emit1_ocurl_lwt.ml | 18 +- tests/bin/emit1_stdout.ml | 16 +- tests/core/dune | 2 +- tests/core/t_histogram.expected | 100 ++++++++ tests/core/t_histogram.ml | 58 +++++ 22 files changed, 728 insertions(+), 206 deletions(-) create mode 100644 src/core/instrument.ml create mode 100644 src/core/instrument.mli create mode 100644 src/lib/meter.ml create mode 100644 src/lib/meter.mli delete mode 100644 src/lib/metrics_callbacks.ml delete mode 100644 src/lib/metrics_callbacks.mli delete mode 100644 src/lib/metrics_emitter.ml create mode 100644 tests/core/t_histogram.expected create mode 100644 tests/core/t_histogram.ml diff --git a/src/client/self_trace.ml b/src/client/self_trace.ml index 72c92bfa..fa401ef6 100644 --- a/src/client/self_trace.ml +++ b/src/client/self_trace.ml @@ -2,7 +2,7 @@ open Common_ let enabled = Atomic.make false -let tracer = Atomic.make OTEL.Tracer.dynamic_main +let tracer = Atomic.make OTEL.Tracer.default let[@inline] add_event (scope : OTEL.Span.t) ev = OTEL.Span.add_event scope ev diff --git a/src/core/instrument.ml b/src/core/instrument.ml new file mode 100644 index 00000000..953259b7 --- /dev/null +++ b/src/core/instrument.ml @@ -0,0 +1,221 @@ +type 'a t = { + kind: string; + name: string; + emit: clock:Clock.t -> unit -> Metrics.t list; + update: 'a -> unit; +} + +let all : (clock:Clock.t -> unit -> Metrics.t list) Alist.t = Alist.make () + +let register (instr : 'a t) : unit = Alist.add all instr.emit + +module Internal = struct + let iter_all f = Alist.get all |> List.iter f +end + +let float_add (a : float Atomic.t) (delta : float) : unit = + while + let cur = Atomic.get a in + not (Atomic.compare_and_set a cur (cur +. delta)) + do + () + done + +module type CUSTOM_IMPL = sig + type data + + type state + + val kind : string + + val init : unit -> state + + val update : state -> data -> unit + + val to_metrics : + state -> + name:string -> + ?description:string -> + ?unit_:string -> + clock:Clock.t -> + unit -> + Metrics.t list +end + +module Make (I : CUSTOM_IMPL) = struct + let create ~name ?description ?unit_ () : I.data t = + let state = I.init () in + let emit ~clock () = + I.to_metrics state ~name ?description ?unit_ ~clock () + in + let instrument = + { kind = I.kind; name; emit; update = I.update state } [@warning "-45"] + in + register instrument; + instrument +end + +module Int_counter = struct + include Make (struct + type data = int + + type state = int Atomic.t + + let kind = "counter" + + let init () = Atomic.make 0 + + let update state delta = ignore (Atomic.fetch_and_add state delta : int) + + let to_metrics state ~name ?description ?unit_ ~clock () = + let now = Clock.now clock in + [ + Metrics.sum ~name ?description ?unit_ ~is_monotonic:true + [ Metrics.int ~now (Atomic.get state) ]; + ] + end) + + let add (instrument : int t) delta = instrument.update delta +end + +module Float_counter = struct + include Make (struct + type data = float + + type state = float Atomic.t + + let kind = "counter" + + let init () = Atomic.make 0. + + let update state delta = float_add state delta + + let to_metrics state ~name ?description ?unit_ ~clock () = + let now = Clock.now clock in + [ + Metrics.sum ~name ?description ?unit_ ~is_monotonic:true + [ Metrics.float ~now (Atomic.get state) ]; + ] + end) + + let add (instrument : float t) delta = instrument.update delta +end + +module Int_gauge = struct + include Make (struct + type data = int + + type state = int Atomic.t + + let kind = "gauge" + + let init () = Atomic.make 0 + + let update state v = Atomic.set state v + + let to_metrics state ~name ?description ?unit_ ~clock () = + let now = Clock.now clock in + [ + Metrics.gauge ~name ?description ?unit_ + [ Metrics.int ~now (Atomic.get state) ]; + ] + end) + + let record (instrument : int t) v = instrument.update v +end + +module Float_gauge = struct + include Make (struct + type data = float + + type state = float Atomic.t + + let kind = "gauge" + + let init () = Atomic.make 0. + + let update state v = Atomic.set state v + + let to_metrics state ~name ?description ?unit_ ~clock () = + let now = Clock.now clock in + [ + Metrics.gauge ~name ?description ?unit_ + [ Metrics.float ~now (Atomic.get state) ]; + ] + end) + + let record (instrument : float t) v = instrument.update v +end + +module Histogram = struct + let default_bounds = + [ + 0.005; + 0.01; + 0.025; + 0.05; + 0.075; + 0.1; + 0.25; + 0.5; + 0.75; + 1.; + 2.5; + 5.; + 7.5; + 10.; + ] + + (* Find the index of the first bucket whose upper bound >= v. + Returns Array.length bounds if v exceeds all bounds (overflow bucket). *) + let find_bucket (bounds : float array) (v : float) : int = + let n = Array.length bounds in + let lo = ref 0 and hi = ref (n - 1) in + while !lo < !hi do + let mid = (!lo + !hi) / 2 in + if bounds.(mid) < v then + lo := mid + 1 + else + hi := mid + done; + if !lo < n && v <= bounds.(!lo) then + !lo + else + n + + let create ~name ?description ?unit_ ?(bounds = default_bounds) () : float t = + let bounds_arr = Array.of_list bounds in + let n_buckets = Array.length bounds_arr + 1 in + let bucket_counts = Array.init n_buckets (fun _ -> Atomic.make 0) in + let sum = Atomic.make 0. in + let count = Atomic.make 0 in + let update v = + let bucket = find_bucket bounds_arr v in + ignore (Atomic.fetch_and_add bucket_counts.(bucket) 1 : int); + float_add sum v; + ignore (Atomic.fetch_and_add count 1 : int) + in + let emit ~clock () = + let now = Clock.now clock in + let count_v = Int64.of_int (Atomic.get count) in + let sum_v = Atomic.get sum in + let bc = + Array.to_list + (Array.map (fun a -> Int64.of_int (Atomic.get a)) bucket_counts) + in + [ + Metrics.histogram ~name ?description ?unit_ + [ + Metrics.histogram_data_point ~now ~count:count_v ~sum:sum_v + ~bucket_counts:bc ~explicit_bounds:bounds (); + ]; + ] + in + let instrument = + { kind = "histogram"; name; emit; update } [@warning "-45"] + in + register instrument; + instrument + + let record (instrument : float t) v = instrument.update v +end diff --git a/src/core/instrument.mli b/src/core/instrument.mli new file mode 100644 index 00000000..e28201c1 --- /dev/null +++ b/src/core/instrument.mli @@ -0,0 +1,99 @@ +(** Global registry of metric instruments. + + Instruments are stateful accumulators (counters, gauges, histograms, …). + [update] is called at any time to record a value; [emit] is called at + collection time by a {!Meter.t}, which supplies the clock. + + All instruments register themselves into a global list on creation via + {!register}, so any meter can collect the full set in one pass. Make sure to + only create instruments at the toplevel so that the list doesn't grow + forever. *) + +type 'a t = { + kind: string; (** "counter", "gauge", "histogram", … *) + name: string; + emit: clock:Clock.t -> unit -> Metrics.t list; + (** Snapshot current accumulated state into metrics. *) + update: 'a -> unit; (** Record a new value. *) +} + +val register : 'a t -> unit +(** Add an instrument's [emit] to {!all}. Called automatically by the standard + instrument-creation functions. *) + +(** Implementation details for a custom stateful instrument. Pass to {!Make} to + obtain a [create] function. *) +module type CUSTOM_IMPL = sig + type data + + type state + + val kind : string + + val init : unit -> state + + val update : state -> data -> unit + + val to_metrics : + state -> + name:string -> + ?description:string -> + ?unit_:string -> + clock:Clock.t -> + unit -> + Metrics.t list +end + +(** Build a custom instrument type from a {!CUSTOM_IMPL}. The returned [create] + registers the instrument into {!all} automatically. *) +module Make (I : CUSTOM_IMPL) : sig + val create : + name:string -> ?description:string -> ?unit_:string -> unit -> I.data t +end + +module Int_counter : sig + val create : + name:string -> ?description:string -> ?unit_:string -> unit -> int t + + val add : int t -> int -> unit +end + +module Float_counter : sig + val create : + name:string -> ?description:string -> ?unit_:string -> unit -> float t + + val add : float t -> float -> unit +end + +module Int_gauge : sig + val create : + name:string -> ?description:string -> ?unit_:string -> unit -> int t + + val record : int t -> int -> unit +end + +module Float_gauge : sig + val create : + name:string -> ?description:string -> ?unit_:string -> unit -> float t + + val record : float t -> float -> unit +end + +module Histogram : sig + val default_bounds : float list + + val create : + name:string -> + ?description:string -> + ?unit_:string -> + ?bounds:float list -> + unit -> + float t + + val record : float t -> float -> unit +end + +module Internal : sig + val iter_all : ((clock:Clock.t -> unit -> Metrics.t list) -> unit) -> unit + (** Access all the instruments *) +end diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index 6dd6e644..10be34c7 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -117,8 +117,8 @@ end = struct in { req with headers } - let trace ?(tracer = Otel.Tracer.dynamic_main) ?(attrs = []) callback conn req - body = + let trace ?(tracer = Otel.Tracer.default) ?(attrs = []) callback conn req body + = let parent = get_trace_context ~from:`External req in Otel_lwt.Tracer.with_ ~tracer "request" ~kind:Span_kind_server ?trace_id:(Option.map Otel.Span.trace_id parent) @@ -131,7 +131,7 @@ end = struct Otel.Span.add_attrs span (attrs_of_response res); Lwt.return (res, body)) - let with_ ?(tracer = Otel.Tracer.dynamic_main) ?trace_state ?attrs + let with_ ?(tracer = Otel.Tracer.default) ?trace_state ?attrs ?(kind = Otel.Span.Span_kind_internal) ?links name req (f : Request.t -> 'a Lwt.t) = let span = get_trace_context ~from:`Internal req in @@ -142,7 +142,7 @@ end = struct f req) end -let client ?(tracer = Otel.Tracer.dynamic_main) ?(span : Otel.Span.t option) +let client ?(tracer = Otel.Tracer.default) ?(span : Otel.Span.t option) (module C : Cohttp_lwt.S.Client) = let module Traced = struct open Lwt.Syntax diff --git a/src/integrations/logs/opentelemetry_logs.ml b/src/integrations/logs/opentelemetry_logs.ml index 7a276624..c840bacf 100644 --- a/src/integrations/logs/opentelemetry_logs.ml +++ b/src/integrations/logs/opentelemetry_logs.ml @@ -34,7 +34,7 @@ let emit_telemetry do_emit = Logs.Tag.(empty |> add emit_telemetry_tag do_emit) (*****************************************************************************) (* Log a message to otel with some attrs *) -let log ?(logger = OTEL.Logger.dynamic_main) ?attrs +let log ?(logger = OTEL.Logger.default) ?attrs ?(scope = OTEL.Ambient_span.get ()) ~level msg = let log_level = Logs.level_to_string (Some level) in let span_id = Option.map OTEL.Span.id scope in diff --git a/src/lib/logger.ml b/src/lib/logger.ml index 6fb458d1..006d4d59 100644 --- a/src/lib/logger.ml +++ b/src/lib/logger.ml @@ -30,16 +30,21 @@ let (emit_main [@deprecated "use an explicit Logger.t"]) = | None -> () | Some exp -> Exporter.send_logs exp logs -(** An emitter that uses the current {!Main_exporter}'s logger *) -let dynamic_main : t = - of_exporter Main_exporter.dynamic_forward_to_main_exporter +open struct + (* internal default, keeps the default params below working without deprecation alerts *) + let dynamic_main_ : t = + of_exporter Main_exporter.dynamic_forward_to_main_exporter +end + +(** A logger that uses the current {!Main_exporter}'s logger *) +let default = dynamic_main_ (** {2 Logging helpers} *) open Log_record (** Create log record and emit it on [logger] *) -let log ?(logger = dynamic_main) ?attrs ?trace_id ?span_id +let log ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id ?(severity : severity option) (msg : string) : unit = if enabled logger then ( let now = Clock.now logger.clock in @@ -55,7 +60,7 @@ let log ?(logger = dynamic_main) ?attrs ?trace_id ?span_id Example usage: [logf ~severity:Severity_number_warn (fun k->k"oh no!! %s it's bad: %b" "help" true)] *) -let logf ?(logger = dynamic_main) ?attrs ?trace_id ?span_id ?severity msgf : +let logf ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id ?severity msgf : unit = if enabled logger then msgf (fun fmt -> diff --git a/src/lib/meter.ml b/src/lib/meter.ml new file mode 100644 index 00000000..4b09b767 --- /dev/null +++ b/src/lib/meter.ml @@ -0,0 +1,81 @@ +open Opentelemetry_emitter + +type t = { + emit: Metrics.t Emitter.t; + clock: Clock.t; +} + +let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock } + +let[@inline] enabled (self : t) = Emitter.enabled self.emit + +let of_exporter (exp : Exporter.t) : t = + { emit = exp.emit_metrics; clock = exp.clock } + +let (create [@deprecated "use Meter.of_exporter"]) = + fun ~(exporter : Exporter.t) ?name:_name () : t -> of_exporter exporter + +let default : t = Main_exporter.dynamic_forward_to_main_exporter |> of_exporter + +let[@inline] emit1 (self : t) (m : Metrics.t) : unit = + Emitter.emit self.emit [ m ] + +(** Global list of raw metric callbacks, collected alongside {!Instrument.all}. +*) +let cbs_ : (clock:Clock.t -> unit -> Metrics.t list) Alist.t = Alist.make () + +let add_cb (f : clock:Clock.t -> unit -> Metrics.t list) : unit = + Alist.add cbs_ f + +let collect (self : t) : Metrics.t list = + let clock = self.clock in + let acc = ref [] in + Instrument.Internal.iter_all (fun f -> + acc := List.rev_append (f ~clock ()) !acc); + List.iter + (fun f -> acc := List.rev_append (f ~clock ()) !acc) + (Alist.get cbs_); + List.rev !acc + +let minimum_min_interval_ = Mtime.Span.(100 * ms) + +let default_min_interval_ = Mtime.Span.(4 * s) + +let clamp_interval_ interval = + if Mtime.Span.is_shorter interval ~than:minimum_min_interval_ then + minimum_min_interval_ + else + interval + +let add_to_exporter ?(min_interval = default_min_interval_) (exp : Exporter.t) + (self : t) : unit = + let limiter = + Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () + in + Exporter.on_tick exp (fun () -> + if Interval_limiter.make_attempt limiter then ( + let metrics = collect self in + if metrics <> [] then Emitter.emit self.emit metrics + )) + +let add_to_main_exporter ?(min_interval = default_min_interval_) (self : t) : + unit = + let limiter = + Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) () + in + Main_exporter.add_on_tick_callback (fun () -> + if Interval_limiter.make_attempt limiter then ( + let metrics = collect self in + if metrics <> [] then Emitter.emit self.emit metrics + )) + +module Instrument = Instrument + +module type INSTRUMENT_IMPL = Instrument.CUSTOM_IMPL + +module Make_instrument = Instrument.Make +module Int_counter = Instrument.Int_counter +module Float_counter = Instrument.Float_counter +module Int_gauge = Instrument.Int_gauge +module Float_gauge = Instrument.Float_gauge +module Histogram = Instrument.Histogram diff --git a/src/lib/meter.mli b/src/lib/meter.mli new file mode 100644 index 00000000..ea9ab7ea --- /dev/null +++ b/src/lib/meter.mli @@ -0,0 +1,64 @@ +(** Builder for instruments and periodic metric emission. + + https://opentelemetry.io/docs/specs/otel/metrics/api/#get-a-meter + + Instruments ({!Int_counter}, {!Histogram}, …) register themselves into a + global list ({!Instrument.all}) on creation and do not require a meter. A + {!t} is only needed to wire up periodic collection and emission: call + {!add_to_exporter} or {!add_to_main_exporter} once after creating your + instruments. *) + +type t + +val dummy : t +(** Dummy meter, always disabled *) + +val enabled : t -> bool + +val of_exporter : Exporter.t -> t +(** Create a meter from an exporter *) + +val create : exporter:Exporter.t -> ?name:string -> unit -> t +[@@deprecated "use of_exporter"] + +val default : t +(** Meter that forwards to the current main exporter. Equivalent to + [of_exporter Main_exporter.dynamic_forward_to_main_exporter]. *) + +val emit1 : t -> Metrics.t -> unit +(** Emit a single metric directly, bypassing the instrument registry *) + +val add_cb : (clock:Clock.t -> unit -> Metrics.t list) -> unit +(** Register a raw global metrics callback. Called alongside all instruments + when {!collect} runs. Use this for ad-hoc metrics that don't fit the + structured instrument API. *) + +val collect : t -> Metrics.t list +(** Collect metrics from all registered instruments ({!Instrument.all}) and raw + callbacks ({!add_cb}), using this meter's clock. *) + +val add_to_exporter : ?min_interval:Mtime.span -> Exporter.t -> t -> unit +(** Register a periodic tick callback on [exp] that collects and emits all + instruments. Call this once after creating your instruments. + @param min_interval minimum time between collections (default 4s, min 100ms) +*) + +val add_to_main_exporter : ?min_interval:Mtime.span -> t -> unit +(** Like {!add_to_exporter} but targets the main exporter via + {!Main_exporter.add_on_tick_callback}, so it works even if the main exporter + has not been set yet. *) + +module Instrument = Instrument +(** Global registry of metric instruments. Re-exported from + {!Opentelemetry_core.Instrument} for convenience. *) + +(** Convenience aliases for the instrument submodules in {!Instrument}. *) + +module type INSTRUMENT_IMPL = Instrument.CUSTOM_IMPL + +module Make_instrument = Instrument.Make +module Int_counter = Instrument.Int_counter +module Float_counter = Instrument.Float_counter +module Int_gauge = Instrument.Int_gauge +module Float_gauge = Instrument.Float_gauge +module Histogram = Instrument.Histogram diff --git a/src/lib/metrics_callbacks.ml b/src/lib/metrics_callbacks.ml deleted file mode 100644 index c4a40a7f..00000000 --- a/src/lib/metrics_callbacks.ml +++ /dev/null @@ -1,66 +0,0 @@ -open Common_ - -type t = { cbs: (unit -> Metrics.t list) Alist.t } [@@unboxed] - -let create () : t = { cbs = Alist.make () } - -let[@inline] add_metrics_cb (self : t) f = Alist.add self.cbs f - -let minimum_min_interval = Mtime.Span.(100 * ms) - -let collect_and_send (self : t) (exp : Exporter.t) = - (* collect all metrics *) - let res = ref [] in - List.iter - (fun f -> - let f_metrics = f () in - res := List.rev_append f_metrics !res) - (Alist.get self.cbs); - let metrics = !res in - - (* emit the metrics *) - Exporter.send_metrics exp metrics - -let add_to_exporter ?(min_interval = Mtime.Span.(4 * s)) (exp : Exporter.t) - (self : t) = - let min_interval = - Mtime.Span.( - if is_shorter min_interval ~than:minimum_min_interval then - minimum_min_interval - else - min_interval) - in - - let limiter = Interval_limiter.create ~min_interval () in - let on_tick () = - if Interval_limiter.make_attempt limiter then collect_and_send self exp - in - Exporter.on_tick exp on_tick - -let with_set_added_to_exporter ?min_interval (exp : Exporter.t) (f : t -> 'a) : - 'a = - let set = create () in - add_to_exporter ?min_interval exp set; - f set - -let with_set_added_to_main_exporter ?min_interval (f : t -> unit) : unit = - match Main_exporter.get () with - | None -> () - | Some exp -> with_set_added_to_exporter ?min_interval exp f - -module Main_set = struct - let cur_set_ : t option Atomic.t = Atomic.make None - - let rec get () : t = - match Atomic.get cur_set_ with - | Some s -> s - | None -> - let s = create () in - if Atomic.compare_and_set cur_set_ None (Some s) then ( - (match Main_exporter.get () with - | Some exp -> add_to_exporter exp s - | None -> ()); - s - ) else - get () -end diff --git a/src/lib/metrics_callbacks.mli b/src/lib/metrics_callbacks.mli deleted file mode 100644 index 5a85cd44..00000000 --- a/src/lib/metrics_callbacks.mli +++ /dev/null @@ -1,39 +0,0 @@ -(** A set of callbacks that produce metrics when called. The metrics are - automatically called regularly. - - This allows applications to register metrics callbacks from various points - in the program (or even in libraries), and not worry about setting - alarms/intervals to emit them. *) - -type t - -val create : unit -> t - -val add_metrics_cb : t -> (unit -> Metrics.t list) -> unit -(** [register set f] adds the callback [f] to the [set]. - - [f] will be called at unspecified times and is expected to return a list of - metrics. It might be called regularly by the backend, in particular (but not - only) when {!Exporter.tick} is called. *) - -val add_to_exporter : ?min_interval:Mtime.span -> Exporter.t -> t -> unit -(** Make sure we try to export metrics at every [tick] of the exporter. - @param min_interval - the minimum duration between two consecutive exports, using - {!Interval_limiter}. We don't want a too frequent [tick] to spam metrics. - Default [4s], minimum [0.1s]. *) - -val with_set_added_to_exporter : - ?min_interval:Mtime.span -> Exporter.t -> (t -> 'a) -> 'a -(** [with_set_added_to_exporter exp f] creates a set, adds it to the exporter, - and calls [f] on it *) - -val with_set_added_to_main_exporter : - ?min_interval:Mtime.span -> (t -> unit) -> unit -(** If there is a main exporter, add a set to it and call [f set], else do not - call [f] at all *) - -module Main_set : sig - val get : unit -> t - (** The global set *) -end diff --git a/src/lib/metrics_emitter.ml b/src/lib/metrics_emitter.ml deleted file mode 100644 index 05d8318f..00000000 --- a/src/lib/metrics_emitter.ml +++ /dev/null @@ -1,31 +0,0 @@ -(** Metrics. - - The metrics emitter is used to, well, emit metrics. *) - -open Opentelemetry_emitter - -type t = { - emit: Metrics.t Emitter.t; - clock: Clock.t; -} - -let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock } - -let[@inline] enabled (self : t) = Emitter.enabled self.emit - -let of_exporter (exp : Exporter.t) : t = - { emit = exp.emit_metrics; clock = exp.clock } - -let dynamic_main : t = - Main_exporter.dynamic_forward_to_main_exporter |> of_exporter - -(** Emit some metrics to the collector (sync). This blocks until the backend has - pushed the metrics into some internal queue, or discarded them. *) -let (emit [@deprecated "use an explicit Metrics_emitter.t"]) = - fun ?attrs:_ (l : Metrics.t list) : unit -> - match Main_exporter.get () with - | None -> () - | Some exp -> Exporter.send_metrics exp l - -let[@inline] emit1 (self : t) (m : Metrics.t) : unit = - Emitter.emit self.emit [ m ] diff --git a/src/lib/opentelemetry.ml b/src/lib/opentelemetry.ml index 25d4ddac..a3ed2aee 100644 --- a/src/lib/opentelemetry.ml +++ b/src/lib/opentelemetry.ml @@ -22,8 +22,38 @@ module Timestamp_ns = Timestamp_ns (** {2 Export signals to some external collector.} *) module Emitter = Opentelemetry_emitter.Emitter -module Exporter = Exporter -module Main_exporter = Main_exporter + +module Exporter = struct + include Exporter + + (** Get a tracer from this exporter. + @since NEXT_RELEASE *) + let get_tracer (self : t) : Tracer.t = Tracer.of_exporter self + + (** Get a meter from this exporter. + @since NEXT_RELEASE *) + let get_meter (self : t) : Meter.t = Meter.of_exporter self + + (** Get a logger from this exporter. + @since NEXT_RELEASE *) + let get_logger (self : t) : Logger.t = Logger.of_exporter self +end + +module Main_exporter = struct + include Main_exporter + + (** Get a tracer forwarding to the current main exporter. + @since NEXT_RELEASE *) + let get_tracer () : Tracer.t = Tracer.default + + (** Get a meter forwarding to the current main exporter. + @since NEXT_RELEASE *) + let get_meter () : Meter.t = Meter.default + + (** Get a logger forwarding to the current main exporter. + @since NEXT_RELEASE *) + let get_logger () : Logger.t = Logger.default +end module Collector = struct include Exporter @@ -74,8 +104,8 @@ module Trace = Tracer [@@deprecated "use Tracer instead"] (** {2 Metrics} *) module Metrics = Metrics -module Metrics_callbacks = Metrics_callbacks -module Metrics_emitter = Metrics_emitter +module Instrument = Instrument +module Meter = Meter (** {2 Logs} *) diff --git a/src/lib/tracer.ml b/src/lib/tracer.ml index f79ef177..bc4afd2a 100644 --- a/src/lib/tracer.ml +++ b/src/lib/tracer.ml @@ -28,9 +28,13 @@ let[@inline] enabled (self : t) = Emitter.enabled self.emit let of_exporter (exp : Exporter.t) : t = { emit = exp.emit_spans; clock = exp.clock } -(** A tracer that uses the current {!Main_exporter} *) -let dynamic_main : t = - Main_exporter.dynamic_forward_to_main_exporter |> of_exporter +open struct + (* internal default, keeps the default param below working without deprecation alerts *) + let dynamic_main_ : t = + Main_exporter.dynamic_forward_to_main_exporter |> of_exporter +end + +let default = dynamic_main_ let (add_event [@deprecated "use Span.add_event"]) = Span.add_event' @@ -106,8 +110,8 @@ let with_thunk_and_finally (self : t) ?(force_new_trace_id = false) ?trace_state if true (default false), the span will not use a ambient scope, the [~scope] argument, nor [~trace_id], but will instead always create fresh identifiers for this span *) -let with_ ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state ?attrs ?kind - ?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a = +let with_ ?(tracer = dynamic_main_) ?force_new_trace_id ?trace_state ?attrs + ?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a = let thunk, finally = with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs ?kind ?trace_id ?parent ?links name cb diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index fe459a40..48104128 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -21,9 +21,8 @@ module Tracer = struct include Tracer (** Sync span guard *) - let with_ (type a) ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state - ?attrs ?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) : - a Lwt.t = + let with_ (type a) ?(tracer = default) ?force_new_trace_id ?trace_state ?attrs + ?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) : a Lwt.t = let open Lwt.Syntax in let thunk, finally = with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index c5c275d9..83376078 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -90,15 +90,15 @@ let run_job () = let run () = OT.Gc_metrics.setup_on_main_exporter (); - OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> - OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics; - OT.Metrics_callbacks.add_metrics_cb set (fun () -> - let now = OT.Clock.now_main () in - OT.Metrics. - [ - sum ~name:"num-sleep" ~is_monotonic:true - [ int ~now (Atomic.get num_sleep) ]; - ])); + OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ()); + OT.Meter.add_cb (fun ~clock () -> + let now = OT.Clock.now clock in + OT.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int ~now (Atomic.get num_sleep) ]; + ]); + OT.Meter.add_to_main_exporter OT.Meter.default; let n_jobs = max 1 !n_jobs in Printf.printf "run %d job(s)\n%!" n_jobs; diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index bab71030..478e4add 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -84,15 +84,14 @@ let run_job job_id : unit Lwt.t = let run () : unit Lwt.t = T.Gc_metrics.setup_on_main_exporter (); - T.Metrics_callbacks.( - with_set_added_to_main_exporter (fun set -> - add_metrics_cb set (fun () -> - let now = T.Clock.now_main () in - T.Metrics. - [ - sum ~name:"num-sleep" ~is_monotonic:true - [ int ~now (Atomic.get num_sleep) ]; - ]))); + T.Meter.add_cb (fun ~clock () -> + let now = T.Clock.now clock in + T.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int ~now (Atomic.get num_sleep) ]; + ]); + T.Meter.add_to_main_exporter T.Meter.default; let n_jobs = max 1 !n_jobs in (* Printf.printf "run %d jobs\n%!" n_jobs; *) diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index c3302c9c..6185f858 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -71,17 +71,15 @@ let run_job clock _job_id iterations : unit = let run env proc iterations () : unit = OT.Gc_metrics.setup_on_main_exporter (); - OT.Metrics_callbacks.( - with_set_added_to_main_exporter - ~min_interval:Mtime.Span.(10 * ms) - (fun set -> - add_metrics_cb set (fun () -> - let now = OT.Clock.now_main () in - OT.Metrics. - [ - sum ~name:"num-sleep" ~is_monotonic:true - [ int ~now (Atomic.get num_sleep) ]; - ]))); + OT.Meter.add_cb (fun ~clock () -> + let now = OT.Clock.now clock in + OT.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int ~now (Atomic.get num_sleep) ]; + ]); + OT.Meter.add_to_main_exporter ~min_interval:Mtime.Span.(10 * ms) + OT.Meter.default; let n_jobs = max 1 !n_jobs in Printf.printf "run %d jobs in proc %d\n%!" n_jobs proc; diff --git a/tests/bin/emit1_ocurl_lwt.ml b/tests/bin/emit1_ocurl_lwt.ml index 03b24bb2..08fe9ede 100644 --- a/tests/bin/emit1_ocurl_lwt.ml +++ b/tests/bin/emit1_ocurl_lwt.ml @@ -88,15 +88,15 @@ let run_job () : unit Lwt.t = let run () : unit Lwt.t = OT.Gc_metrics.setup_on_main_exporter (); - OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> - OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics; - OT.Metrics_callbacks.add_metrics_cb set (fun () -> - let now = OT.Clock.now_main () in - OT.Metrics. - [ - sum ~name:"num-sleep" ~is_monotonic:true - [ int ~now (Atomic.get num_sleep) ]; - ])); + OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ()); + OT.Meter.add_cb (fun ~clock () -> + let now = OT.Clock.now clock in + OT.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int ~now (Atomic.get num_sleep) ]; + ]); + OT.Meter.add_to_main_exporter OT.Meter.default; let n_jobs = max 1 !n_jobs in Printf.printf "run %d job(s)\n%!" n_jobs; diff --git a/tests/bin/emit1_stdout.ml b/tests/bin/emit1_stdout.ml index f0ede528..4fa38f97 100644 --- a/tests/bin/emit1_stdout.ml +++ b/tests/bin/emit1_stdout.ml @@ -87,14 +87,14 @@ let run_job () = let run () = OT.Gc_metrics.setup_on_main_exporter (); - OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set -> - OT.Metrics_callbacks.add_metrics_cb set (fun () -> - let now = OT.Clock.now_main () in - OT.Metrics. - [ - sum ~name:"num-sleep" ~is_monotonic:true - [ int ~now (Atomic.get num_sleep) ]; - ])); + OT.Meter.add_cb (fun ~clock () -> + let now = OT.Clock.now clock in + OT.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int ~now (Atomic.get num_sleep) ]; + ]); + OT.Meter.add_to_main_exporter OT.Meter.default; let n_jobs = max 1 !n_jobs in Printf.printf "run %d job(s)\n%!" n_jobs; diff --git a/tests/core/dune b/tests/core/dune index ad9aceba..0a373429 100644 --- a/tests/core/dune +++ b/tests/core/dune @@ -1,4 +1,4 @@ (tests - (names test_trace_context t_size) + (names test_trace_context t_size t_histogram) (package opentelemetry) (libraries pbrt opentelemetry opentelemetry-client)) diff --git a/tests/core/t_histogram.expected b/tests/core/t_histogram.expected new file mode 100644 index 00000000..7be6189f --- /dev/null +++ b/tests/core/t_histogram.expected @@ -0,0 +1,100 @@ +{ name = "test.latency"; + description = "test histogram"; + unit_ = "" (* absent *); + data = + Some( + Histogram( + { data_points = + [{ attributes = []; + start_time_unix_nano = 0 (* absent *); + time_unix_nano = 0; + count = 4; + sum = 15.; + bucket_counts = [1;1;1;1]; + explicit_bounds = [1.;2.;5.]; + exemplars = []; + flags = 0 (* absent *); + min = 0. (* absent *); + max = 0. (* absent *); + } + ]; + aggregation_temporality = + Aggregation_temporality_unspecified (* absent *); + })); + metadata = []; +} +{ name = "test.size"; + description = "" (* absent *); + unit_ = "" (* absent *); + data = + Some( + Histogram( + { data_points = + [{ attributes = []; + start_time_unix_nano = 0 (* absent *); + time_unix_nano = 0; + count = 4; + sum = 2.6; + bucket_counts = [3;1;0]; + explicit_bounds = [1.;5.]; + exemplars = []; + flags = 0 (* absent *); + min = 0. (* absent *); + max = 0. (* absent *); + } + ]; + aggregation_temporality = + Aggregation_temporality_unspecified (* absent *); + })); + metadata = []; +} +{ name = "test.empty"; + description = "" (* absent *); + unit_ = "" (* absent *); + data = + Some( + Histogram( + { data_points = + [{ attributes = []; + start_time_unix_nano = 0 (* absent *); + time_unix_nano = 0; + count = 0; + sum = 0.; + bucket_counts = [0;0;0;0]; + explicit_bounds = [1.;2.;5.]; + exemplars = []; + flags = 0 (* absent *); + min = 0. (* absent *); + max = 0. (* absent *); + } + ]; + aggregation_temporality = + Aggregation_temporality_unspecified (* absent *); + })); + metadata = []; +} +{ name = "test.boundary"; + description = "" (* absent *); + unit_ = "" (* absent *); + data = + Some( + Histogram( + { data_points = + [{ attributes = []; + start_time_unix_nano = 0 (* absent *); + time_unix_nano = 0; + count = 3; + sum = 8.; + bucket_counts = [1;1;1;0]; + explicit_bounds = [1.;2.;5.]; + exemplars = []; + flags = 0 (* absent *); + min = 0. (* absent *); + max = 0. (* absent *); + } + ]; + aggregation_temporality = + Aggregation_temporality_unspecified (* absent *); + })); + metadata = []; +} diff --git a/tests/core/t_histogram.ml b/tests/core/t_histogram.ml new file mode 100644 index 00000000..70d736e1 --- /dev/null +++ b/tests/core/t_histogram.ml @@ -0,0 +1,58 @@ +open Opentelemetry + +(** A deterministic clock that always returns timestamp 0 *) +let dummy_clock : Clock.t = { Clock.now = (fun () -> 0L) } + +let emit h = h.Instrument.emit ~clock:dummy_clock () + +let pp_metrics metrics = + List.iter (Format.printf "%a@." Metrics.pp) metrics + +(* ------------------------------------------------------------------ *) +(* Test 1: one value per bucket, plus one in the overflow bucket *) +(* bounds [1; 2; 5] → 4 buckets: (≤1) (1,2] (2,5] (5,∞) *) +let () = + let h = + Instrument.Histogram.create ~name:"test.latency" + ~description:"test histogram" ~bounds:[ 1.; 2.; 5. ] () + in + Instrument.Histogram.record h 0.5; (* bucket 0: ≤1 *) + Instrument.Histogram.record h 1.5; (* bucket 1: ≤2 *) + Instrument.Histogram.record h 3.0; (* bucket 2: ≤5 *) + Instrument.Histogram.record h 10.; (* bucket 3: >5 *) + (* count=4 sum=15.0 bucket_counts=[1;1;1;1] *) + pp_metrics (emit h) + +(* ------------------------------------------------------------------ *) +(* Test 2: multiple values pile into the same bucket *) +let () = + let h = + Instrument.Histogram.create ~name:"test.size" ~bounds:[ 1.; 5. ] () + in + Instrument.Histogram.record h 0.1; + Instrument.Histogram.record h 0.2; + Instrument.Histogram.record h 0.3; (* 3 values in bucket 0 *) + Instrument.Histogram.record h 2.0; (* 1 value in bucket 1 *) + (* count=4 sum=2.6 bucket_counts=[3;1;0] *) + pp_metrics (emit h) + +(* ------------------------------------------------------------------ *) +(* Test 3: empty histogram *) +let () = + let h = + Instrument.Histogram.create ~name:"test.empty" ~bounds:[ 1.; 2.; 5. ] () + in + (* count=0 sum=0.0 bucket_counts=[0;0;0;0] *) + pp_metrics (emit h) + +(* ------------------------------------------------------------------ *) +(* Test 4: value exactly on a bound goes into that bound's bucket *) +let () = + let h = + Instrument.Histogram.create ~name:"test.boundary" ~bounds:[ 1.; 2.; 5. ] () + in + Instrument.Histogram.record h 1.0; (* exactly on bound → bucket 0 *) + Instrument.Histogram.record h 2.0; (* exactly on bound → bucket 1 *) + Instrument.Histogram.record h 5.0; (* exactly on bound → bucket 2 *) + (* count=3 sum=8.0 bucket_counts=[1;1;1;0] *) + pp_metrics (emit h)