feat(collector): move GC metrics logic to the collector

This helps avoid a reentrancy issue where the collector allocates, which
triggers the GC metrics alarm, which in turn tries to push new metrics in the
middle of a batch (currently leading to a potential deadlock).
This commit is contained in:
Simon Cruanes 2022-04-11 13:09:13 -04:00
parent 591cbad4b2
commit 5180be31bc
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 123 additions and 60 deletions

View file

@ -10,6 +10,26 @@ module OT = Opentelemetry
open Opentelemetry open Opentelemetry
module Atomic = Opentelemetry_atomic.Atomic module Atomic = Opentelemetry_atomic.Atomic
(** Atomic list: an immutable list stored in an atomic cell and updated
    through compare-and-set retries. *)
module AList : sig
  type 'a t

  val make : unit -> 'a t
  (** Create a new, empty list. *)

  val add : 'a t -> 'a -> unit
  (** [add l x] pushes [x] at the front of [l], retrying until the
      compare-and-set succeeds. *)

  val pop_all : 'a t -> 'a list
  (** [pop_all l] replaces the content of [l] with the empty list and returns
      the previous content, most recently added element first. *)
end = struct
  type 'a t = 'a list Atomic.t

  let make () = Atomic.make []

  let rec add self x =
    let snapshot = Atomic.get self in
    (* another writer won the race: start over from the new content *)
    if not (Atomic.compare_and_set self snapshot (x :: snapshot)) then
      add self x

  let pop_all self =
    let taken = ref [] in
    while
      let current = Atomic.get self in
      if Atomic.compare_and_set self current [] then (
        taken := current;
        false
      ) else
        true
    do
      ()
    done;
    !taken
end
let[@inline] (let@) f x = f x let[@inline] (let@) f x = f x
let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false) let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false)
@ -19,6 +39,15 @@ let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_ur
let get_url () = !url let get_url () = !url
let set_url s = url := s let set_url s = url := s
(* Set to [true] by [enable_emit_gc_metrics] once the GC alarm has been
   installed, so that the alarm is only created once. *)
let enable_gc_metrics = ref false
(* Flag switched to [true] by the GC alarm; [tick] reads it to decide whether
   GC metrics should be sampled. *)
let needs_gc_metrics = Atomic.make false
(* Queue of sampled GC metrics: filled by [emit_gc_metrics] and drained with
   [AList.pop_all] by [additional_metrics]. *)
let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *)
(* Sample the current GC statistics and queue them on [gc_metrics], the side
   channel whose content is later attached to the regular metrics.

   [needs_gc_metrics] is raised by the GC alarm and checked by [tick] before
   calling this function. It is cleared here, before sampling: otherwise the
   flag would stay [true] forever after the first GC cycle and every
   subsequent [tick] would queue a new sample. Clearing first also means that
   a GC cycle running while we sample re-arms the flag instead of being
   lost. *)
let emit_gc_metrics () =
  Atomic.set needs_gc_metrics false;
  let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in
  AList.add gc_metrics l
