From 5180be31bc6efa92202bd60ef411890868178f5f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 11 Apr 2022 13:09:13 -0400 Subject: [PATCH] feat(collector): move GC metrics logic to the collector this helps avoiding reentrancy issue, where the collector allocates, triggers the GC metrics alarm, which tries to push new metrics in the middle of a batch (currently leading to a potential deadlock). --- src/client/opentelemetry_client_ocurl.ml | 99 ++++++++++++++++++------ src/opentelemetry.ml | 84 +++++++++++--------- 2 files changed, 123 insertions(+), 60 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 815f45a2..fd3793b8 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -10,6 +10,26 @@ module OT = Opentelemetry open Opentelemetry module Atomic = Opentelemetry_atomic.Atomic +(** Atomic list *) +module AList : sig + type 'a t + val make : unit -> 'a t + val add : 'a t -> 'a -> unit + val pop_all : 'a t -> 'a list +end = struct + type 'a t = 'a list Atomic.t + let make () = Atomic.make [] + let add self x = + while + let old = Atomic.get self in + let l' = x :: old in + not (Atomic.compare_and_set self old l') + do () done + let rec pop_all self = + let l = Atomic.get self in + if Atomic.compare_and_set self l [] then l else pop_all self +end + let[@inline] (let@) f x = f x let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false) @@ -19,6 +39,15 @@ let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_ur let get_url () = !url let set_url s = url := s +let enable_gc_metrics = ref false +let needs_gc_metrics = Atomic.make false + +let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) + +let emit_gc_metrics () = + let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in + AList.add gc_metrics l + let lock_ : (unit -> unit) ref = ref ignore let unlock_ : (unit -> unit) ref = ref ignore let set_mutex ~lock ~unlock : unit = @@ -303,6 +332,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let continue = ref true in + (* to ensure reentrancy, we keep track of whether we're already + emitting something *) + let emitting = Atomic.make false in + let ((module E_trace) : Trace.resource_spans list push), on_trace_full = mk_push ?batch:config.batch_traces () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = @@ -320,9 +353,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (Metrics_service.default_export_metrics_service_request ~resource_metrics ()) encoder; + let data = Pbrt.Encoder.to_string encoder in begin match C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) + data with | Ok () -> () | Error err -> @@ -359,22 +393,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let emit_metrics ?(force=false) () : bool = - if (force && not (E_metrics.is_empty())) || + if not (Atomic.get emitting) && + (force && not (E_metrics.is_empty())) || (not force && E_metrics.is_big_enough ()) then ( + Atomic.set emitting true; let batch = ref [] in E_metrics.pop_iter_all (fun l -> batch := l :: !batch); emit_metrics !batch; Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set emitting false; true ) else false in let emit_traces ?(force=false) () : bool = - if (force && not (E_trace.is_empty())) || + if not (Atomic.get emitting) && + (force && not (E_trace.is_empty())) || (not force && E_trace.is_big_enough ()) then ( + Atomic.set emitting true; let batch = ref [] in E_trace.pop_iter_all (fun l -> batch := l :: !batch); emit_traces !batch; Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set emitting false; true ) else false in @@ -437,6 +477,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = on_trace_full wakeup; let tick() = + if Atomic.get needs_gc_metrics then emit_gc_metrics(); if batch_timeout() then wakeup() in @@ -489,6 +530,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = if batch_timeout() then emit_all_force() let tick () = + if Atomic.get needs_gc_metrics then emit_gc_metrics(); if batch_timeout() then emit_all_force() let cleanup = cleanup @@ -517,29 +559,42 @@ module Backend(Arg : sig val config : Config.t end)() let last_sent_metrics = Atomic.make (Mtime_clock.now()) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) - let additional_metrics () : _ list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now() in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in + let enable_emit_gc_metrics () = + if not !enable_gc_metrics then ( + enable_gc_metrics := true; + (* any time the GC runs, switch this boolean *) + let toggle_gc () = Atomic.set needs_gc_metrics true in + ignore (Gc.create_alarm toggle_gc : Gc.alarm); + ) + let additional_metrics () : _ list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now() in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + let l = AList.pop_all gc_metrics in + let l = if add_own_metrics then ( let open OT.Metrics in Atomic.set last_sent_metrics now; - [make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel-export.errors" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]] - ) else [] + make_resource_metrics [ + sum ~name:"otel-export.dropped" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ] :: l + ) else l + in + + if l <> [] then l else [] let send_metrics : Metrics.resource_metrics list sender = { send=fun m ~ret -> diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 67b72871..e865a140 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -98,6 +98,9 @@ module Collector = struct val rand_bytes_8 : unit -> bytes (** Generate 16 bytes of random data *) + val enable_emit_gc_metrics : unit -> unit + (** Enable the emission of GC metrics. This sets up a GC alarm. *) + val tick : unit -> unit (** Should be called regularly for background processing, timeout checks, etc. *) @@ -669,45 +672,50 @@ end (** Export GC metrics. These metrics are emitted after each GC collection. *) -module GC_metrics = struct +module GC_metrics : sig + val basic_setup : unit -> unit + (** Setup a hook that will emit GC statistics regularly *) + + val get_metrics : unit -> Metrics.t list + (** Get a few metrics from the current state of the GC *) +end = struct + + let basic_setup () = + match !Collector.backend with + | None -> () + | Some (module C) -> C.enable_emit_gc_metrics() + let bytes_per_word = Sys.word_size / 8 + let word_to_bytes n = n * bytes_per_word + let word_to_bytes_f n = n *. float bytes_per_word - (** Basic setup: a few stats *) - let basic_setup () : unit = - let last = ref (Timestamp_ns.now_unix_ns()) in - - let word_to_bytes n = n * bytes_per_word in - let word_to_bytes_f n = n *. float bytes_per_word in - - let emit() = - let gc = Gc.quick_stat () in - let start_time_unix_nano = !last in - last := Timestamp_ns.now_unix_ns(); - Metrics.( - emit - [ - gauge ~name:"ocaml.gc.major_heap" ~unit_:"B" - [ int (word_to_bytes gc.Gc.heap_words) ]; - sum ~name:"ocaml.gc_minor_allocated" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - ~unit_:"B" - [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; - sum ~name:"ocaml.gc.minor_collections" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.minor_collections ]; - sum ~name:"ocaml.gc.major_collections" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.major_collections ]; - sum ~name:"ocaml.gc.compactions" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.compactions ]; - ]) - in - ignore (Gc.create_alarm emit : Gc.alarm); - () + (* TODO: use atomic *) + let last = ref (Timestamp_ns.now_unix_ns()) + let get_metrics () : Metrics.t list = + let gc = Gc.quick_stat () in + let start_time_unix_nano = !last in + last := Timestamp_ns.now_unix_ns(); + Metrics.( + [ + gauge ~name:"ocaml.gc.major_heap" ~unit_:"B" + [ int (word_to_bytes gc.Gc.heap_words) ]; + sum ~name:"ocaml.gc_minor_allocated" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + ~unit_:"B" + [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; + sum ~name:"ocaml.gc.minor_collections" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.minor_collections ]; + sum ~name:"ocaml.gc.major_collections" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.major_collections ]; + sum ~name:"ocaml.gc.compactions" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.compactions ]; + ]) end