From 3d0d031bcd27c40ec1ae57512a383c58dae5d452 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 11 Apr 2022 16:29:59 -0400 Subject: [PATCH] fix: yet another fix for emitting GC metrics now, the frontend (Opentelemetry) is responsible for signalling the backend when to emit GC stats; but the backend just samples GC metrics on the next `tick()` and pushes them in the next batch. This saves us from having to worry about re-entrancy and GC metrics being emitted during the emission of something else. --- src/client/opentelemetry_client_ocurl.ml | 84 ++++++++++-------------- src/opentelemetry.ml | 19 ++++-- 2 files changed, 47 insertions(+), 56 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 25632488..2f99a524 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -39,14 +39,15 @@ let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_ur let get_url () = !url let set_url s = url := s -let enable_gc_metrics = ref false let needs_gc_metrics = Atomic.make false let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) -let emit_gc_metrics () = - let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in +(* capture current GC metrics and push them into {!gc_metrics} for later + collection *) +let sample_gc_metrics () = Atomic.set needs_gc_metrics false; + let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in AList.add gc_metrics l let lock_ : (unit -> unit) ref = ref ignore @@ -324,6 +325,9 @@ let start_bg_thread (f: unit -> unit) : unit = in ignore (Thread.create run () : Thread.t) +let l_is_empty = function [] -> true | _::_ -> false +let batch_is_empty = List.for_all l_is_empty + (* make an emitter. exceptions inside should be caught, see @@ -333,10 +337,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let continue = ref true in - (* to ensure reentrancy, we keep track of whether we're already - emitting something *) - let emitting = Atomic.make false in - let ((module E_trace) : Trace.resource_spans list push), on_trace_full = mk_push ?batch:config.batch_traces () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = @@ -394,28 +394,24 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let emit_metrics ?(force=false) () : bool = - if not (Atomic.get emitting) && - (force && not (E_metrics.is_empty())) || - (not force && E_metrics.is_big_enough ()) then ( - Atomic.set emitting true; - let batch = ref [] in + if force || (not force && E_metrics.is_big_enough ()) then ( + let batch = ref [AList.pop_all gc_metrics] in E_metrics.pop_iter_all (fun l -> batch := l :: !batch); - emit_metrics !batch; + if not (batch_is_empty !batch) then ( + emit_metrics !batch; + ); Atomic.set last_wakeup (Mtime_clock.now()); - Atomic.set emitting false; true ) else false in let emit_traces ?(force=false) () : bool = - if not (Atomic.get emitting) && - (force && not (E_trace.is_empty())) || - (not force && E_trace.is_big_enough ()) then ( - Atomic.set emitting true; + if force || (not force && E_trace.is_big_enough ()) then ( let batch = ref [] in E_trace.pop_iter_all (fun l -> batch := l :: !batch); - emit_traces !batch; + if not (l_is_empty !batch) then ( + emit_traces !batch; + ); Atomic.set last_wakeup (Mtime_clock.now()); - Atomic.set emitting false; true ) else false in @@ -478,7 +474,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = on_trace_full wakeup; let tick() = - if Atomic.get needs_gc_metrics then emit_gc_metrics(); + if Atomic.get needs_gc_metrics then sample_gc_metrics(); if batch_timeout() then wakeup() in @@ -510,6 +506,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = ) else ( on_metrics_full (fun () -> + if Atomic.get needs_gc_metrics then sample_gc_metrics(); ignore (emit_metrics () : bool)); on_trace_full (fun () -> ignore (emit_traces () : bool)); @@ -531,7 +528,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = if batch_timeout() then emit_all_force() let tick () = - if Atomic.get needs_gc_metrics then emit_gc_metrics(); + if Atomic.get needs_gc_metrics then sample_gc_metrics(); if batch_timeout() then emit_all_force() let cleanup = cleanup @@ -560,15 +557,9 @@ module Backend(Arg : sig val config : Config.t end)() let last_sent_metrics = Atomic.make (Mtime_clock.now()) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) - let enable_emit_gc_metrics () = - if not !enable_gc_metrics then ( - enable_gc_metrics := true; - (* any time the GC runs, switch this boolean *) - let toggle_gc () = Atomic.set needs_gc_metrics true in - ignore (Gc.create_alarm toggle_gc : Gc.alarm); - ) + let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true - let additional_metrics () : _ list = + let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) let last_emit = Atomic.get last_sent_metrics in let now = Mtime_clock.now() in @@ -577,25 +568,20 @@ module Backend(Arg : sig val config : Config.t end)() Mtime.Span.compare elapsed timeout_sent_metrics > 0 in - let l = AList.pop_all gc_metrics in - let l = - if add_own_metrics then ( - let open OT.Metrics in - Atomic.set last_sent_metrics now; - make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel-export.errors" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ] :: l - ) else l - in - - if l <> [] then l else [] + if add_own_metrics then ( + let open OT.Metrics in + Atomic.set last_sent_metrics now; + [make_resource_metrics [ + sum ~name:"otel-export.dropped" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]] + ) else [] let send_metrics : Metrics.resource_metrics list sender = { send=fun m ~ret -> diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index e865a140..fdd29c67 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -98,8 +98,10 @@ module Collector = struct val rand_bytes_8 : unit -> bytes (** Generate 16 bytes of random data *) - val enable_emit_gc_metrics : unit -> unit - (** Enable the emission of GC metrics. This sets up a GC alarm. *) + val signal_emit_gc_metrics : unit -> unit + (** Signal the backend that it should emit GC metrics when it has the + chance. This should be installed in a GC alarm or another form + of regular trigger. *) val tick : unit -> unit (** Should be called regularly for background processing, @@ -681,9 +683,12 @@ module GC_metrics : sig end = struct let basic_setup () = - match !Collector.backend with - | None -> () - | Some (module C) -> C.enable_emit_gc_metrics() + let trigger() = + match !Collector.backend with + | None -> () + | Some (module C) -> C.signal_emit_gc_metrics() + in + ignore (Gc.create_alarm trigger : Gc.alarm) let bytes_per_word = Sys.word_size / 8 let word_to_bytes n = n * bytes_per_word @@ -705,7 +710,7 @@ end = struct ~is_monotonic:true ~unit_:"B" [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; - sum ~name:"ocaml.gc.minor_collections" + sum ~name:"ocaml_gc_minor_collections" ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~is_monotonic:true [ int ~start_time_unix_nano gc.Gc.minor_collections ]; @@ -713,7 +718,7 @@ end = struct ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~is_monotonic:true [ int ~start_time_unix_nano gc.Gc.major_collections ]; - sum ~name:"ocaml.gc.compactions" + sum ~name:"ocaml_gc_compactions" ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative ~is_monotonic:true [ int ~start_time_unix_nano gc.Gc.compactions ];