fix: yet another fix for emitting GC metrics

now, the frontend (Opentelemetry) is responsible for signalling the
backend when to emit GC stats; but the backend just samples GC metrics
on the next `tick()` and pushes them in the next batch. This saves us
from having to worry about re-entrancy and GC metrics being emitted
during the emission of something else.
This commit is contained in:
Simon Cruanes 2022-04-11 16:29:59 -04:00
parent 786ebb611a
commit 3d0d031bcd
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 47 additions and 56 deletions

View file

@ -39,14 +39,15 @@ let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_ur
let get_url () = !url let get_url () = !url
let set_url s = url := s let set_url s = url := s
let enable_gc_metrics = ref false
let needs_gc_metrics = Atomic.make false let needs_gc_metrics = Atomic.make false
let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *)
let emit_gc_metrics () = (* capture current GC metrics and push them into {!gc_metrics} for later
let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in collection *)
let sample_gc_metrics () =
Atomic.set needs_gc_metrics false; Atomic.set needs_gc_metrics false;
let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in
AList.add gc_metrics l AList.add gc_metrics l
let lock_ : (unit -> unit) ref = ref ignore let lock_ : (unit -> unit) ref = ref ignore
@ -324,6 +325,9 @@ let start_bg_thread (f: unit -> unit) : unit =
in in
ignore (Thread.create run () : Thread.t) ignore (Thread.create run () : Thread.t)
let l_is_empty = function [] -> true | _::_ -> false
let batch_is_empty = List.for_all l_is_empty
(* make an emitter. (* make an emitter.
exceptions inside should be caught, see exceptions inside should be caught, see
@ -333,10 +337,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let continue = ref true in let continue = ref true in
(* to ensure reentrancy, we keep track of whether we're already
emitting something *)
let emitting = Atomic.make false in
let ((module E_trace) : Trace.resource_spans list push), on_trace_full = let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
mk_push ?batch:config.batch_traces () in mk_push ?batch:config.batch_traces () in
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full =
@ -394,28 +394,24 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
in in
let emit_metrics ?(force=false) () : bool = let emit_metrics ?(force=false) () : bool =
if not (Atomic.get emitting) && if force || (not force && E_metrics.is_big_enough ()) then (
(force && not (E_metrics.is_empty())) || let batch = ref [AList.pop_all gc_metrics] in
(not force && E_metrics.is_big_enough ()) then (
Atomic.set emitting true;
let batch = ref [] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch); E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
emit_metrics !batch; if not (batch_is_empty !batch) then (
emit_metrics !batch;
);
Atomic.set last_wakeup (Mtime_clock.now()); Atomic.set last_wakeup (Mtime_clock.now());
Atomic.set emitting false;
true true
) else false ) else false
in in
let emit_traces ?(force=false) () : bool = let emit_traces ?(force=false) () : bool =
if not (Atomic.get emitting) && if force || (not force && E_trace.is_big_enough ()) then (
(force && not (E_trace.is_empty())) ||
(not force && E_trace.is_big_enough ()) then (
Atomic.set emitting true;
let batch = ref [] in let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch); E_trace.pop_iter_all (fun l -> batch := l :: !batch);
emit_traces !batch; if not (l_is_empty !batch) then (
emit_traces !batch;
);
Atomic.set last_wakeup (Mtime_clock.now()); Atomic.set last_wakeup (Mtime_clock.now());
Atomic.set emitting false;
true true
) else false ) else false
in in
@ -478,7 +474,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
on_trace_full wakeup; on_trace_full wakeup;
let tick() = let tick() =
if Atomic.get needs_gc_metrics then emit_gc_metrics(); if Atomic.get needs_gc_metrics then sample_gc_metrics();
if batch_timeout() then wakeup() if batch_timeout() then wakeup()
in in
@ -510,6 +506,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
) else ( ) else (
on_metrics_full (fun () -> on_metrics_full (fun () ->
if Atomic.get needs_gc_metrics then sample_gc_metrics();
ignore (emit_metrics () : bool)); ignore (emit_metrics () : bool));
on_trace_full (fun () -> on_trace_full (fun () ->
ignore (emit_traces () : bool)); ignore (emit_traces () : bool));
@ -531,7 +528,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let tick () = let tick () =
if Atomic.get needs_gc_metrics then emit_gc_metrics(); if Atomic.get needs_gc_metrics then sample_gc_metrics();
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let cleanup = cleanup let cleanup = cleanup
@ -560,15 +557,9 @@ module Backend(Arg : sig val config : Config.t end)()
let last_sent_metrics = Atomic.make (Mtime_clock.now()) let last_sent_metrics = Atomic.make (Mtime_clock.now())
let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *)
let enable_emit_gc_metrics () = let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true
if not !enable_gc_metrics then (
enable_gc_metrics := true;
(* any time the GC runs, switch this boolean *)
let toggle_gc () = Atomic.set needs_gc_metrics true in
ignore (Gc.create_alarm toggle_gc : Gc.alarm);
)
let additional_metrics () : _ list = let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *) (* add exporter metrics to the lot? *)
let last_emit = Atomic.get last_sent_metrics in let last_emit = Atomic.get last_sent_metrics in
let now = Mtime_clock.now() in let now = Mtime_clock.now() in
@ -577,25 +568,20 @@ module Backend(Arg : sig val config : Config.t end)()
Mtime.Span.compare elapsed timeout_sent_metrics > 0 Mtime.Span.compare elapsed timeout_sent_metrics > 0
in in
let l = AList.pop_all gc_metrics in if add_own_metrics then (
let l = let open OT.Metrics in
if add_own_metrics then ( Atomic.set last_sent_metrics now;
let open OT.Metrics in [make_resource_metrics [
Atomic.set last_sent_metrics now; sum ~name:"otel-export.dropped" ~is_monotonic:true [
make_resource_metrics [ int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
sum ~name:"otel-export.dropped" ~is_monotonic:true [ ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ];
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); sum ~name:"otel-export.errors" ~is_monotonic:true [
]; int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
sum ~name:"otel-export.errors" ~is_monotonic:true [ ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ];
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); ]]
]; ) else []
] :: l
) else l
in
if l <> [] then l else []
let send_metrics : Metrics.resource_metrics list sender = { let send_metrics : Metrics.resource_metrics list sender = {
send=fun m ~ret -> send=fun m ~ret ->

View file

@ -98,8 +98,10 @@ module Collector = struct
val rand_bytes_8 : unit -> bytes val rand_bytes_8 : unit -> bytes
(** Generate 16 bytes of random data *) (** Generate 16 bytes of random data *)
val enable_emit_gc_metrics : unit -> unit val signal_emit_gc_metrics : unit -> unit
(** Enable the emission of GC metrics. This sets up a GC alarm. *) (** Signal the backend that it should emit GC metrics when it has the
chance. This should be installed in a GC alarm or another form
of regular trigger. *)
val tick : unit -> unit val tick : unit -> unit
(** Should be called regularly for background processing, (** Should be called regularly for background processing,
@ -681,9 +683,12 @@ module GC_metrics : sig
end = struct end = struct
let basic_setup () = let basic_setup () =
match !Collector.backend with let trigger() =
| None -> () match !Collector.backend with
| Some (module C) -> C.enable_emit_gc_metrics() | None -> ()
| Some (module C) -> C.signal_emit_gc_metrics()
in
ignore (Gc.create_alarm trigger : Gc.alarm)
let bytes_per_word = Sys.word_size / 8 let bytes_per_word = Sys.word_size / 8
let word_to_bytes n = n * bytes_per_word let word_to_bytes n = n * bytes_per_word
@ -705,7 +710,7 @@ end = struct
~is_monotonic:true ~is_monotonic:true
~unit_:"B" ~unit_:"B"
[ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ];
sum ~name:"ocaml.gc.minor_collections" sum ~name:"ocaml_gc_minor_collections"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true ~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.minor_collections ]; [ int ~start_time_unix_nano gc.Gc.minor_collections ];
@ -713,7 +718,7 @@ end = struct
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true ~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.major_collections ]; [ int ~start_time_unix_nano gc.Gc.major_collections ];
sum ~name:"ocaml.gc.compactions" sum ~name:"ocaml_gc_compactions"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true ~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.compactions ]; [ int ~start_time_unix_nano gc.Gc.compactions ];