diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 815f45a2..fd3793b8 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -10,6 +10,26 @@ module OT = Opentelemetry open Opentelemetry module Atomic = Opentelemetry_atomic.Atomic +(** Atomic list *) +module AList : sig + type 'a t + val make : unit -> 'a t + val add : 'a t -> 'a -> unit + val pop_all : 'a t -> 'a list +end = struct + type 'a t = 'a list Atomic.t + let make () = Atomic.make [] + let add self x = + while + let old = Atomic.get self in + let l' = x :: old in + not (Atomic.compare_and_set self old l') + do () done + let rec pop_all self = + let l = Atomic.get self in + if Atomic.compare_and_set self l [] then l else pop_all self +end + let[@inline] (let@) f x = f x let debug_ = ref (try bool_of_string @@ Sys.getenv "DEBUG" with _ -> false) @@ -19,6 +39,15 @@ let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_ur let get_url () = !url let set_url s = url := s +let enable_gc_metrics = ref false +let needs_gc_metrics = Atomic.make false + +let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) + +let emit_gc_metrics () = + let l = OT.Metrics.make_resource_metrics @@ Opentelemetry.GC_metrics.get_metrics() in + AList.add gc_metrics l + let lock_ : (unit -> unit) ref = ref ignore let unlock_ : (unit -> unit) ref = ref ignore let set_mutex ~lock ~unlock : unit = @@ -303,6 +332,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let continue = ref true in + (* to ensure reentrancy, we keep track of whether we're already + emitting something *) + let emitting = Atomic.make false in + let ((module E_trace) : Trace.resource_spans list push), on_trace_full = mk_push ?batch:config.batch_traces () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = @@ -320,9 +353,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (Metrics_service.default_export_metrics_service_request ~resource_metrics ()) encoder; + let data = Pbrt.Encoder.to_string encoder in begin match C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) + data with | Ok () -> () | Error err -> @@ -359,22 +393,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let emit_metrics ?(force=false) () : bool = - if (force && not (E_metrics.is_empty())) || + if not (Atomic.get emitting) && + (force && not (E_metrics.is_empty())) || (not force && E_metrics.is_big_enough ()) then ( + Atomic.set emitting true; let batch = ref [] in E_metrics.pop_iter_all (fun l -> batch := l :: !batch); emit_metrics !batch; Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set emitting false; true ) else false in let emit_traces ?(force=false) () : bool = - if (force && not (E_trace.is_empty())) || + if not (Atomic.get emitting) && + (force && not (E_trace.is_empty())) || (not force && E_trace.is_big_enough ()) then ( + Atomic.set emitting true; let batch = ref [] in E_trace.pop_iter_all (fun l -> batch := l :: !batch); emit_traces !batch; Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set emitting false; true ) else false in @@ -437,6 +477,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = on_trace_full wakeup; let tick() = + if Atomic.get needs_gc_metrics then emit_gc_metrics(); if batch_timeout() then wakeup() in @@ -489,6 +530,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = if batch_timeout() then emit_all_force() let tick () = + if Atomic.get needs_gc_metrics then emit_gc_metrics(); if batch_timeout() then emit_all_force() let cleanup = cleanup @@ -517,29 +559,42 @@ module Backend(Arg : sig val config : Config.t end)() let last_sent_metrics = Atomic.make (Mtime_clock.now()) let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) - let additional_metrics () : _ list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now() in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in + let enable_emit_gc_metrics () = + if not !enable_gc_metrics then ( + enable_gc_metrics := true; + (* any time the GC runs, switch this boolean *) + let toggle_gc () = Atomic.set needs_gc_metrics true in + ignore (Gc.create_alarm toggle_gc : Gc.alarm); + ) + let additional_metrics () : _ list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now() in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + let l = AList.pop_all gc_metrics in + let l = if add_own_metrics then ( let open OT.Metrics in Atomic.set last_sent_metrics now; - [make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel-export.errors" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]] - ) else [] + make_resource_metrics [ + sum ~name:"otel-export.dropped" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ] :: l + ) else l + in + + if l <> [] then l else [] let send_metrics : Metrics.resource_metrics list sender = { send=fun m ~ret -> diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 67b72871..e865a140 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -98,6 +98,9 @@ module Collector = struct val rand_bytes_8 : unit -> bytes (** Generate 16 bytes of random data *) + val enable_emit_gc_metrics : unit -> unit + (** Enable the emission of GC metrics. This sets up a GC alarm. *) + val tick : unit -> unit (** Should be called regularly for background processing, timeout checks, etc. *) @@ -669,45 +672,50 @@ end (** Export GC metrics. These metrics are emitted after each GC collection. *) -module GC_metrics = struct +module GC_metrics : sig + val basic_setup : unit -> unit + (** Setup a hook that will emit GC statistics regularly *) + + val get_metrics : unit -> Metrics.t list + (** Get a few metrics from the current state of the GC *) +end = struct + + let basic_setup () = + match !Collector.backend with + | None -> () + | Some (module C) -> C.enable_emit_gc_metrics() + let bytes_per_word = Sys.word_size / 8 + let word_to_bytes n = n * bytes_per_word + let word_to_bytes_f n = n *. float bytes_per_word - (** Basic setup: a few stats *) - let basic_setup () : unit = - let last = ref (Timestamp_ns.now_unix_ns()) in - - let word_to_bytes n = n * bytes_per_word in - let word_to_bytes_f n = n *. float bytes_per_word in - - let emit() = - let gc = Gc.quick_stat () in - let start_time_unix_nano = !last in - last := Timestamp_ns.now_unix_ns(); - Metrics.( - emit - [ - gauge ~name:"ocaml.gc.major_heap" ~unit_:"B" - [ int (word_to_bytes gc.Gc.heap_words) ]; - sum ~name:"ocaml.gc_minor_allocated" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - ~unit_:"B" - [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; - sum ~name:"ocaml.gc.minor_collections" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.minor_collections ]; - sum ~name:"ocaml.gc.major_collections" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.major_collections ]; - sum ~name:"ocaml.gc.compactions" - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~start_time_unix_nano gc.Gc.compactions ]; - ]) - in - ignore (Gc.create_alarm emit : Gc.alarm); - () + (* TODO: use atomic *) + let last = ref (Timestamp_ns.now_unix_ns()) + let get_metrics () : Metrics.t list = + let gc = Gc.quick_stat () in + let start_time_unix_nano = !last in + last := Timestamp_ns.now_unix_ns(); + Metrics.( + [ + gauge ~name:"ocaml.gc.major_heap" ~unit_:"B" + [ int (word_to_bytes gc.Gc.heap_words) ]; + sum ~name:"ocaml.gc_minor_allocated" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + ~unit_:"B" + [ float ~start_time_unix_nano (word_to_bytes_f gc.Gc.minor_words) ]; + sum ~name:"ocaml.gc.minor_collections" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.minor_collections ]; + sum ~name:"ocaml.gc.major_collections" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.major_collections ]; + sum ~name:"ocaml.gc.compactions" + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~start_time_unix_nano gc.Gc.compactions ]; + ]) end