rework tracer/meter/logger interfaces

- Meter is new, and makes more sense than
Metrics_callbacks/Metrics_emitter
- Instrument in core, with some basic counters, gauges, and histograms,
+ the possibility to do one's own
This commit is contained in:
Simon Cruanes 2026-02-20 12:31:15 -05:00
parent d5f6b564db
commit 210b7991c9
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
22 changed files with 728 additions and 206 deletions

View file

@ -2,7 +2,7 @@ open Common_
let enabled = Atomic.make false
let tracer = Atomic.make OTEL.Tracer.dynamic_main
let tracer = Atomic.make OTEL.Tracer.default
let[@inline] add_event (scope : OTEL.Span.t) ev = OTEL.Span.add_event scope ev

221
src/core/instrument.ml Normal file
View file

@ -0,0 +1,221 @@
type 'a t = {
kind: string;
name: string;
emit: clock:Clock.t -> unit -> Metrics.t list;
update: 'a -> unit;
}
let all : (clock:Clock.t -> unit -> Metrics.t list) Alist.t = Alist.make ()
let register (instr : 'a t) : unit = Alist.add all instr.emit
module Internal = struct
let iter_all f = Alist.get all |> List.iter f
end
let float_add (a : float Atomic.t) (delta : float) : unit =
while
let cur = Atomic.get a in
not (Atomic.compare_and_set a cur (cur +. delta))
do
()
done
module type CUSTOM_IMPL = sig
type data
type state
val kind : string
val init : unit -> state
val update : state -> data -> unit
val to_metrics :
state ->
name:string ->
?description:string ->
?unit_:string ->
clock:Clock.t ->
unit ->
Metrics.t list
end
module Make (I : CUSTOM_IMPL) = struct
let create ~name ?description ?unit_ () : I.data t =
let state = I.init () in
let emit ~clock () =
I.to_metrics state ~name ?description ?unit_ ~clock ()
in
let instrument =
{ kind = I.kind; name; emit; update = I.update state } [@warning "-45"]
in
register instrument;
instrument
end
module Int_counter = struct
include Make (struct
type data = int
type state = int Atomic.t
let kind = "counter"
let init () = Atomic.make 0
let update state delta = ignore (Atomic.fetch_and_add state delta : int)
let to_metrics state ~name ?description ?unit_ ~clock () =
let now = Clock.now clock in
[
Metrics.sum ~name ?description ?unit_ ~is_monotonic:true
[ Metrics.int ~now (Atomic.get state) ];
]
end)
let add (instrument : int t) delta = instrument.update delta
end
module Float_counter = struct
include Make (struct
type data = float
type state = float Atomic.t
let kind = "counter"
let init () = Atomic.make 0.
let update state delta = float_add state delta
let to_metrics state ~name ?description ?unit_ ~clock () =
let now = Clock.now clock in
[
Metrics.sum ~name ?description ?unit_ ~is_monotonic:true
[ Metrics.float ~now (Atomic.get state) ];
]
end)
let add (instrument : float t) delta = instrument.update delta
end
module Int_gauge = struct
include Make (struct
type data = int
type state = int Atomic.t
let kind = "gauge"
let init () = Atomic.make 0
let update state v = Atomic.set state v
let to_metrics state ~name ?description ?unit_ ~clock () =
let now = Clock.now clock in
[
Metrics.gauge ~name ?description ?unit_
[ Metrics.int ~now (Atomic.get state) ];
]
end)
let record (instrument : int t) v = instrument.update v
end
module Float_gauge = struct
include Make (struct
type data = float
type state = float Atomic.t
let kind = "gauge"
let init () = Atomic.make 0.
let update state v = Atomic.set state v
let to_metrics state ~name ?description ?unit_ ~clock () =
let now = Clock.now clock in
[
Metrics.gauge ~name ?description ?unit_
[ Metrics.float ~now (Atomic.get state) ];
]
end)
let record (instrument : float t) v = instrument.update v
end
module Histogram = struct
let default_bounds =
[
0.005;
0.01;
0.025;
0.05;
0.075;
0.1;
0.25;
0.5;
0.75;
1.;
2.5;
5.;
7.5;
10.;
]
(* Find the index of the first bucket whose upper bound >= v.
Returns Array.length bounds if v exceeds all bounds (overflow bucket). *)
let find_bucket (bounds : float array) (v : float) : int =
let n = Array.length bounds in
let lo = ref 0 and hi = ref (n - 1) in
while !lo < !hi do
let mid = (!lo + !hi) / 2 in
if bounds.(mid) < v then
lo := mid + 1
else
hi := mid
done;
if !lo < n && v <= bounds.(!lo) then
!lo
else
n
let create ~name ?description ?unit_ ?(bounds = default_bounds) () : float t =
let bounds_arr = Array.of_list bounds in
let n_buckets = Array.length bounds_arr + 1 in
let bucket_counts = Array.init n_buckets (fun _ -> Atomic.make 0) in
let sum = Atomic.make 0. in
let count = Atomic.make 0 in
let update v =
let bucket = find_bucket bounds_arr v in
ignore (Atomic.fetch_and_add bucket_counts.(bucket) 1 : int);
float_add sum v;
ignore (Atomic.fetch_and_add count 1 : int)
in
let emit ~clock () =
let now = Clock.now clock in
let count_v = Int64.of_int (Atomic.get count) in
let sum_v = Atomic.get sum in
let bc =
Array.to_list
(Array.map (fun a -> Int64.of_int (Atomic.get a)) bucket_counts)
in
[
Metrics.histogram ~name ?description ?unit_
[
Metrics.histogram_data_point ~now ~count:count_v ~sum:sum_v
~bucket_counts:bc ~explicit_bounds:bounds ();
];
]
in
let instrument =
{ kind = "histogram"; name; emit; update } [@warning "-45"]
in
register instrument;
instrument
let record (instrument : float t) v = instrument.update v
end

99
src/core/instrument.mli Normal file
View file

@ -0,0 +1,99 @@
(** Global registry of metric instruments.
Instruments are stateful accumulators (counters, gauges, histograms, ).
[update] is called at any time to record a value; [emit] is called at
collection time by a {!Meter.t}, which supplies the clock.
All instruments register themselves into a global list on creation via
{!register}, so any meter can collect the full set in one pass. Make sure to
only create instruments at the toplevel so that the list doesn't grow
forever. *)
type 'a t = {
kind: string; (** "counter", "gauge", "histogram", … *)
name: string;
emit: clock:Clock.t -> unit -> Metrics.t list;
(** Snapshot current accumulated state into metrics. *)
update: 'a -> unit; (** Record a new value. *)
}
val register : 'a t -> unit
(** Add an instrument's [emit] to {!all}. Called automatically by the standard
instrument-creation functions. *)
(** Implementation details for a custom stateful instrument. Pass to {!Make} to
obtain a [create] function. *)
module type CUSTOM_IMPL = sig
type data
type state
val kind : string
val init : unit -> state
val update : state -> data -> unit
val to_metrics :
state ->
name:string ->
?description:string ->
?unit_:string ->
clock:Clock.t ->
unit ->
Metrics.t list
end
(** Build a custom instrument type from a {!CUSTOM_IMPL}. The returned [create]
registers the instrument into {!all} automatically. *)
module Make (I : CUSTOM_IMPL) : sig
val create :
name:string -> ?description:string -> ?unit_:string -> unit -> I.data t
end
module Int_counter : sig
val create :
name:string -> ?description:string -> ?unit_:string -> unit -> int t
val add : int t -> int -> unit
end
module Float_counter : sig
val create :
name:string -> ?description:string -> ?unit_:string -> unit -> float t
val add : float t -> float -> unit
end
module Int_gauge : sig
val create :
name:string -> ?description:string -> ?unit_:string -> unit -> int t
val record : int t -> int -> unit
end
module Float_gauge : sig
val create :
name:string -> ?description:string -> ?unit_:string -> unit -> float t
val record : float t -> float -> unit
end
module Histogram : sig
val default_bounds : float list
val create :
name:string ->
?description:string ->
?unit_:string ->
?bounds:float list ->
unit ->
float t
val record : float t -> float -> unit
end
module Internal : sig
val iter_all : ((clock:Clock.t -> unit -> Metrics.t list) -> unit) -> unit
(** Access all the instruments *)
end

