mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-09 04:17:56 -04:00
feat OTEL: move some stuff to client or util; rate limit GC metrics
This commit is contained in:
parent
3f98d0c484
commit
6b6fb34342
13 changed files with 44 additions and 149 deletions
|
|
@ -1,28 +0,0 @@
|
||||||
module Atomic = Opentelemetry_atomic.Atomic
|
|
||||||
|
|
||||||
type 'a t = 'a list Atomic.t
|
|
||||||
|
|
||||||
let make () = Atomic.make []
|
|
||||||
|
|
||||||
let[@inline] is_empty self : bool =
|
|
||||||
match Atomic.get self with
|
|
||||||
| [] -> true
|
|
||||||
| _ :: _ -> false
|
|
||||||
|
|
||||||
let get = Atomic.get
|
|
||||||
|
|
||||||
let add self x =
|
|
||||||
while
|
|
||||||
let old = Atomic.get self in
|
|
||||||
let l' = x :: old in
|
|
||||||
not (Atomic.compare_and_set self old l')
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done
|
|
||||||
|
|
||||||
let rec pop_all self =
|
|
||||||
let l = Atomic.get self in
|
|
||||||
if Atomic.compare_and_set self l [] then
|
|
||||||
l
|
|
||||||
else
|
|
||||||
pop_all self
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
(** Atomic list *)
|
|
||||||
|
|
||||||
type 'a t
|
|
||||||
|
|
||||||
val get : 'a t -> 'a list
|
|
||||||
(** Snapshot *)
|
|
||||||
|
|
||||||
val is_empty : _ t -> bool
|
|
||||||
|
|
||||||
val make : unit -> 'a t
|
|
||||||
|
|
||||||
val add : 'a t -> 'a -> unit
|
|
||||||
|
|
||||||
val pop_all : 'a t -> 'a list
|
|
||||||
|
|
@ -1,9 +1,10 @@
|
||||||
(library
|
(library
|
||||||
(name opentelemetry)
|
(name opentelemetry)
|
||||||
(synopsis "API for opentelemetry instrumentation")
|
(synopsis "API for opentelemetry instrumentation")
|
||||||
(flags :standard -warn-error -a+8)
|
(flags :standard -warn-error -a+8 -open Opentelemetry_util)
|
||||||
(libraries
|
(libraries
|
||||||
opentelemetry.proto
|
opentelemetry.proto
|
||||||
|
opentelemetry.util
|
||||||
opentelemetry.ambient-context
|
opentelemetry.ambient-context
|
||||||
ptime
|
ptime
|
||||||
ptime.clock.os
|
ptime.clock.os
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ end
|
||||||
|
|
||||||
(** Dummy exporter, does nothing *)
|
(** Dummy exporter, does nothing *)
|
||||||
let dummy : t =
|
let dummy : t =
|
||||||
let ticker = Tick_callbacks.create () in
|
let tick_cbs = Cb_set.create () in
|
||||||
object
|
object
|
||||||
method send_trace = ignore
|
method send_trace = ignore
|
||||||
|
|
||||||
|
|
@ -47,9 +47,9 @@ let dummy : t =
|
||||||
|
|
||||||
method send_logs = ignore
|
method send_logs = ignore
|
||||||
|
|
||||||
method tick () = Tick_callbacks.tick ticker
|
method tick () = Cb_set.trigger tick_cbs
|
||||||
|
|
||||||
method add_on_tick_callback cb = Tick_callbacks.on_tick ticker cb
|
method add_on_tick_callback cb = Cb_set.register tick_cbs cb
|
||||||
|
|
||||||
method cleanup ~on_done () = on_done ()
|
method cleanup ~on_done () = on_done ()
|
||||||
end
|
end
|
||||||
|
|
@ -78,14 +78,15 @@ module Main_exporter = struct
|
||||||
(* hidden *)
|
(* hidden *)
|
||||||
open struct
|
open struct
|
||||||
(* a list of callbacks automatically added to the main exporter *)
|
(* a list of callbacks automatically added to the main exporter *)
|
||||||
let on_tick_cbs_ = AList.make ()
|
let on_tick_cbs_ = Alist.make ()
|
||||||
|
|
||||||
let exporter : t option Atomic.t = Atomic.make None
|
let exporter : t option Atomic.t = Atomic.make None
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Set the global exporter *)
|
(** Set the global exporter *)
|
||||||
let set (exp : t) : unit =
|
let set (exp : #t) : unit =
|
||||||
List.iter exp#add_on_tick_callback (AList.get on_tick_cbs_);
|
let exp = (exp :> t) in
|
||||||
|
List.iter exp#add_on_tick_callback (Alist.get on_tick_cbs_);
|
||||||
Atomic.set exporter (Some exp)
|
Atomic.set exporter (Some exp)
|
||||||
|
|
||||||
(** Remove current exporter, if any.
|
(** Remove current exporter, if any.
|
||||||
|
|
@ -104,25 +105,25 @@ module Main_exporter = struct
|
||||||
let[@inline] get () : t option = Atomic.get exporter
|
let[@inline] get () : t option = Atomic.get exporter
|
||||||
|
|
||||||
let add_on_tick_callback f =
|
let add_on_tick_callback f =
|
||||||
AList.add on_tick_cbs_ f;
|
Alist.add on_tick_cbs_ f;
|
||||||
Option.iter (fun exp -> exp#add_on_tick_callback f) (get ())
|
Option.iter (fun exp -> exp#add_on_tick_callback f) (get ())
|
||||||
end
|
end
|
||||||
|
|
||||||
let set_backend = Main_exporter.set [@@deprecated "use `Main_exporter.set`"]
|
let (set_backend [@deprecated "use `Main_exporter.set`"]) = Main_exporter.set
|
||||||
|
|
||||||
let remove_backend = Main_exporter.remove
|
let (remove_backend [@deprecated "use `Main_exporter.remove`"]) =
|
||||||
[@@deprecated "use `Main_exporter.remove`"]
|
Main_exporter.remove
|
||||||
|
|
||||||
let has_backend = Main_exporter.present
|
let (has_backend [@deprecated "use `Main_exporter.present`"]) =
|
||||||
[@@deprecated "use `Main_exporter.present`"]
|
Main_exporter.present
|
||||||
|
|
||||||
let get_backend = Main_exporter.get [@@deprecated "use `Main_exporter.ge"]
|
let (get_backend [@deprecated "use `Main_exporter.ge"]) = Main_exporter.get
|
||||||
|
|
||||||
let with_setup_debug_backend ?(on_done = ignore) (exp : #t) ?(enable = true) ()
|
let with_setup_debug_backend ?(on_done = ignore) (exp : #t) ?(enable = true) ()
|
||||||
f =
|
f =
|
||||||
let exp = (exp :> t) in
|
let exp = (exp :> t) in
|
||||||
if enable then (
|
if enable then (
|
||||||
set_backend exp;
|
Main_exporter.set exp;
|
||||||
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
|
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
|
||||||
) else
|
) else
|
||||||
f ()
|
f ()
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@ open struct
|
||||||
let[@inline] word_to_bytes n = n * bytes_per_word
|
let[@inline] word_to_bytes n = n * bytes_per_word
|
||||||
|
|
||||||
let[@inline] word_to_bytes_f n = n *. float bytes_per_word
|
let[@inline] word_to_bytes_f n = n *. float bytes_per_word
|
||||||
|
|
||||||
|
let default_interval_s = 20
|
||||||
end
|
end
|
||||||
|
|
||||||
let get_metrics () : Metrics.t list =
|
let get_metrics () : Metrics.t list =
|
||||||
|
|
@ -34,16 +36,23 @@ let get_metrics () : Metrics.t list =
|
||||||
[ int ~now gc.Gc.compactions ];
|
[ int ~now gc.Gc.compactions ];
|
||||||
]
|
]
|
||||||
|
|
||||||
let setup (exp : #Exporter.t) =
|
let setup ?(min_interval_s = default_interval_s) (exp : #Exporter.t) =
|
||||||
|
(* limit rate *)
|
||||||
|
let min_interval_s = max 5 min_interval_s in
|
||||||
|
let min_interval = Mtime.Span.(min_interval_s * s) in
|
||||||
|
let limiter = Interval_limiter.create ~min_interval () in
|
||||||
|
|
||||||
let on_tick () =
|
let on_tick () =
|
||||||
|
if Interval_limiter.make_attempt limiter then (
|
||||||
let m = get_metrics () in
|
let m = get_metrics () in
|
||||||
exp#send_metrics m
|
exp#send_metrics m
|
||||||
|
)
|
||||||
in
|
in
|
||||||
Exporter.on_tick exp on_tick
|
Exporter.on_tick exp on_tick
|
||||||
|
|
||||||
let setup_on_main_exporter () =
|
let setup_on_main_exporter ?min_interval_s () =
|
||||||
match Exporter.Main_exporter.get () with
|
match Exporter.Main_exporter.get () with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some exp -> setup exp
|
| Some exp -> setup ?min_interval_s exp
|
||||||
|
|
||||||
let basic_setup = setup_on_main_exporter
|
let basic_setup () = setup_on_main_exporter ()
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,17 @@
|
||||||
val get_metrics : unit -> Metrics.t list
|
val get_metrics : unit -> Metrics.t list
|
||||||
(** Get a few metrics from the current state of the GC. *)
|
(** Get a few metrics from the current state of the GC. *)
|
||||||
|
|
||||||
val setup : #Exporter.t -> unit
|
val setup : ?min_interval_s:int -> #Exporter.t -> unit
|
||||||
(** Setup a hook that will emit GC statistics on every tick. It does assume that
|
(** Setup a hook that will emit GC statistics on every tick. It does assume that
|
||||||
[tick] is called regularly on the exporter. For example, if we ensure the
|
[tick] is called regularly on the exporter. For example, if we ensure the
|
||||||
exporter's [tick] function is called every 5s, we'll get GC metrics every
|
exporter's [tick] function is called every 5s, we'll get GC metrics every
|
||||||
5s. *)
|
5s.
|
||||||
|
|
||||||
val setup_on_main_exporter : unit -> unit
|
@param min_interval_s
|
||||||
|
if provided, GC metrics will be emitted at most every [min_interval_s]
|
||||||
|
seconds. This prevents flooding. Default value is 20s. *)
|
||||||
|
|
||||||
|
val setup_on_main_exporter : ?min_interval_s:int -> unit -> unit
|
||||||
(** Setup the hook on the main exporter. *)
|
(** Setup the hook on the main exporter. *)
|
||||||
|
|
||||||
val basic_setup : unit -> unit [@@deprecated "use setup_on_main_exporter"]
|
val basic_setup : unit -> unit [@@deprecated "use setup_on_main_exporter"]
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
open Common_
|
open Common_
|
||||||
|
|
||||||
type t = { cbs: (unit -> Metrics.t list) AList.t } [@@unboxed]
|
type t = { cbs: (unit -> Metrics.t list) Alist.t } [@@unboxed]
|
||||||
|
|
||||||
let create () : t = { cbs = AList.make () }
|
let create () : t = { cbs = Alist.make () }
|
||||||
|
|
||||||
let[@inline] add_metrics_cb (self : t) f = AList.add self.cbs f
|
let[@inline] add_metrics_cb (self : t) f = Alist.add self.cbs f
|
||||||
|
|
||||||
let add_to_exporter (exp : #Exporter.t) (self : t) =
|
let add_to_exporter (exp : #Exporter.t) (self : t) =
|
||||||
let on_tick () =
|
let on_tick () =
|
||||||
|
|
@ -14,7 +14,7 @@ let add_to_exporter (exp : #Exporter.t) (self : t) =
|
||||||
(fun f ->
|
(fun f ->
|
||||||
let f_metrics = f () in
|
let f_metrics = f () in
|
||||||
res := List.rev_append f_metrics !res)
|
res := List.rev_append f_metrics !res)
|
||||||
(AList.get self.cbs);
|
(Alist.get self.cbs);
|
||||||
let metrics = !res in
|
let metrics = !res in
|
||||||
|
|
||||||
(* emit the metrics *)
|
(* emit the metrics *)
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ open Common_
|
||||||
module Rand_bytes = Rand_bytes
|
module Rand_bytes = Rand_bytes
|
||||||
(** Generation of random identifiers. *)
|
(** Generation of random identifiers. *)
|
||||||
|
|
||||||
module AList = AList
|
module Alist = Alist
|
||||||
(** Atomic list, for internal usage
|
(** Atomic list, for internal usage
|
||||||
@since 0.7 *)
|
@since 0.7 *)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
type cb = unit -> unit
|
|
||||||
|
|
||||||
type t = { cbs: cb AList.t } [@@unboxed]
|
|
||||||
|
|
||||||
let create () : t = { cbs = AList.make () }
|
|
||||||
|
|
||||||
let[@inline] on_tick self f = AList.add self.cbs f
|
|
||||||
|
|
||||||
let[@inline] tick self = List.iter (fun f -> f ()) (AList.get self.cbs)
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
(** A collection of callbacks that are regularly called. *)
|
|
||||||
|
|
||||||
type t
|
|
||||||
|
|
||||||
val create : unit -> t
|
|
||||||
|
|
||||||
val on_tick : t -> (unit -> unit) -> unit
|
|
||||||
|
|
||||||
val tick : t -> unit
|
|
||||||
|
|
@ -1,47 +0,0 @@
|
||||||
open Common_
|
|
||||||
|
|
||||||
let int_to_hex (i : int) =
|
|
||||||
if i < 10 then
|
|
||||||
Char.chr (i + Char.code '0')
|
|
||||||
else
|
|
||||||
Char.chr (i - 10 + Char.code 'a')
|
|
||||||
|
|
||||||
let bytes_to_hex_into b res off : unit =
|
|
||||||
for i = 0 to Bytes.length b - 1 do
|
|
||||||
let n = Char.code (Bytes.get b i) in
|
|
||||||
Bytes.set res ((2 * i) + off) (int_to_hex ((n land 0xf0) lsr 4));
|
|
||||||
Bytes.set res ((2 * i) + 1 + off) (int_to_hex (n land 0x0f))
|
|
||||||
done
|
|
||||||
|
|
||||||
let bytes_to_hex (b : bytes) : string =
|
|
||||||
let res = Bytes.create (2 * Bytes.length b) in
|
|
||||||
bytes_to_hex_into b res 0;
|
|
||||||
Bytes.unsafe_to_string res
|
|
||||||
|
|
||||||
let int_of_hex = function
|
|
||||||
| '0' .. '9' as c -> Char.code c - Char.code '0'
|
|
||||||
| 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a'
|
|
||||||
| c -> raise (Invalid_argument (spf "invalid hex char: %C" c))
|
|
||||||
|
|
||||||
let bytes_of_hex_substring (s : string) off len =
|
|
||||||
if len mod 2 <> 0 then
|
|
||||||
raise (Invalid_argument "hex sequence must be of even length");
|
|
||||||
let res = Bytes.make (len / 2) '\x00' in
|
|
||||||
for i = 0 to (len / 2) - 1 do
|
|
||||||
let n1 = int_of_hex (String.get s (off + (2 * i))) in
|
|
||||||
let n2 = int_of_hex (String.get s (off + (2 * i) + 1)) in
|
|
||||||
let n = (n1 lsl 4) lor n2 in
|
|
||||||
Bytes.set res i (Char.chr n)
|
|
||||||
done;
|
|
||||||
res
|
|
||||||
|
|
||||||
let bytes_of_hex (s : string) : bytes =
|
|
||||||
bytes_of_hex_substring s 0 (String.length s)
|
|
||||||
|
|
||||||
let bytes_non_zero (self : bytes) : bool =
|
|
||||||
try
|
|
||||||
for i = 0 to Bytes.length self - 1 do
|
|
||||||
if Char.code (Bytes.unsafe_get self i) <> 0 then raise_notrace Exit
|
|
||||||
done;
|
|
||||||
false
|
|
||||||
with Exit -> true
|
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *)
|
|
||||||
(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
|
|
||||||
let[@inline never] protect m f =
|
|
||||||
Mutex.lock m;
|
|
||||||
match f () with
|
|
||||||
| x ->
|
|
||||||
Mutex.unlock m;
|
|
||||||
x
|
|
||||||
| exception e ->
|
|
||||||
(* NOTE: [unlock] does not poll for asynchronous exceptions *)
|
|
||||||
Mutex.unlock m;
|
|
||||||
Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ())
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
val protect : Mutex.t -> (unit -> 'a) -> 'a
|
|
||||||
Loading…
Add table
Reference in a new issue