mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-07 18:37:56 -05:00
self exported metrics
This commit is contained in:
parent
bcbb07027f
commit
3a72a73c15
2 changed files with 53 additions and 7 deletions
|
|
@ -6,6 +6,7 @@
|
||||||
|
|
||||||
(* TODO *)
|
(* TODO *)
|
||||||
|
|
||||||
|
module OT = Opentelemetry
|
||||||
open Opentelemetry
|
open Opentelemetry
|
||||||
|
|
||||||
let[@inline] (let@) f x = f x
|
let[@inline] (let@) f x = f x
|
||||||
|
|
@ -74,6 +75,9 @@ type error = [
|
||||||
| `Failure of string
|
| `Failure of string
|
||||||
]
|
]
|
||||||
|
|
||||||
|
let n_errors = Atomic.make 0
|
||||||
|
let n_dropped = Atomic.make 0
|
||||||
|
|
||||||
let report_err_ = function
|
let report_err_ = function
|
||||||
| `Failure msg ->
|
| `Failure msg ->
|
||||||
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
|
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
|
||||||
|
|
@ -267,7 +271,10 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -
|
||||||
let is_big_enough () = FQueue.size q >= n
|
let is_big_enough () = FQueue.size q >= n
|
||||||
let push x =
|
let push x =
|
||||||
if not (FQueue.push q x) || FQueue.size q > n then (
|
if not (FQueue.push q x) || FQueue.size q > n then (
|
||||||
!on_full(); (* drop *)
|
!on_full();
|
||||||
|
if not (FQueue.push q x) then (
|
||||||
|
Atomic.incr n_dropped; (* drop item *)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
let pop_iter_all f = FQueue.pop_iter_all q f
|
let pop_iter_all f = FQueue.pop_iter_all q f
|
||||||
end in
|
end in
|
||||||
|
|
@ -308,7 +315,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
(Pbrt.Encoder.to_string encoder)
|
(Pbrt.Encoder.to_string encoder)
|
||||||
with
|
with
|
||||||
| Ok () -> ()
|
| Ok () -> ()
|
||||||
| Error err -> report_err_ err
|
| Error err ->
|
||||||
|
(* TODO: log error _via_ otel? *)
|
||||||
|
Atomic.incr n_errors;
|
||||||
|
report_err_ err
|
||||||
end;
|
end;
|
||||||
(* signal completion *)
|
(* signal completion *)
|
||||||
List.iter (fun (_,over) -> over()) l;
|
List.iter (fun (_,over) -> over()) l;
|
||||||
|
|
@ -326,7 +336,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
|
||||||
(Pbrt.Encoder.to_string encoder)
|
(Pbrt.Encoder.to_string encoder)
|
||||||
with
|
with
|
||||||
| Ok () -> ()
|
| Ok () -> ()
|
||||||
| Error err -> report_err_ err
|
| Error err ->
|
||||||
|
(* TODO: log error _via_ otel? *)
|
||||||
|
Atomic.incr n_errors;
|
||||||
|
report_err_ err
|
||||||
end;
|
end;
|
||||||
(* signal completion *)
|
(* signal completion *)
|
||||||
List.iter (fun (_,over) -> over()) l;
|
List.iter (fun (_,over) -> over()) l;
|
||||||
|
|
@ -474,10 +487,39 @@ module Backend(Arg : sig val config : Config.t end)()
|
||||||
ret()
|
ret()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let last_sent_metrics = Atomic.make (Mtime_clock.now())
|
||||||
|
let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *)
|
||||||
|
|
||||||
|
let additional_metrics () : _ list =
|
||||||
|
(* add exporter metrics to the lot? *)
|
||||||
|
let last_emit = Atomic.get last_sent_metrics in
|
||||||
|
let now = Mtime_clock.now() in
|
||||||
|
let add_own_metrics =
|
||||||
|
let elapsed = Mtime.span last_emit now in
|
||||||
|
Mtime.Span.compare elapsed timeout_sent_metrics > 0
|
||||||
|
in
|
||||||
|
|
||||||
|
if add_own_metrics then (
|
||||||
|
let open OT.Metrics in
|
||||||
|
Atomic.set last_sent_metrics now;
|
||||||
|
[make_resource_metrics [
|
||||||
|
sum ~name:"otel-export.dropped" ~is_monotonic:true [
|
||||||
|
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||||
|
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
|
||||||
|
];
|
||||||
|
sum ~name:"otel-export.errors" ~is_monotonic:true [
|
||||||
|
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
|
||||||
|
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
|
||||||
|
];
|
||||||
|
]]
|
||||||
|
) else []
|
||||||
|
|
||||||
let send_metrics : Metrics.resource_metrics list sender = {
|
let send_metrics : Metrics.resource_metrics list sender = {
|
||||||
send=fun m ~over ~ret ->
|
send=fun m ~over ~ret ->
|
||||||
let@() = with_lock_ in
|
let@() = with_lock_ in
|
||||||
if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m;
|
if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m;
|
||||||
|
|
||||||
|
let m = List.rev_append (additional_metrics()) m in
|
||||||
push_metrics m ~over;
|
push_metrics m ~over;
|
||||||
ret()
|
ret()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -486,13 +486,17 @@ module Metrics = struct
|
||||||
(* TODO: summary *)
|
(* TODO: summary *)
|
||||||
(* TODO: exemplar *)
|
(* TODO: exemplar *)
|
||||||
|
|
||||||
(** Emit some metrics to the collector (sync). *)
|
(** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *)
|
||||||
let emit (l:t list) : unit =
|
let make_resource_metrics (l:t list) : resource_metrics =
|
||||||
let lm =
|
let lm =
|
||||||
default_instrumentation_library_metrics
|
default_instrumentation_library_metrics
|
||||||
~instrumentation_library:(Some Globals.instrumentation_library)
|
~instrumentation_library:(Some Globals.instrumentation_library)
|
||||||
~metrics:l () in
|
~metrics:l () in
|
||||||
let rm = default_resource_metrics
|
default_resource_metrics
|
||||||
~instrumentation_library_metrics:[lm] () in
|
~instrumentation_library_metrics:[lm] ()
|
||||||
|
|
||||||
|
(** Emit some metrics to the collector (sync). *)
|
||||||
|
let emit (l:t list) : unit =
|
||||||
|
let rm = make_resource_metrics l in
|
||||||
Collector.send_metrics [rm] ~over:ignore ~ret:ignore
|
Collector.send_metrics [rm] ~over:ignore ~ret:ignore
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue