From 72b0662f56b31330c0ef7124b776766e11c4a71d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 1 Dec 2025 23:44:16 -0500 Subject: [PATCH] feat: extract GC metrics sampling in `client` --- .../opentelemetry_client_cohttp_eio.ml | 60 +++---------------- .../opentelemetry_client_cohttp_lwt.ml | 42 +++---------- .../opentelemetry_client_ocurl_lwt.ml | 40 ++----------- src/client/client.ml | 1 + src/client/gc_metrics_sampling.ml | 37 ++++++++++++ src/client/gc_metrics_sampling.mli | 9 +++ src/core/AList.ml | 25 +++++--- src/core/AList.mli | 5 +- src/core/dune | 3 +- 9 files changed, 92 insertions(+), 130 deletions(-) create mode 100644 src/client/gc_metrics_sampling.ml create mode 100644 src/client/gc_metrics_sampling.mli diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 189b341e..da544800 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -19,54 +19,6 @@ let set_headers = Config.Env.set_headers let get_headers = Config.Env.get_headers -let needs_gc_metrics = Atomic.make false - -let last_gc_metrics = Atomic.make (Mtime_clock.now ()) - -let timeout_gc_metrics = Mtime.Span.(20 * s) - -(* Cross-domain, thread-safe storage for GC metrics gathered from different fibres. *) -module GC_metrics : sig - val add : Proto.Metrics.resource_metrics -> unit - - val drain : unit -> Proto.Metrics.resource_metrics list -end = struct - (* Used to prevent data races across domains *) - let mutex = Eio.Mutex.create () - - let gc_metrics = ref [] - - let add m = - Eio.Mutex.use_rw ~protect:true mutex (fun () -> - gc_metrics := m :: !gc_metrics) - - let drain () = - Eio.Mutex.use_rw ~protect:true mutex (fun () -> - let metrics = !gc_metrics in - gc_metrics := []; - metrics) -end - -(* capture current GC metrics if {!needs_gc_metrics} is true, - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) -let sample_gc_metrics_if_needed () = - let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in - let timeout () = - let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in - Mtime.Span.compare elapsed timeout_gc_metrics > 0 - in - if alarm || timeout () then ( - Atomic.set last_gc_metrics now; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - GC_metrics.add l - ) - type error = [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string @@ -276,7 +228,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = let push_metrics x = let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); push_to_batch batch_metrics x let push_logs x = @@ -293,7 +245,9 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = let emit_metrics_maybe = maybe_emit batch_metrics config.url_metrics (fun collected_metrics -> - let gc_metrics = GC_metrics.drain () in + let gc_metrics = + Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics () + in gc_metrics @ collected_metrics |> Signal.Encode.metrics) let emit_logs_maybe = @@ -330,7 +284,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = if Config.Env.get_debug () then Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int); run_tick_callbacks (); - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); emit_all ~force:false let cleanup ~on_done () = @@ -338,7 +292,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = Printf.eprintf "opentelemetry: exiting…\n%!"; Atomic.set stop true; run_tick_callbacks (); - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); emit_all ~force:true; on_done () end in @@ -370,7 +324,7 @@ module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct let signal_emit_gc_metrics () = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true + Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics () let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 4cb8f22e..0fdc845b 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -18,35 +18,6 @@ external reraise : exn -> 'a = "%reraise" (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to use Lwt's latest version *) -let needs_gc_metrics = Atomic.make false - -let last_gc_metrics = Atomic.make (Mtime_clock.now ()) - -let timeout_gc_metrics = Mtime.Span.(20 * s) - -let gc_metrics = ref [] -(* side channel for GC, appended to {!E_metrics}'s data *) - -(* capture current GC metrics if {!needs_gc_metrics} is true, - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) -let sample_gc_metrics_if_needed () = - let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in - let timeout () = - let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in - Mtime.Span.compare elapsed timeout_gc_metrics > 0 - in - if alarm || timeout () then ( - Atomic.set last_gc_metrics now; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - gc_metrics := l :: !gc_metrics - ) - type error = [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string @@ -245,8 +216,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = match Batch.pop_if_ready ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> - let batch = !gc_metrics @ l in - gc_metrics := []; + let batch = + List.rev_append + (Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics ()) + l + in let+ () = send_metrics_http httpc batch in true @@ -315,7 +289,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let push_metrics e = let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); push_to_batch batch_metrics e; let now = Mtime_clock.now () in Lwt.async (fun () -> @@ -335,7 +309,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let tick_ () = if Config.Env.get_debug () then Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); List.iter (fun f -> try f () @@ -398,7 +372,7 @@ module Backend let signal_emit_gc_metrics () = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true + Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics () let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 8502db39..bafa5948 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -18,35 +18,6 @@ external reraise : exn -> 'a = "%reraise" (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to use Lwt's latest version *) -let needs_gc_metrics = Atomic.make false - -let last_gc_metrics = Atomic.make (Mtime_clock.now ()) - -let timeout_gc_metrics = Mtime.Span.(20 * s) - -let gc_metrics = ref [] -(* side channel for GC, appended to {!E_metrics}'s data *) - -(* capture current GC metrics if {!needs_gc_metrics} is true, - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) -let sample_gc_metrics_if_needed () = - let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in - let timeout () = - let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in - Mtime.Span.compare elapsed timeout_gc_metrics > 0 - in - if alarm || timeout () then ( - Atomic.set last_gc_metrics now; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - gc_metrics := l :: !gc_metrics - ) - type error = [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string @@ -231,8 +202,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = match Batch.pop_if_ready ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> - let batch = !gc_metrics @ l in - gc_metrics := []; + let batch = + Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics () @ l + in let+ () = send_metrics_http httpc batch in true @@ -301,7 +273,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let push_metrics e = let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); push_to_batch batch_metrics e; let now = Mtime_clock.now () in Lwt.async (fun () -> @@ -321,7 +293,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let tick_ () = if Config.Env.get_debug () then Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); + Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed (); List.iter (fun f -> try f () @@ -384,7 +356,7 @@ module Backend let signal_emit_gc_metrics () = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true + Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics () let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) diff --git a/src/client/client.ml b/src/client/client.ml index fa69c983..9765c585 100644 --- a/src/client/client.ml +++ b/src/client/client.ml @@ -6,3 +6,4 @@ module Config = Config module Signal = Signal module Self_trace = Self_trace +module Gc_metrics_sampling = Gc_metrics_sampling diff --git a/src/client/gc_metrics_sampling.ml b/src/client/gc_metrics_sampling.ml new file mode 100644 index 00000000..d0fc3a53 --- /dev/null +++ b/src/client/gc_metrics_sampling.ml @@ -0,0 +1,37 @@ +module OT = Opentelemetry +module AList = Opentelemetry.AList + +let needs_gc_metrics = Atomic.make false + +let last_gc_metrics = Atomic.make (Mtime_clock.now ()) + +let timeout_gc_metrics = Mtime.Span.(20 * s) + +let gc_metrics = AList.make () + +let[@inline] signal_we_need_gc_metrics () = Atomic.set needs_gc_metrics true + +let[@inline] pop_gc_metrics () = AList.pop_all gc_metrics + +(* capture current GC metrics if {!needs_gc_metrics} is true, + or it has been a long time since the last GC metrics collection, + and push them into {!gc_metrics} for later collection *) +let sample_gc_metrics_if_needed () = + let now = lazy (Mtime_clock.now ()) in + let needs_gc_metrics_true = Atomic.exchange needs_gc_metrics false in + + let[@inline] timeout () = + let (lazy now) = now in + let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in + Mtime.Span.compare elapsed timeout_gc_metrics > 0 + in + if needs_gc_metrics_true || timeout () then ( + let (lazy now) = now in + Atomic.set last_gc_metrics now; + let l = + OT.Metrics.make_resource_metrics + ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) + @@ Opentelemetry.GC_metrics.get_metrics () + in + AList.add gc_metrics l + ) diff --git a/src/client/gc_metrics_sampling.mli b/src/client/gc_metrics_sampling.mli new file mode 100644 index 00000000..e356de1d --- /dev/null +++ b/src/client/gc_metrics_sampling.mli @@ -0,0 +1,9 @@ +val signal_we_need_gc_metrics : unit -> unit +(** External trigger to force the emission of GC metrics. Reentrant. *) + +val pop_gc_metrics : unit -> Opentelemetry_proto.Metrics.resource_metrics list + +val sample_gc_metrics_if_needed : unit -> unit +(** Make sure we sample GC metrics if needed (timeout, or signaled via + {!signal_we_need_gc_metrics}) *) + diff --git a/src/core/AList.ml b/src/core/AList.ml index 356f2630..72a7acfc 100644 --- a/src/core/AList.ml +++ b/src/core/AList.ml @@ -1,3 +1,4 @@ +module Dom = Opentelemetry_domain module Atomic = Opentelemetry_atomic.Atomic type 'a t = 'a list Atomic.t @@ -12,17 +13,27 @@ let[@inline] is_empty self : bool = let get = Atomic.get let add self x = + let backoff = ref 1 in while let old = Atomic.get self in let l' = x :: old in not (Atomic.compare_and_set self old l') do - () + (* backoff *) + Dom.relax_loop !backoff; + backoff := min 128 (2 * !backoff) done -let rec pop_all self = - let l = Atomic.get self in - if Atomic.compare_and_set self l [] then - l - else - pop_all self +let pop_all (type res) self : res list = + let exception Return of res list in + let backoff = ref 1 in + try + while true do + let l = Atomic.get self in + if Atomic.compare_and_set self l [] then raise_notrace (Return l); + + (* backoff *) + Dom.relax_loop !backoff; + backoff := min 128 (2 * !backoff) + done + with Return r -> r diff --git a/src/core/AList.mli b/src/core/AList.mli index 832e3c2e..6e12d4a0 100644 --- a/src/core/AList.mli +++ b/src/core/AList.mli @@ -1,4 +1,7 @@ -(** Atomic list *) +(** Atomic list. + + Note that we add at the front, so [add x 1; add x 2; pop_all x] will return + [2;1]. *) type 'a t diff --git a/src/core/dune b/src/core/dune index 248e3aff..0f2ca7a3 100644 --- a/src/core/dune +++ b/src/core/dune @@ -5,10 +5,11 @@ (libraries opentelemetry.proto opentelemetry.ambient-context + opentelemetry.atomic + opentelemetry.domain ptime ptime.clock.os pbrt threads - opentelemetry.atomic hmap) (public_name opentelemetry))