View file

@ -117,8 +117,8 @@ end = struct
in
{ req with headers }
let trace ?(tracer = Otel.Tracer.dynamic_main) ?(attrs = []) callback conn req
body =
let trace ?(tracer = Otel.Tracer.default) ?(attrs = []) callback conn req body
=
let parent = get_trace_context ~from:`External req in
Otel_lwt.Tracer.with_ ~tracer "request" ~kind:Span_kind_server
?trace_id:(Option.map Otel.Span.trace_id parent)
@ -131,7 +131,7 @@ end = struct
Otel.Span.add_attrs span (attrs_of_response res);
Lwt.return (res, body))
let with_ ?(tracer = Otel.Tracer.dynamic_main) ?trace_state ?attrs
let with_ ?(tracer = Otel.Tracer.default) ?trace_state ?attrs
?(kind = Otel.Span.Span_kind_internal) ?links name req
(f : Request.t -> 'a Lwt.t) =
let span = get_trace_context ~from:`Internal req in
@ -142,7 +142,7 @@ end = struct
f req)
end
let client ?(tracer = Otel.Tracer.dynamic_main) ?(span : Otel.Span.t option)
let client ?(tracer = Otel.Tracer.default) ?(span : Otel.Span.t option)
(module C : Cohttp_lwt.S.Client) =
let module Traced = struct
open Lwt.Syntax

