From 198027a5194a77279433278c6a19d6c7a27a8fd5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 28 Apr 2022 10:17:36 -0400 Subject: [PATCH 1/4] add histogram metrics (untested) --- src/opentelemetry.ml | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 5d09e8c9..58580172 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -625,15 +625,34 @@ module Metrics = struct ~aggregation_temporality ()) in default_metric ~name ?description ?unit_ ~data () - (* TODO + (** Histogram data + @param count number of values in population (non negative) + @param sum sum of values in population (0 if count is 0) + @param bucket_counts count value of histogram for each bucket. Sum of + the counts must be equal to [count]. + length must be [1+length explicit_bounds] + @param explicit_bounds strictly increasing list of bounds for the buckets *) + let histogram_data_point + ?(start_time_unix_nano=_program_start) + ?(now=Timestamp_ns.now_unix_ns()) + ?(attrs=[]) + ?(exemplars=[]) + ?(explicit_bounds=[]) + ?sum + ~bucket_counts + ~count + () : histogram_data_point = + let attributes = attrs |> List.map _conv_key_value in + default_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now + ~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum () + let histogram ~name ?description ?unit_ ?aggregation_temporality - (l:number_data_point list) : t = - let data h= + (l:histogram_data_point list) : t = + let data = Histogram (default_histogram ~data_points:l ?aggregation_temporality ()) in default_metric ~name ?description ?unit_ ~data () - *) (* TODO: exponential history *) (* TODO: summary *) From 674412307510fb0b047fa58584a6a965d5a307f7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 29 Apr 2022 16:50:06 -0400 Subject: [PATCH 2/4] feat(otel): Metrics_callbacks module this module allows the user's programs and libraries to register some callbacks that will be regularly called to produce metrics. --- src/client/opentelemetry_client_ocurl.ml | 15 +++++- src/opentelemetry.ml | 65 ++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 780ef6c2..c7a34181 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -124,6 +124,7 @@ module type EMITTER = sig val push_trace : Trace.resource_spans list -> unit val push_metrics : Metrics.resource_metrics list -> unit + val set_on_tick_callbacks : (unit -> unit) list ref -> unit val tick : unit -> unit val cleanup : unit -> unit @@ -200,6 +201,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let ((module C) as curl) = (module Curl() : CURL) in + let on_tick_cbs_ = ref (ref []) in + let set_on_tick_callbacks = (:=) on_tick_cbs_ in + let send_metrics_http (l:Metrics.resource_metrics list list) = Pbrt.Encoder.reset encoder; let resource_metrics = @@ -332,6 +336,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let tick() = if Atomic.get needs_gc_metrics then sample_gc_metrics(); + List.iter + (fun f -> + try f() + with e -> + Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) + !(!on_tick_cbs_); if batch_timeout() then wakeup() in @@ -354,6 +364,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let push_metrics e = E_metrics.push e; if batch_timeout() then wakeup() + let set_on_tick_callbacks = set_on_tick_callbacks let tick=tick let cleanup () = continue := false; @@ -384,6 +395,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = E_metrics.push e; if batch_timeout() then emit_all_force() + let set_on_tick_callbacks = set_on_tick_callbacks + let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics(); if batch_timeout() then emit_all_force() @@ -454,7 +467,7 @@ end let setup_ ~(config:Config.t) () = debug_ := config.debug; let module B = Backend(struct let config=config end)() in - Opentelemetry.Collector.backend := Some (module B); + Opentelemetry.Collector.set_backend (module B); B.cleanup let setup ?(config=Config.make()) ?(enable=true) () = diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 58580172..6b2a2aa8 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -115,16 +115,36 @@ module Collector = struct (** Should be called regularly for background processing, timeout checks, etc. *) + val set_on_tick_callbacks : (unit -> unit) list ref -> unit + (** Give the collector the list of callbacks to be executed + when [tick()] is called. Each such callback should be short and + reentrant. Depending on the collector's implementation, it might be + called from a thread that is not the one that called [on_tick]. *) + val cleanup : unit -> unit end type backend = (module BACKEND) - let backend : backend option ref = ref None + (* hidden *) + open struct + let on_tick_cbs_ = ref [] + + let backend : backend option ref = ref None + end + + (** Set collector backend *) + let set_backend (b:backend) : unit = + let (module B) = b in + B.set_on_tick_callbacks on_tick_cbs_; + backend := Some b (** Is there a configured backend? *) let[@inline] has_backend () : bool = !backend != None + (** Current backend, if any *) + let[@inline] get_backend () : backend option = !backend + let send_trace (l:Trace.resource_spans list) ~ret = match !backend with | None -> ret() @@ -145,6 +165,8 @@ module Collector = struct | None -> Bytes.make 8 '?' | Some (module B) -> B.rand_bytes_8() + let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_ + (** Do background work. Call this regularly if the collector doesn't already have a ticker thread or internal timer. *) let tick () = @@ -580,9 +602,15 @@ end module Metrics = struct open Metrics_types + (** A single metric, measuring some time-varying quantity or statistical + distribution. It is composed of one or more data points that have + precise values and time stamps. Each distinct metric should have a + distinct name. *) type t = Metrics_types.metric - let _program_start = Timestamp_ns.now_unix_ns() + open struct + let _program_start = Timestamp_ns.now_unix_ns() + end (** Number data point, as a float *) let float ?(start_time_unix_nano=_program_start) @@ -677,6 +705,32 @@ module Metrics = struct Collector.send_metrics [rm] ~ret:ignore end +(** A set of callbacks that produce metrics when called. + + The metrics are automatically called regularly. + + This allows applications to register metrics callbacks from various points + in the program (or even in librariese), and not worry about setting + alarms/intervals to emit them. *) +module Metrics_callbacks = struct + open struct + let cbs_ : (unit -> Metrics.t list) list ref = ref [] + end + + + (** [register f] adds the callback [f] to the list. + [f] will be called at unspecified times and is expected to return + a list of metrics. *) + let register f : unit = + if !cbs_ = [] then ( + (* make sure we call [f] (and others) at each tick *) + Collector.on_tick (fun () -> + let m = List.map (fun f -> f()) !cbs_ |> List.flatten in + Metrics.emit m) + ); + cbs_ := f :: !cbs_ +end + module Logs = struct end @@ -778,12 +832,13 @@ end = struct let get_runtime_attributes () = Lazy.force runtime_attributes let basic_setup () = - let trigger() = - match !Collector.backend with + (* emit metrics when GC is called *) + let on_gc() = + match Collector.get_backend() with | None -> () | Some (module C) -> C.signal_emit_gc_metrics() in - ignore (Gc.create_alarm trigger : Gc.alarm) + ignore (Gc.create_alarm on_gc : Gc.alarm) let bytes_per_word = Sys.word_size / 8 let word_to_bytes n = n * bytes_per_word From 5b3f49890b5d55a0542a4d01e0bf2a461afcbb4b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 29 Apr 2022 16:50:52 -0400 Subject: [PATCH 3/4] test: use Metrics_callbacks from emit1 --- tests/bin/emit1.ml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index c7adf5f6..1db309fc 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -5,11 +5,15 @@ let (let@) f x = f x let sleep_inner = ref 0.1 let sleep_outer = ref 2.0 +let num_sleep = ref 0 let run () = Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url()); T.GC_metrics.basic_setup(); + T.Metrics_callbacks.register (fun () -> + T.Metrics.[ sum ~name:"num-sleep" ~is_monotonic:true [int !num_sleep] ]); + let i = ref 0 in while true do let@ scope = @@ -22,7 +26,9 @@ let run () = let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope ~attrs:["j", `Int j] "loop.inner" in + Unix.sleepf !sleep_outer; + incr num_sleep; incr i; @@ -33,7 +39,10 @@ let run () = (* allocate some stuff *) let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in ignore _arr; + Unix.sleepf !sleep_inner; + incr num_sleep; + if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); From 080e0de1386ec83f556ffdd090ff8f20ccc3b5df Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 30 Apr 2022 13:26:12 -0400 Subject: [PATCH 4/4] Update src/opentelemetry.ml Co-authored-by: Matt Bray --- src/opentelemetry.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 6b2a2aa8..bfd9c4b4 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -710,7 +710,7 @@ end The metrics are automatically called regularly. This allows applications to register metrics callbacks from various points - in the program (or even in librariese), and not worry about setting + in the program (or even in libraries), and not worry about setting alarms/intervals to emit them. *) module Metrics_callbacks = struct open struct