let lock_ : (unit -> unit) ref = ref ignore let lock_ : (unit -> unit) ref = ref ignore
let unlock_ : (unit -> unit) ref = ref ignore let unlock_ : (unit -> unit) ref = ref ignore
let set_mutex ~lock ~unlock : unit = let set_mutex ~lock ~unlock : unit =
@ -303,6 +332,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let continue = ref true in let continue = ref true in
(* to ensure reentrancy, we keep track of whether we're already
emitting something *)
let emitting = Atomic.make false in
let ((module E_trace) : Trace.resource_spans list push), on_trace_full = let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
mk_push ?batch:config.batch_traces () in mk_push ?batch:config.batch_traces () in
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full =
@ -320,9 +353,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
(Metrics_service.default_export_metrics_service_request (Metrics_service.default_export_metrics_service_request
~resource_metrics ()) ~resource_metrics ())
encoder; encoder;
let data = Pbrt.Encoder.to_string encoder in
begin match begin match
C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) C.send ~path:"/v1/metrics" ~decode:(fun _ -> ())
(Pbrt.Encoder.to_string encoder) data
with with
| Ok () -> () | Ok () -> ()
| Error err -> | Error err ->
@ -359,22 +393,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
in in
let emit_metrics ?(force=false) () : bool = let emit_metrics ?(force=false) () : bool =
if (force && not (E_metrics.is_empty())) || if not (Atomic.get emitting) &&
(force && not (E_metrics.is_empty())) ||
(not force && E_metrics.is_big_enough ()) then ( (not force && E_metrics.is_big_enough ()) then (
Atomic.set emitting true;
let batch = ref [] in let batch = ref [] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch); E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
emit_metrics !batch; emit_metrics !batch;
Atomic.set last_wakeup (Mtime_clock.now()); Atomic.set last_wakeup (Mtime_clock.now());
Atomic.set emitting false;
true true
) else false ) else false
in in
let emit_traces ?(force=false) () : bool = let emit_traces ?(force=false) () : bool =
if (force && not (E_trace.is_empty())) || if not (Atomic.get emitting) &&
(force && not (E_trace.is_empty())) ||
(not force && E_trace.is_big_enough ()) then ( (not force && E_trace.is_big_enough ()) then (
Atomic.set emitting true;
let batch = ref [] in let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch); E_trace.pop_iter_all (fun l -> batch := l :: !batch);
emit_traces !batch; emit_traces !batch;
Atomic.set last_wakeup (Mtime_clock.now()); Atomic.set last_wakeup (Mtime_clock.now());
Atomic.set emitting false;
true true
) else false ) else false
in in
@ -437,6 +477,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
on_trace_full wakeup; on_trace_full wakeup;
let tick() = let tick() =
if Atomic.get needs_gc_metrics then emit_gc_metrics();
if batch_timeout() then wakeup() if batch_timeout() then wakeup()
in in
@ -489,6 +530,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let tick () = let tick () =
if Atomic.get needs_gc_metrics then emit_gc_metrics();
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let cleanup = cleanup let cleanup = cleanup
@ -517,29 +559,42 @@ module Backend(Arg : sig val config : Config.t end)()
let last_sent_metrics = Atomic.make (Mtime_clock.now()) let last_sent_metrics = Atomic.make (Mtime_clock.now())
let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *)
let additional_metrics () : _ list = let enable_emit_gc_metrics () =
(* add exporter metrics to the lot? *) if not !enable_gc_metrics then (
let last_emit = Atomic.get last_sent_metrics in enable_gc_metrics := true;
let now = Mtime_clock.now() in (* any time the GC runs, switch this boolean *)
let add_own_metrics = let toggle_gc () = Atomic.set needs_gc_metrics true in
let elapsed = Mtime.span last_emit now in ignore (Gc.create_alarm toggle_gc : Gc.alarm);
Mtime.Span.compare elapsed timeout_sent_metrics > 0 )
in
let additional_metrics () : _ list =
(* add exporter metrics to the lot? *)
let last_emit = Atomic.get last_sent_metrics in
let now = Mtime_clock.now() in
let add_own_metrics =
let elapsed = Mtime.span last_emit now in
Mtime.Span.compare elapsed timeout_sent_metrics > 0
in
let l = AList.pop_all gc_metrics in
let l =
if add_own_metrics then ( if add_own_metrics then (
let open OT.Metrics in let open OT.Metrics in
Atomic.set last_sent_metrics now; Atomic.set last_sent_metrics now;
[make_resource_metrics [ make_resource_metrics [
sum ~name:"otel-export.dropped" ~is_monotonic:true [ sum ~name:"otel-export.dropped" ~is_monotonic:true [
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
]; ];
sum ~name:"otel-export.errors" ~is_monotonic:true [ sum ~name:"otel-export.errors" ~is_monotonic:true [
int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
]; ];
]] ] :: l
) else [] ) else l
in
if l <> [] then l else []
let send_metrics : Metrics.resource_metrics list sender = { let send_metrics : Metrics.resource_metrics list sender = {
send=fun m ~ret -> send=fun m ~ret ->

View file

@ -98,6 +98,9 @@ module Collector = struct
val rand_bytes_8 : unit -> bytes val rand_bytes_8 : unit -> bytes
(** Generate 8 bytes of random data *) (** Generate 8 bytes of random data *)
val enable_emit_gc_metrics : unit -> unit
(** Enable the emission of GC metrics. This sets up a GC alarm. *)
val tick : unit -> unit val tick : unit -> unit
(** Should be called regularly for background processing, (** Should be called regularly for background processing,
timeout checks, etc. *) timeout checks, etc. *)
@ -669,45 +672,50 @@ end
(** Export GC metrics. (** Export GC metrics.
These metrics are emitted after each GC collection. *) These metrics are emitted after each GC collection. *)
module GC_metrics = struct module GC_metrics : sig
val basic_setup : unit -> unit
(** Setup a hook that will emit GC statistics regularly *)
val get_metrics : unit -> Metrics.t list
(** Get a few metrics from the current state of the GC *)
end = struct
(* Ask the collector backend currently stored in [Collector.backend] to start
   emitting GC metrics. Does nothing when no backend is installed at the time
   of the call. *)
let basic_setup () =
  let current_backend = !Collector.backend in
  match current_backend with
  | Some (module C) -> C.enable_emit_gc_metrics ()
  | None -> ()
let bytes_per_word = Sys.word_size / 8 let bytes_per_word = Sys.word_size / 8
(* Convert a size expressed in machine words into bytes (integer version). *)
let word_to_bytes words = bytes_per_word * words
(* Same conversion for sizes that the GC reports as floats. *)
let word_to_bytes_f words = float bytes_per_word *. words
(** Basic setup: a few stats *) (* TODO: use atomic *)
let basic_setup () : unit = let last = ref (Timestamp_ns.now_unix_ns())
let last = ref (Timestamp_ns.now_unix_ns()) in
let word_to_bytes n = n * bytes_per_word in
let word_to_bytes_f n = n *. float bytes_per_word in
let emit() =
let gc = Gc.quick_stat () in
let start_time_unix_nano = !last in
last := Timestamp_ns.now_unix_ns();
Metrics.(
emit
[
gauge ~name:"ocaml.gc.major_heap" ~unit_:"B"
[ int (word_to_bytes gc.Gc.heap_words) ];
sum ~name:"ocaml.gc_minor_allocated"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true
~unit_:"B"
[ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ];
sum ~name:"ocaml.gc.minor_collections"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.minor_collections ];
sum ~name:"ocaml.gc.major_collections"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.major_collections ];
sum ~name:"ocaml.gc.compactions"
~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
~is_monotonic:true
[ int ~start_time_unix_nano gc.Gc.compactions ];
])
in
ignore (Gc.create_alarm emit : Gc.alarm);
()
(* Build a list of metrics from [Gc.quick_stat]: the major heap size as a
   gauge, plus monotonic cumulative sums for minor allocations and for the
   number of minor collections, major collections and compactions.

   The time stored in [last] is used as the start timestamp of the sums, and
   [last] is then advanced to the current time. *)
let get_metrics () : Metrics.t list =
  let stats = Gc.quick_stat () in
  let since = !last in
  last := Timestamp_ns.now_unix_ns ();
  let open Metrics in
  (* all the sums below share the same temporality and monotonicity *)
  let counter ?unit_ name points =
    sum ~name
      ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative
      ~is_monotonic:true ?unit_ points
  in
  [
    gauge ~name:"ocaml.gc.major_heap" ~unit_:"B"
      [ int (word_to_bytes stats.Gc.heap_words) ];
    (* NOTE(review): this name uses "gc_" where the other metrics use "gc." —
       possibly unintended; kept as is since renaming affects consumers. *)
    counter "ocaml.gc_minor_allocated" ~unit_:"B"
      [ float ~start_time_unix_nano:since
          (word_to_bytes_f stats.Gc.minor_words) ];
    counter "ocaml.gc.minor_collections"
      [ int ~start_time_unix_nano:since stats.Gc.minor_collections ];
    counter "ocaml.gc.major_collections"
      [ int ~start_time_unix_nano:since stats.Gc.major_collections ];
    counter "ocaml.gc.compactions"
      [ int ~start_time_unix_nano:since stats.Gc.compactions ];
  ]
end end