mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-11 13:08:35 -04:00
feat(otel): Metrics_callbacks module
this module allows the user's programs and libraries to register some callbacks that will be regularly called to produce metrics.
This commit is contained in:
parent
198027a519
commit
6744123075
2 changed files with 74 additions and 6 deletions
|
|
@ -124,6 +124,7 @@ module type EMITTER = sig
|
||||||
|
|
||||||
val push_trace : Trace.resource_spans list -> unit
|
val push_trace : Trace.resource_spans list -> unit
|
||||||
val push_metrics : Metrics.resource_metrics list -> unit
|
val push_metrics : Metrics.resource_metrics list -> unit
|
||||||
|
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
|
||||||
|
|
||||||
val tick : unit -> unit
|
val tick : unit -> unit
|
||||||
val cleanup : unit -> unit
|
val cleanup : unit -> unit
|
||||||
|
|
@ -200,6 +201,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
|
|
||||||
let ((module C) as curl) = (module Curl() : CURL) in
|
let ((module C) as curl) = (module Curl() : CURL) in
|
||||||
|
|
||||||
|
let on_tick_cbs_ = ref (ref []) in
|
||||||
|
let set_on_tick_callbacks = (:=) on_tick_cbs_ in
|
||||||
|
|
||||||
let send_metrics_http (l:Metrics.resource_metrics list list) =
|
let send_metrics_http (l:Metrics.resource_metrics list list) =
|
||||||
Pbrt.Encoder.reset encoder;
|
Pbrt.Encoder.reset encoder;
|
||||||
let resource_metrics =
|
let resource_metrics =
|
||||||
|
|
@ -332,6 +336,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
|
|
||||||
let tick() =
|
let tick() =
|
||||||
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
||||||
|
List.iter
|
||||||
|
(fun f ->
|
||||||
|
try f()
|
||||||
|
with e ->
|
||||||
|
Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e))
|
||||||
|
!(!on_tick_cbs_);
|
||||||
if batch_timeout() then wakeup()
|
if batch_timeout() then wakeup()
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
@ -354,6 +364,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
let push_metrics e =
|
let push_metrics e =
|
||||||
E_metrics.push e;
|
E_metrics.push e;
|
||||||
if batch_timeout() then wakeup()
|
if batch_timeout() then wakeup()
|
||||||
|
let set_on_tick_callbacks = set_on_tick_callbacks
|
||||||
let tick=tick
|
let tick=tick
|
||||||
let cleanup () =
|
let cleanup () =
|
||||||
continue := false;
|
continue := false;
|
||||||
|
|
@ -384,6 +395,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
E_metrics.push e;
|
E_metrics.push e;
|
||||||
if batch_timeout() then emit_all_force()
|
if batch_timeout() then emit_all_force()
|
||||||
|
|
||||||
|
let set_on_tick_callbacks = set_on_tick_callbacks
|
||||||
|
|
||||||
let tick () =
|
let tick () =
|
||||||
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
if Atomic.get needs_gc_metrics then sample_gc_metrics();
|
||||||
if batch_timeout() then emit_all_force()
|
if batch_timeout() then emit_all_force()
|
||||||
|
|
@ -454,7 +467,7 @@ end
|
||||||
let setup_ ~(config:Config.t) () =
|
let setup_ ~(config:Config.t) () =
|
||||||
debug_ := config.debug;
|
debug_ := config.debug;
|
||||||
let module B = Backend(struct let config=config end)() in
|
let module B = Backend(struct let config=config end)() in
|
||||||
Opentelemetry.Collector.backend := Some (module B);
|
Opentelemetry.Collector.set_backend (module B);
|
||||||
B.cleanup
|
B.cleanup
|
||||||
|
|
||||||
let setup ?(config=Config.make()) ?(enable=true) () =
|
let setup ?(config=Config.make()) ?(enable=true) () =
|
||||||
|
|
|
||||||
|
|
@ -115,16 +115,36 @@ module Collector = struct
|
||||||
(** Should be called regularly for background processing,
|
(** Should be called regularly for background processing,
|
||||||
timeout checks, etc. *)
|
timeout checks, etc. *)
|
||||||
|
|
||||||
|
val set_on_tick_callbacks : (unit -> unit) list ref -> unit
|
||||||
|
(** Give the collector the list of callbacks to be executed
|
||||||
|
when [tick()] is called. Each such callback should be short and
|
||||||
|
reentrant. Depending on the collector's implementation, it might be
|
||||||
|
called from a thread that is not the one that called [on_tick]. *)
|
||||||
|
|
||||||
val cleanup : unit -> unit
|
val cleanup : unit -> unit
|
||||||
end
|
end
|
||||||
|
|
||||||
type backend = (module BACKEND)
|
type backend = (module BACKEND)
|
||||||
|
|
||||||
let backend : backend option ref = ref None
|
(* hidden *)
|
||||||
|
open struct
|
||||||
|
let on_tick_cbs_ = ref []
|
||||||
|
|
||||||
|
let backend : backend option ref = ref None
|
||||||
|
end
|
||||||
|
|
||||||
|
(** Set collector backend *)
|
||||||
|
let set_backend (b:backend) : unit =
|
||||||
|
let (module B) = b in
|
||||||
|
B.set_on_tick_callbacks on_tick_cbs_;
|
||||||
|
backend := Some b
|
||||||
|
|
||||||
(** Is there a configured backend? *)
|
(** Is there a configured backend? *)
|
||||||
let[@inline] has_backend () : bool = !backend != None
|
let[@inline] has_backend () : bool = !backend != None
|
||||||
|
|
||||||
|
(** Current backend, if any *)
|
||||||
|
let[@inline] get_backend () : backend option = !backend
|
||||||
|
|
||||||
let send_trace (l:Trace.resource_spans list) ~ret =
|
let send_trace (l:Trace.resource_spans list) ~ret =
|
||||||
match !backend with
|
match !backend with
|
||||||
| None -> ret()
|
| None -> ret()
|
||||||
|
|
@ -145,6 +165,8 @@ module Collector = struct
|
||||||
| None -> Bytes.make 8 '?'
|
| None -> Bytes.make 8 '?'
|
||||||
| Some (module B) -> B.rand_bytes_8()
|
| Some (module B) -> B.rand_bytes_8()
|
||||||
|
|
||||||
|
let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_
|
||||||
|
|
||||||
(** Do background work. Call this regularly if the collector doesn't
|
(** Do background work. Call this regularly if the collector doesn't
|
||||||
already have a ticker thread or internal timer. *)
|
already have a ticker thread or internal timer. *)
|
||||||
let tick () =
|
let tick () =
|
||||||
|
|
@ -580,9 +602,15 @@ end
|
||||||
module Metrics = struct
|
module Metrics = struct
|
||||||
open Metrics_types
|
open Metrics_types
|
||||||
|
|
||||||
|
(** A single metric, measuring some time-varying quantity or statistical
|
||||||
|
distribution. It is composed of one or more data points that have
|
||||||
|
precise values and time stamps. Each distinct metric should have a
|
||||||
|
distinct name. *)
|
||||||
type t = Metrics_types.metric
|
type t = Metrics_types.metric
|
||||||
|
|
||||||
let _program_start = Timestamp_ns.now_unix_ns()
|
open struct
|
||||||
|
let _program_start = Timestamp_ns.now_unix_ns()
|
||||||
|
end
|
||||||
|
|
||||||
(** Number data point, as a float *)
|
(** Number data point, as a float *)
|
||||||
let float ?(start_time_unix_nano=_program_start)
|
let float ?(start_time_unix_nano=_program_start)
|
||||||
|
|
@ -677,6 +705,32 @@ module Metrics = struct
|
||||||
Collector.send_metrics [rm] ~ret:ignore
|
Collector.send_metrics [rm] ~ret:ignore
|
||||||
end
|
end
|
||||||
|
|
||||||
|
(** A set of callbacks that produce metrics when called.
|
||||||
|
|
||||||
|
The metrics are automatically called regularly.
|
||||||
|
|
||||||
|
This allows applications to register metrics callbacks from various points
|
||||||
|
in the program (or even in librariese), and not worry about setting
|
||||||
|
alarms/intervals to emit them. *)
|
||||||
|
module Metrics_callbacks = struct
|
||||||
|
open struct
|
||||||
|
let cbs_ : (unit -> Metrics.t list) list ref = ref []
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
(** [register f] adds the callback [f] to the list.
|
||||||
|
[f] will be called at unspecified times and is expected to return
|
||||||
|
a list of metrics. *)
|
||||||
|
let register f : unit =
|
||||||
|
if !cbs_ = [] then (
|
||||||
|
(* make sure we call [f] (and others) at each tick *)
|
||||||
|
Collector.on_tick (fun () ->
|
||||||
|
let m = List.map (fun f -> f()) !cbs_ |> List.flatten in
|
||||||
|
Metrics.emit m)
|
||||||
|
);
|
||||||
|
cbs_ := f :: !cbs_
|
||||||
|
end
|
||||||
|
|
||||||
module Logs = struct
|
module Logs = struct
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
@ -778,12 +832,13 @@ end = struct
|
||||||
let get_runtime_attributes () = Lazy.force runtime_attributes
|
let get_runtime_attributes () = Lazy.force runtime_attributes
|
||||||
|
|
||||||
let basic_setup () =
|
let basic_setup () =
|
||||||
let trigger() =
|
(* emit metrics when GC is called *)
|
||||||
match !Collector.backend with
|
let on_gc() =
|
||||||
|
match Collector.get_backend() with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some (module C) -> C.signal_emit_gc_metrics()
|
| Some (module C) -> C.signal_emit_gc_metrics()
|
||||||
in
|
in
|
||||||
ignore (Gc.create_alarm trigger : Gc.alarm)
|
ignore (Gc.create_alarm on_gc : Gc.alarm)
|
||||||
|
|
||||||
let bytes_per_word = Sys.word_size / 8
|
let bytes_per_word = Sys.word_size / 8
|
||||||
let word_to_bytes n = n * bytes_per_word
|
let word_to_bytes n = n * bytes_per_word
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue