feat: extract GC metrics sampling in client

This commit is contained in:
Simon Cruanes 2025-12-01 23:44:16 -05:00
parent db2b2b8a31
commit 72b0662f56
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 92 additions and 130 deletions

View file

@ -19,54 +19,6 @@ let set_headers = Config.Env.set_headers
let get_headers = Config.Env.get_headers let get_headers = Config.Env.get_headers
let needs_gc_metrics = Atomic.make false
let last_gc_metrics = Atomic.make (Mtime_clock.now ())
let timeout_gc_metrics = Mtime.Span.(20 * s)
(* Cross-domain, thread-safe storage for GC metrics gathered from different fibres. *)
module GC_metrics : sig
val add : Proto.Metrics.resource_metrics -> unit
val drain : unit -> Proto.Metrics.resource_metrics list
end = struct
(* Used to prevent data races across domains *)
let mutex = Eio.Mutex.create ()
let gc_metrics = ref []
let add m =
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
gc_metrics := m :: !gc_metrics)
let drain () =
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
let metrics = !gc_metrics in
gc_metrics := [];
metrics)
end
(* capture current GC metrics if {!needs_gc_metrics} is true,
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
let now = Mtime_clock.now () in
let alarm = Atomic.compare_and_set needs_gc_metrics true false in
let timeout () =
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
Mtime.Span.compare elapsed timeout_gc_metrics > 0
in
if alarm || timeout () then (
Atomic.set last_gc_metrics now;
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
GC_metrics.add l
)
type error = type error =
[ `Status of int * Opentelemetry.Proto.Status.status [ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string | `Failure of string
@ -276,7 +228,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
let push_metrics x = let push_metrics x =
let@ () = guard_exn_ "push metrics" in let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
push_to_batch batch_metrics x push_to_batch batch_metrics x
let push_logs x = let push_logs x =
@ -293,7 +245,9 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
let emit_metrics_maybe = let emit_metrics_maybe =
maybe_emit batch_metrics config.url_metrics (fun collected_metrics -> maybe_emit batch_metrics config.url_metrics (fun collected_metrics ->
let gc_metrics = GC_metrics.drain () in let gc_metrics =
Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics ()
in
gc_metrics @ collected_metrics |> Signal.Encode.metrics) gc_metrics @ collected_metrics |> Signal.Encode.metrics)
let emit_logs_maybe = let emit_logs_maybe =
@ -330,7 +284,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int); Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int);
run_tick_callbacks (); run_tick_callbacks ();
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
emit_all ~force:false emit_all ~force:false
let cleanup ~on_done () = let cleanup ~on_done () =
@ -338,7 +292,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
Printf.eprintf "opentelemetry: exiting…\n%!"; Printf.eprintf "opentelemetry: exiting…\n%!";
Atomic.set stop true; Atomic.set stop true;
run_tick_callbacks (); run_tick_callbacks ();
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
emit_all ~force:true; emit_all ~force:true;
on_done () on_done ()
end in end in
@ -370,7 +324,7 @@ module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct
let signal_emit_gc_metrics () = let signal_emit_gc_metrics () =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics ()
let additional_metrics () : Metrics.resource_metrics list = let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *) (* add exporter metrics to the lot? *)

View file

@ -18,35 +18,6 @@ external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
use Lwt's latest version *) use Lwt's latest version *)
let needs_gc_metrics = Atomic.make false
let last_gc_metrics = Atomic.make (Mtime_clock.now ())
let timeout_gc_metrics = Mtime.Span.(20 * s)
let gc_metrics = ref []
(* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics if {!needs_gc_metrics} is true,
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
let now = Mtime_clock.now () in
let alarm = Atomic.compare_and_set needs_gc_metrics true false in
let timeout () =
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
Mtime.Span.compare elapsed timeout_gc_metrics > 0
in
if alarm || timeout () then (
Atomic.set last_gc_metrics now;
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
gc_metrics := l :: !gc_metrics
)
type error = type error =
[ `Status of int * Opentelemetry.Proto.Status.status [ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string | `Failure of string
@ -245,8 +216,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
match Batch.pop_if_ready ?force ~now batch_metrics with match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false | None -> Lwt.return false
| Some l -> | Some l ->
let batch = !gc_metrics @ l in let batch =
gc_metrics := []; List.rev_append
(Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics ())
l
in
let+ () = send_metrics_http httpc batch in let+ () = send_metrics_http httpc batch in
true true
@ -315,7 +289,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let push_metrics e = let push_metrics e =
let@ () = guard_exn_ "push metrics" in let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
push_to_batch batch_metrics e; push_to_batch batch_metrics e;
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
Lwt.async (fun () -> Lwt.async (fun () ->
@ -335,7 +309,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let tick_ () = let tick_ () =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "tick (from %d)\n%!" (tid ()); Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
List.iter List.iter
(fun f -> (fun f ->
try f () try f ()
@ -398,7 +372,7 @@ module Backend
let signal_emit_gc_metrics () = let signal_emit_gc_metrics () =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics ()
let additional_metrics () : Metrics.resource_metrics list = let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *) (* add exporter metrics to the lot? *)

View file

@ -18,35 +18,6 @@ external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
use Lwt's latest version *) use Lwt's latest version *)
let needs_gc_metrics = Atomic.make false
let last_gc_metrics = Atomic.make (Mtime_clock.now ())
let timeout_gc_metrics = Mtime.Span.(20 * s)
let gc_metrics = ref []
(* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics if {!needs_gc_metrics} is true,
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
let now = Mtime_clock.now () in
let alarm = Atomic.compare_and_set needs_gc_metrics true false in
let timeout () =
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
Mtime.Span.compare elapsed timeout_gc_metrics > 0
in
if alarm || timeout () then (
Atomic.set last_gc_metrics now;
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
gc_metrics := l :: !gc_metrics
)
type error = type error =
[ `Status of int * Opentelemetry.Proto.Status.status [ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string | `Failure of string
@ -231,8 +202,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
match Batch.pop_if_ready ?force ~now batch_metrics with match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false | None -> Lwt.return false
| Some l -> | Some l ->
let batch = !gc_metrics @ l in let batch =
gc_metrics := []; Opentelemetry_client.Gc_metrics_sampling.pop_gc_metrics () @ l
in
let+ () = send_metrics_http httpc batch in let+ () = send_metrics_http httpc batch in
true true
@ -301,7 +273,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let push_metrics e = let push_metrics e =
let@ () = guard_exn_ "push metrics" in let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
push_to_batch batch_metrics e; push_to_batch batch_metrics e;
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
Lwt.async (fun () -> Lwt.async (fun () ->
@ -321,7 +293,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let tick_ () = let tick_ () =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "tick (from %d)\n%!" (tid ()); Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed (); Opentelemetry_client.Gc_metrics_sampling.sample_gc_metrics_if_needed ();
List.iter List.iter
(fun f -> (fun f ->
try f () try f ()
@ -384,7 +356,7 @@ module Backend
let signal_emit_gc_metrics () = let signal_emit_gc_metrics () =
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true Opentelemetry_client.Gc_metrics_sampling.signal_we_need_gc_metrics ()
let additional_metrics () : Metrics.resource_metrics list = let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *) (* add exporter metrics to the lot? *)

View file

@ -6,3 +6,4 @@
module Config = Config module Config = Config
module Signal = Signal module Signal = Signal
module Self_trace = Self_trace module Self_trace = Self_trace
module Gc_metrics_sampling = Gc_metrics_sampling

View file

@ -0,0 +1,37 @@
module OT = Opentelemetry
module AList = Opentelemetry.AList
let needs_gc_metrics = Atomic.make false
let last_gc_metrics = Atomic.make (Mtime_clock.now ())
let timeout_gc_metrics = Mtime.Span.(20 * s)
let gc_metrics = AList.make ()
let[@inline] signal_we_need_gc_metrics () = Atomic.set needs_gc_metrics true
let[@inline] pop_gc_metrics () = AList.pop_all gc_metrics
(* capture current GC metrics if {!needs_gc_metrics} is true,
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
let now = lazy (Mtime_clock.now ()) in
let needs_gc_metrics_true = Atomic.exchange needs_gc_metrics false in
let[@inline] timeout () =
let (lazy now) = now in
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
Mtime.Span.compare elapsed timeout_gc_metrics > 0
in
if needs_gc_metrics_true || timeout () then (
let (lazy now) = now in
Atomic.set last_gc_metrics now;
let l =
OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics ()
in
AList.add gc_metrics l
)

View file

@ -0,0 +1,9 @@
val signal_we_need_gc_metrics : unit -> unit
(** External trigger to force the emission of GC metrics. Reentrant. *)
val pop_gc_metrics : unit -> Opentelemetry_proto.Metrics.resource_metrics list
val sample_gc_metrics_if_needed : unit -> unit
(** Make sure we sample GC metrics if needed (timeout, or signaled via
{!signal_we_need_gc_metrics}) *)

View file

@ -1,3 +1,4 @@
module Dom = Opentelemetry_domain
module Atomic = Opentelemetry_atomic.Atomic module Atomic = Opentelemetry_atomic.Atomic
type 'a t = 'a list Atomic.t type 'a t = 'a list Atomic.t
@ -12,17 +13,27 @@ let[@inline] is_empty self : bool =
let get = Atomic.get let get = Atomic.get
let add self x = let add self x =
let backoff = ref 1 in
while while
let old = Atomic.get self in let old = Atomic.get self in
let l' = x :: old in let l' = x :: old in
not (Atomic.compare_and_set self old l') not (Atomic.compare_and_set self old l')
do do
() (* backoff *)
Dom.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done done
let rec pop_all self = let pop_all (type res) self : res list =
let exception Return of res list in
let backoff = ref 1 in
try
while true do
let l = Atomic.get self in let l = Atomic.get self in
if Atomic.compare_and_set self l [] then if Atomic.compare_and_set self l [] then raise_notrace (Return l);
l
else (* backoff *)
pop_all self Dom.relax_loop !backoff;
backoff := min 128 (2 * !backoff)
done
with Return r -> r

View file

@ -1,4 +1,7 @@
(** Atomic list *) (** Atomic list.
Note that we add at the front, so [add x 1; add x 2; pop_all x] will return
[2;1]. *)
type 'a t type 'a t

View file

@ -5,10 +5,11 @@
(libraries (libraries
opentelemetry.proto opentelemetry.proto
opentelemetry.ambient-context opentelemetry.ambient-context
opentelemetry.atomic
opentelemetry.domain
ptime ptime
ptime.clock.os ptime.clock.os
pbrt pbrt
threads threads
opentelemetry.atomic
hmap) hmap)
(public_name opentelemetry)) (public_name opentelemetry))