View file

@ -34,7 +34,7 @@ let emit_telemetry do_emit = Logs.Tag.(empty |> add emit_telemetry_tag do_emit)
(*****************************************************************************)
(* Log a message to otel with some attrs *)
let log ?(logger = OTEL.Logger.dynamic_main) ?attrs
let log ?(logger = OTEL.Logger.default) ?attrs
?(scope = OTEL.Ambient_span.get ()) ~level msg =
let log_level = Logs.level_to_string (Some level) in
let span_id = Option.map OTEL.Span.id scope in

View file

@ -30,16 +30,21 @@ let (emit_main [@deprecated "use an explicit Logger.t"]) =
| None -> ()
| Some exp -> Exporter.send_logs exp logs
(** An emitter that uses the current {!Main_exporter}'s logger *)
let dynamic_main : t =
open struct
(* internal default, keeps the default params below working without deprecation alerts *)
let dynamic_main_ : t =
of_exporter Main_exporter.dynamic_forward_to_main_exporter
end
(** A logger that uses the current {!Main_exporter}'s logger *)
let default = dynamic_main_
(** {2 Logging helpers} *)
open Log_record
(** Create log record and emit it on [logger] *)
let log ?(logger = dynamic_main) ?attrs ?trace_id ?span_id
let log ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id
?(severity : severity option) (msg : string) : unit =
if enabled logger then (
let now = Clock.now logger.clock in
@ -55,7 +60,7 @@ let log ?(logger = dynamic_main) ?attrs ?trace_id ?span_id
Example usage:
[logf ~severity:Severity_number_warn (fun k->k"oh no!! %s it's bad: %b"
"help" true)] *)
let logf ?(logger = dynamic_main) ?attrs ?trace_id ?span_id ?severity msgf :
let logf ?(logger = dynamic_main_) ?attrs ?trace_id ?span_id ?severity msgf :
unit =
if enabled logger then
msgf (fun fmt ->

81
src/lib/meter.ml Normal file
View file

@ -0,0 +1,81 @@
open Opentelemetry_emitter
type t = {
emit: Metrics.t Emitter.t;
clock: Clock.t;
}
let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock }
let[@inline] enabled (self : t) = Emitter.enabled self.emit
let of_exporter (exp : Exporter.t) : t =
{ emit = exp.emit_metrics; clock = exp.clock }
let (create [@deprecated "use Meter.of_exporter"]) =
fun ~(exporter : Exporter.t) ?name:_name () : t -> of_exporter exporter
let default : t = Main_exporter.dynamic_forward_to_main_exporter |> of_exporter
let[@inline] emit1 (self : t) (m : Metrics.t) : unit =
Emitter.emit self.emit [ m ]
(** Global list of raw metric callbacks, collected alongside {!Instrument.all}.
*)
let cbs_ : (clock:Clock.t -> unit -> Metrics.t list) Alist.t = Alist.make ()
let add_cb (f : clock:Clock.t -> unit -> Metrics.t list) : unit =
Alist.add cbs_ f
let collect (self : t) : Metrics.t list =
let clock = self.clock in
let acc = ref [] in
Instrument.Internal.iter_all (fun f ->
acc := List.rev_append (f ~clock ()) !acc);
List.iter
(fun f -> acc := List.rev_append (f ~clock ()) !acc)
(Alist.get cbs_);
List.rev !acc
let minimum_min_interval_ = Mtime.Span.(100 * ms)
let default_min_interval_ = Mtime.Span.(4 * s)
let clamp_interval_ interval =
if Mtime.Span.is_shorter interval ~than:minimum_min_interval_ then
minimum_min_interval_
else
interval
let add_to_exporter ?(min_interval = default_min_interval_) (exp : Exporter.t)
(self : t) : unit =
let limiter =
Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) ()
in
Exporter.on_tick exp (fun () ->
if Interval_limiter.make_attempt limiter then (
let metrics = collect self in
if metrics <> [] then Emitter.emit self.emit metrics
))
let add_to_main_exporter ?(min_interval = default_min_interval_) (self : t) :
unit =
let limiter =
Interval_limiter.create ~min_interval:(clamp_interval_ min_interval) ()
in
Main_exporter.add_on_tick_callback (fun () ->
if Interval_limiter.make_attempt limiter then (
let metrics = collect self in
if metrics <> [] then Emitter.emit self.emit metrics
))
module Instrument = Instrument
module type INSTRUMENT_IMPL = Instrument.CUSTOM_IMPL
module Make_instrument = Instrument.Make
module Int_counter = Instrument.Int_counter
module Float_counter = Instrument.Float_counter
module Int_gauge = Instrument.Int_gauge
module Float_gauge = Instrument.Float_gauge
module Histogram = Instrument.Histogram

64
src/lib/meter.mli Normal file
View file

@ -0,0 +1,64 @@
(** Builder for instruments and periodic metric emission.
https://opentelemetry.io/docs/specs/otel/metrics/api/#get-a-meter
Instruments ({!Int_counter}, {!Histogram}, ) register themselves into a
global list ({!Instrument.all}) on creation and do not require a meter. A
{!t} is only needed to wire up periodic collection and emission: call
{!add_to_exporter} or {!add_to_main_exporter} once after creating your
instruments. *)
type t
val dummy : t
(** Dummy meter, always disabled *)
val enabled : t -> bool
val of_exporter : Exporter.t -> t
(** Create a meter from an exporter *)
val create : exporter:Exporter.t -> ?name:string -> unit -> t
[@@deprecated "use of_exporter"]
val default : t
(** Meter that forwards to the current main exporter. Equivalent to
[of_exporter Main_exporter.dynamic_forward_to_main_exporter]. *)
val emit1 : t -> Metrics.t -> unit
(** Emit a single metric directly, bypassing the instrument registry *)
val add_cb : (clock:Clock.t -> unit -> Metrics.t list) -> unit
(** Register a raw global metrics callback. Called alongside all instruments
when {!collect} runs. Use this for ad-hoc metrics that don't fit the
structured instrument API. *)
val collect : t -> Metrics.t list
(** Collect metrics from all registered instruments ({!Instrument.all}) and raw
callbacks ({!add_cb}), using this meter's clock. *)
val add_to_exporter : ?min_interval:Mtime.span -> Exporter.t -> t -> unit
(** Register a periodic tick callback on [exp] that collects and emits all
instruments. Call this once after creating your instruments.
@param min_interval minimum time between collections (default 4s, min 100ms)
*)
val add_to_main_exporter : ?min_interval:Mtime.span -> t -> unit
(** Like {!add_to_exporter} but targets the main exporter via
{!Main_exporter.add_on_tick_callback}, so it works even if the main exporter
has not been set yet. *)
module Instrument = Instrument
(** Global registry of metric instruments. Re-exported from
{!Opentelemetry_core.Instrument} for convenience. *)
(** Convenience aliases for the instrument submodules in {!Instrument}. *)
module type INSTRUMENT_IMPL = Instrument.CUSTOM_IMPL
module Make_instrument = Instrument.Make
module Int_counter = Instrument.Int_counter
module Float_counter = Instrument.Float_counter
module Int_gauge = Instrument.Int_gauge
module Float_gauge = Instrument.Float_gauge
module Histogram = Instrument.Histogram

View file

@ -1,66 +0,0 @@
open Common_
type t = { cbs: (unit -> Metrics.t list) Alist.t } [@@unboxed]
let create () : t = { cbs = Alist.make () }
let[@inline] add_metrics_cb (self : t) f = Alist.add self.cbs f
let minimum_min_interval = Mtime.Span.(100 * ms)
let collect_and_send (self : t) (exp : Exporter.t) =
(* collect all metrics *)
let res = ref [] in
List.iter
(fun f ->
let f_metrics = f () in
res := List.rev_append f_metrics !res)
(Alist.get self.cbs);
let metrics = !res in
(* emit the metrics *)
Exporter.send_metrics exp metrics
let add_to_exporter ?(min_interval = Mtime.Span.(4 * s)) (exp : Exporter.t)
(self : t) =
let min_interval =
Mtime.Span.(
if is_shorter min_interval ~than:minimum_min_interval then
minimum_min_interval
else
min_interval)
in
let limiter = Interval_limiter.create ~min_interval () in
let on_tick () =
if Interval_limiter.make_attempt limiter then collect_and_send self exp
in
Exporter.on_tick exp on_tick
let with_set_added_to_exporter ?min_interval (exp : Exporter.t) (f : t -> 'a) :
'a =
let set = create () in
add_to_exporter ?min_interval exp set;
f set
let with_set_added_to_main_exporter ?min_interval (f : t -> unit) : unit =
match Main_exporter.get () with
| None -> ()
| Some exp -> with_set_added_to_exporter ?min_interval exp f
module Main_set = struct
let cur_set_ : t option Atomic.t = Atomic.make None
let rec get () : t =
match Atomic.get cur_set_ with
| Some s -> s
| None ->
let s = create () in
if Atomic.compare_and_set cur_set_ None (Some s) then (
(match Main_exporter.get () with
| Some exp -> add_to_exporter exp s
| None -> ());
s
) else
get ()
end

View file

@ -1,39 +0,0 @@
(** A set of callbacks that produce metrics when called. The metrics are
automatically called regularly.
This allows applications to register metrics callbacks from various points
in the program (or even in libraries), and not worry about setting
alarms/intervals to emit them. *)
type t
val create : unit -> t
val add_metrics_cb : t -> (unit -> Metrics.t list) -> unit
(** [register set f] adds the callback [f] to the [set].
[f] will be called at unspecified times and is expected to return a list of
metrics. It might be called regularly by the backend, in particular (but not
only) when {!Exporter.tick} is called. *)
val add_to_exporter : ?min_interval:Mtime.span -> Exporter.t -> t -> unit
(** Make sure we try to export metrics at every [tick] of the exporter.
@param min_interval
the minimum duration between two consecutive exports, using
{!Interval_limiter}. We don't want a too frequent [tick] to spam metrics.
Default [4s], minimum [0.1s]. *)
val with_set_added_to_exporter :
?min_interval:Mtime.span -> Exporter.t -> (t -> 'a) -> 'a
(** [with_set_added_to_exporter exp f] creates a set, adds it to the exporter,
and calls [f] on it *)
val with_set_added_to_main_exporter :
?min_interval:Mtime.span -> (t -> unit) -> unit
(** If there is a main exporter, add a set to it and call [f set], else do not
call [f] at all *)
module Main_set : sig
val get : unit -> t
(** The global set *)
end

View file

@ -1,31 +0,0 @@
(** Metrics.
The metrics emitter is used to, well, emit metrics. *)
open Opentelemetry_emitter
type t = {
emit: Metrics.t Emitter.t;
clock: Clock.t;
}
let dummy : t = { emit = Emitter.dummy; clock = Clock.ptime_clock }
let[@inline] enabled (self : t) = Emitter.enabled self.emit
let of_exporter (exp : Exporter.t) : t =
{ emit = exp.emit_metrics; clock = exp.clock }
let dynamic_main : t =
Main_exporter.dynamic_forward_to_main_exporter |> of_exporter
(** Emit some metrics to the collector (sync). This blocks until the backend has
pushed the metrics into some internal queue, or discarded them. *)
let (emit [@deprecated "use an explicit Metrics_emitter.t"]) =
fun ?attrs:_ (l : Metrics.t list) : unit ->
match Main_exporter.get () with
| None -> ()
| Some exp -> Exporter.send_metrics exp l
let[@inline] emit1 (self : t) (m : Metrics.t) : unit =
Emitter.emit self.emit [ m ]

View file

@ -22,8 +22,38 @@ module Timestamp_ns = Timestamp_ns
(** {2 Export signals to some external collector.} *)
module Emitter = Opentelemetry_emitter.Emitter
module Exporter = Exporter
module Main_exporter = Main_exporter
module Exporter = struct
include Exporter
(** Get a tracer from this exporter.
@since NEXT_RELEASE *)
let get_tracer (self : t) : Tracer.t = Tracer.of_exporter self
(** Get a meter from this exporter.
@since NEXT_RELEASE *)
let get_meter (self : t) : Meter.t = Meter.of_exporter self
(** Get a logger from this exporter.
@since NEXT_RELEASE *)
let get_logger (self : t) : Logger.t = Logger.of_exporter self
end
module Main_exporter = struct
include Main_exporter
(** Get a tracer forwarding to the current main exporter.
@since NEXT_RELEASE *)
let get_tracer () : Tracer.t = Tracer.default
(** Get a meter forwarding to the current main exporter.
@since NEXT_RELEASE *)
let get_meter () : Meter.t = Meter.default
(** Get a logger forwarding to the current main exporter.
@since NEXT_RELEASE *)
let get_logger () : Logger.t = Logger.default
end
module Collector = struct
include Exporter
@ -74,8 +104,8 @@ module Trace = Tracer [@@deprecated "use Tracer instead"]
(** {2 Metrics} *)
module Metrics = Metrics
module Metrics_callbacks = Metrics_callbacks
module Metrics_emitter = Metrics_emitter
module Instrument = Instrument
module Meter = Meter
(** {2 Logs} *)

View file

@ -28,9 +28,13 @@ let[@inline] enabled (self : t) = Emitter.enabled self.emit
let of_exporter (exp : Exporter.t) : t =
{ emit = exp.emit_spans; clock = exp.clock }
(** A tracer that uses the current {!Main_exporter} *)
let dynamic_main : t =
open struct
(* internal default, keeps the default param below working without deprecation alerts *)
let dynamic_main_ : t =
Main_exporter.dynamic_forward_to_main_exporter |> of_exporter
end
let default = dynamic_main_
let (add_event [@deprecated "use Span.add_event"]) = Span.add_event'
@ -106,8 +110,8 @@ let with_thunk_and_finally (self : t) ?(force_new_trace_id = false) ?trace_state
if true (default false), the span will not use a ambient scope, the
[~scope] argument, nor [~trace_id], but will instead always create fresh
identifiers for this span *)
let with_ ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state ?attrs ?kind
?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a =
let with_ ?(tracer = dynamic_main_) ?force_new_trace_id ?trace_state ?attrs
?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a) : 'a =
let thunk, finally =
with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs ?kind
?trace_id ?parent ?links name cb

View file

@ -21,9 +21,8 @@ module Tracer = struct
include Tracer
(** Sync span guard *)
let with_ (type a) ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state
?attrs ?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) :
a Lwt.t =
let with_ (type a) ?(tracer = default) ?force_new_trace_id ?trace_state ?attrs
?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) : a Lwt.t =
let open Lwt.Syntax in
let thunk, finally =
with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs

