Merge pull request #19 from imandra-ai/metrics-utils

Metrics utils
This commit is contained in:
Simon Cruanes 2022-04-30 13:26:39 -04:00 committed by GitHub
commit b52d7611a6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 106 additions and 10 deletions

View file

@ -124,6 +124,7 @@ module type EMITTER = sig
val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> unit
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
val tick : unit -> unit
val cleanup : unit -> unit
@ -200,6 +201,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let ((module C) as curl) = (module Curl() : CURL) in
let on_tick_cbs_ = ref (ref []) in
let set_on_tick_callbacks = (:=) on_tick_cbs_ in
let send_metrics_http (l:Metrics.resource_metrics list list) =
Pbrt.Encoder.reset encoder;
let resource_metrics =
@ -332,6 +336,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let tick() =
if Atomic.get needs_gc_metrics then sample_gc_metrics();
List.iter
(fun f ->
try f()
with e ->
Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e))
!(!on_tick_cbs_);
if batch_timeout() then wakeup()
in
@ -354,6 +364,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let push_metrics e =
E_metrics.push e;
if batch_timeout() then wakeup()
let set_on_tick_callbacks = set_on_tick_callbacks
let tick=tick
let cleanup () =
continue := false;
@ -384,6 +395,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
E_metrics.push e;
if batch_timeout() then emit_all_force()
let set_on_tick_callbacks = set_on_tick_callbacks
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics();
if batch_timeout() then emit_all_force()
@ -454,7 +467,7 @@ end
let setup_ ~(config:Config.t) () =
debug_ := config.debug;
let module B = Backend(struct let config=config end)() in
Opentelemetry.Collector.backend := Some (module B);
Opentelemetry.Collector.set_backend (module B);
B.cleanup
let setup ?(config=Config.make()) ?(enable=true) () =

View file

@ -115,16 +115,36 @@ module Collector = struct
(** Should be called regularly for background processing,
timeout checks, etc. *)
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
(** Give the collector the list of callbacks to be executed
when [tick()] is called. Each such callback should be short and
reentrant. Depending on the collector's implementation, it might be
called from a thread that is not the one that called [on_tick]. *)
val cleanup : unit -> unit
end
type backend = (module BACKEND)
let backend : backend option ref = ref None
(* hidden *)
open struct
let on_tick_cbs_ = ref []
let backend : backend option ref = ref None
end
(** Set collector backend *)
let set_backend (b:backend) : unit =
let (module B) = b in
B.set_on_tick_callbacks on_tick_cbs_;
backend := Some b
(** Is there a configured backend? *)
let[@inline] has_backend () : bool = !backend != None
(** Current backend, if any *)
let[@inline] get_backend () : backend option = !backend
let send_trace (l:Trace.resource_spans list) ~ret =
match !backend with
| None -> ret()
@ -145,6 +165,8 @@ module Collector = struct
| None -> Bytes.make 8 '?'
| Some (module B) -> B.rand_bytes_8()
let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_
(** Do background work. Call this regularly if the collector doesn't
already have a ticker thread or internal timer. *)
let tick () =
@ -580,9 +602,15 @@ end
module Metrics = struct
open Metrics_types
(** A single metric, measuring some time-varying quantity or statistical
distribution. It is composed of one or more data points that have
precise values and time stamps. Each distinct metric should have a
distinct name. *)
type t = Metrics_types.metric
let _program_start = Timestamp_ns.now_unix_ns()
open struct
let _program_start = Timestamp_ns.now_unix_ns()
end
(** Number data point, as a float *)
let float ?(start_time_unix_nano=_program_start)
@ -625,15 +653,34 @@ module Metrics = struct
~aggregation_temporality ()) in
default_metric ~name ?description ?unit_ ~data ()
(* TODO
(** Histogram data
@param count number of values in population (non negative)
@param sum sum of values in population (0 if count is 0)
@param bucket_counts count value of histogram for each bucket. Sum of
the counts must be equal to [count].
length must be [1+length explicit_bounds]
@param explicit_bounds strictly increasing list of bounds for the buckets *)
let histogram_data_point
?(start_time_unix_nano=_program_start)
?(now=Timestamp_ns.now_unix_ns())
?(attrs=[])
?(exemplars=[])
?(explicit_bounds=[])
?sum
~bucket_counts
~count
() : histogram_data_point =
let attributes = attrs |> List.map _conv_key_value in
default_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now
~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum ()
let histogram ~name ?description ?unit_
?aggregation_temporality
(l:number_data_point list) : t =
let data h=
(l:histogram_data_point list) : t =
let data =
Histogram (default_histogram ~data_points:l
?aggregation_temporality ()) in
default_metric ~name ?description ?unit_ ~data ()
*)
(* TODO: exponential history *)
(* TODO: summary *)
@ -658,6 +705,32 @@ module Metrics = struct
Collector.send_metrics [rm] ~ret:ignore
end
(** A set of callbacks that produce metrics when called.
The metrics are automatically called regularly.
This allows applications to register metrics callbacks from various points
in the program (or even in libraries), and not worry about setting
alarms/intervals to emit them. *)
module Metrics_callbacks = struct
open struct
let cbs_ : (unit -> Metrics.t list) list ref = ref []
end
(** [register f] adds the callback [f] to the list.
[f] will be called at unspecified times and is expected to return
a list of metrics. *)
let register f : unit =
if !cbs_ = [] then (
(* make sure we call [f] (and others) at each tick *)
Collector.on_tick (fun () ->
let m = List.map (fun f -> f()) !cbs_ |> List.flatten in
Metrics.emit m)
);
cbs_ := f :: !cbs_
end
module Logs = struct
end
@ -759,12 +832,13 @@ end = struct
let get_runtime_attributes () = Lazy.force runtime_attributes
let basic_setup () =
let trigger() =
match !Collector.backend with
(* emit metrics when GC is called *)
let on_gc() =
match Collector.get_backend() with
| None -> ()
| Some (module C) -> C.signal_emit_gc_metrics()
in
ignore (Gc.create_alarm trigger : Gc.alarm)
ignore (Gc.create_alarm on_gc : Gc.alarm)
let bytes_per_word = Sys.word_size / 8
let word_to_bytes n = n * bytes_per_word

View file

@ -5,11 +5,15 @@ let (let@) f x = f x
let sleep_inner = ref 0.1
let sleep_outer = ref 2.0
let num_sleep = ref 0
let run () =
Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url());
T.GC_metrics.basic_setup();
T.Metrics_callbacks.register (fun () ->
T.Metrics.[ sum ~name:"num-sleep" ~is_monotonic:true [int !num_sleep] ]);
let i = ref 0 in
while true do
let@ scope =
@ -22,7 +26,9 @@ let run () =
let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope
~attrs:["j", `Int j]
"loop.inner" in
Unix.sleepf !sleep_outer;
incr num_sleep;
incr i;
@ -33,7 +39,10 @@ let run () =
(* allocate some stuff *)
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
ignore _arr;
Unix.sleepf !sleep_inner;
incr num_sleep;
if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *)
T.Trace.add_event scope (fun()->T.Event.make "done with alloc");