View file

@ -90,15 +90,15 @@ let run_job () =
let run () =
OT.Gc_metrics.setup_on_main_exporter ();
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics;
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
let now = OT.Clock.now_main () in
OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ());
OT.Meter.add_cb (fun ~clock () ->
let now = OT.Clock.now clock in
OT.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
]));
]);
OT.Meter.add_to_main_exporter OT.Meter.default;
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d job(s)\n%!" n_jobs;

View file

@ -84,15 +84,14 @@ let run_job job_id : unit Lwt.t =
let run () : unit Lwt.t =
T.Gc_metrics.setup_on_main_exporter ();
T.Metrics_callbacks.(
with_set_added_to_main_exporter (fun set ->
add_metrics_cb set (fun () ->
let now = T.Clock.now_main () in
T.Meter.add_cb (fun ~clock () ->
let now = T.Clock.now clock in
T.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
])));
]);
T.Meter.add_to_main_exporter T.Meter.default;
let n_jobs = max 1 !n_jobs in
(* Printf.printf "run %d jobs\n%!" n_jobs; *)

View file

@ -71,17 +71,15 @@ let run_job clock _job_id iterations : unit =
let run env proc iterations () : unit =
OT.Gc_metrics.setup_on_main_exporter ();
OT.Metrics_callbacks.(
with_set_added_to_main_exporter
~min_interval:Mtime.Span.(10 * ms)
(fun set ->
add_metrics_cb set (fun () ->
let now = OT.Clock.now_main () in
OT.Meter.add_cb (fun ~clock () ->
let now = OT.Clock.now clock in
OT.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
])));
]);
OT.Meter.add_to_main_exporter ~min_interval:Mtime.Span.(10 * ms)
OT.Meter.default;
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d jobs in proc %d\n%!" n_jobs proc;

View file

@ -88,15 +88,15 @@ let run_job () : unit Lwt.t =
let run () : unit Lwt.t =
OT.Gc_metrics.setup_on_main_exporter ();
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics;
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
let now = OT.Clock.now_main () in
OT.Meter.add_cb (fun ~clock:_ () -> OT.Main_exporter.self_metrics ());
OT.Meter.add_cb (fun ~clock () ->
let now = OT.Clock.now clock in
OT.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
]));
]);
OT.Meter.add_to_main_exporter OT.Meter.default;
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d job(s)\n%!" n_jobs;

View file

@ -87,14 +87,14 @@ let run_job () =
let run () =
OT.Gc_metrics.setup_on_main_exporter ();
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
let now = OT.Clock.now_main () in
OT.Meter.add_cb (fun ~clock () ->
let now = OT.Clock.now clock in
OT.Metrics.
[
sum ~name:"num-sleep" ~is_monotonic:true
[ int ~now (Atomic.get num_sleep) ];
]));
]);
OT.Meter.add_to_main_exporter OT.Meter.default;
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d job(s)\n%!" n_jobs;

View file

@ -1,4 +1,4 @@
(tests
(names test_trace_context t_size)
(names test_trace_context t_size t_histogram)
(package opentelemetry)
(libraries pbrt opentelemetry opentelemetry-client))

View file

@ -0,0 +1,100 @@
{ name = "test.latency";
description = "test histogram";
unit_ = "" (* absent *);
data =
Some(
Histogram(
{ data_points =
[{ attributes = [];
start_time_unix_nano = 0 (* absent *);
time_unix_nano = 0;
count = 4;
sum = 15.;
bucket_counts = [1;1;1;1];
explicit_bounds = [1.;2.;5.];
exemplars = [];
flags = 0 (* absent *);
min = 0. (* absent *);
max = 0. (* absent *);
}
];
aggregation_temporality =
Aggregation_temporality_unspecified (* absent *);
}));
metadata = [];
}
{ name = "test.size";
description = "" (* absent *);
unit_ = "" (* absent *);
data =
Some(
Histogram(
{ data_points =
[{ attributes = [];
start_time_unix_nano = 0 (* absent *);
time_unix_nano = 0;
count = 4;
sum = 2.6;
bucket_counts = [3;1;0];
explicit_bounds = [1.;5.];
exemplars = [];
flags = 0 (* absent *);
min = 0. (* absent *);
max = 0. (* absent *);
}
];
aggregation_temporality =
Aggregation_temporality_unspecified (* absent *);
}));
metadata = [];
}
{ name = "test.empty";
description = "" (* absent *);
unit_ = "" (* absent *);
data =
Some(
Histogram(
{ data_points =
[{ attributes = [];
start_time_unix_nano = 0 (* absent *);
time_unix_nano = 0;
count = 0;
sum = 0.;
bucket_counts = [0;0;0;0];
explicit_bounds = [1.;2.;5.];
exemplars = [];
flags = 0 (* absent *);
min = 0. (* absent *);
max = 0. (* absent *);
}
];
aggregation_temporality =
Aggregation_temporality_unspecified (* absent *);
}));
metadata = [];
}
{ name = "test.boundary";
description = "" (* absent *);
unit_ = "" (* absent *);
data =
Some(
Histogram(
{ data_points =
[{ attributes = [];
start_time_unix_nano = 0 (* absent *);
time_unix_nano = 0;
count = 3;
sum = 8.;
bucket_counts = [1;1;1;0];
explicit_bounds = [1.;2.;5.];
exemplars = [];
flags = 0 (* absent *);
min = 0. (* absent *);
max = 0. (* absent *);
}
];
aggregation_temporality =
Aggregation_temporality_unspecified (* absent *);
}));
metadata = [];
}

58
tests/core/t_histogram.ml Normal file
View file

@ -0,0 +1,58 @@
open Opentelemetry
(** A deterministic clock that always returns timestamp 0 *)
let dummy_clock : Clock.t = { Clock.now = (fun () -> 0L) }
let emit h = h.Instrument.emit ~clock:dummy_clock ()
let pp_metrics metrics =
List.iter (Format.printf "%a@." Metrics.pp) metrics
(* ------------------------------------------------------------------ *)
(* Test 1: one value per bucket, plus one in the overflow bucket *)
(* bounds [1; 2; 5] → 4 buckets: (≤1) (1,2] (2,5] (5,∞) *)
let () =
let h =
Instrument.Histogram.create ~name:"test.latency"
~description:"test histogram" ~bounds:[ 1.; 2.; 5. ] ()
in
Instrument.Histogram.record h 0.5; (* bucket 0: ≤1 *)
Instrument.Histogram.record h 1.5; (* bucket 1: ≤2 *)
Instrument.Histogram.record h 3.0; (* bucket 2: ≤5 *)
Instrument.Histogram.record h 10.; (* bucket 3: >5 *)
(* count=4 sum=15.0 bucket_counts=[1;1;1;1] *)
pp_metrics (emit h)
(* ------------------------------------------------------------------ *)
(* Test 2: multiple values pile into the same bucket *)
let () =
let h =
Instrument.Histogram.create ~name:"test.size" ~bounds:[ 1.; 5. ] ()
in
Instrument.Histogram.record h 0.1;
Instrument.Histogram.record h 0.2;
Instrument.Histogram.record h 0.3; (* 3 values in bucket 0 *)
Instrument.Histogram.record h 2.0; (* 1 value in bucket 1 *)
(* count=4 sum=2.6 bucket_counts=[3;1;0] *)
pp_metrics (emit h)
(* ------------------------------------------------------------------ *)
(* Test 3: empty histogram *)
let () =
let h =
Instrument.Histogram.create ~name:"test.empty" ~bounds:[ 1.; 2.; 5. ] ()
in
(* count=0 sum=0.0 bucket_counts=[0;0;0;0] *)
pp_metrics (emit h)
(* ------------------------------------------------------------------ *)
(* Test 4: value exactly on a bound goes into that bound's bucket *)
let () =
let h =
Instrument.Histogram.create ~name:"test.boundary" ~bounds:[ 1.; 2.; 5. ] ()
in
Instrument.Histogram.record h 1.0; (* exactly on bound → bucket 0 *)
Instrument.Histogram.record h 2.0; (* exactly on bound → bucket 1 *)
Instrument.Histogram.record h 5.0; (* exactly on bound → bucket 2 *)
(* count=3 sum=8.0 bucket_counts=[1;1;1;0] *)
pp_metrics (emit h)