mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
wip: various fixes
This commit is contained in:
parent
3fdb0eebd5
commit
ebed5d7ce8
4 changed files with 12 additions and 62 deletions
|
|
@ -1,7 +1,7 @@
|
||||||
type t = Opentelemetry_client.Config.t
|
type t = Opentelemetry_client.Client_config.t
|
||||||
|
|
||||||
module Env = Opentelemetry_client.Config.Env ()
|
module Env = Opentelemetry_client.Client_config.Env ()
|
||||||
|
|
||||||
let pp = Opentelemetry_client.Config.pp
|
let pp = Opentelemetry_client.Client_config.pp
|
||||||
|
|
||||||
let make = Env.make (fun common () -> common)
|
let make = Env.make (fun common () -> common)
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
type t = Opentelemetry_client.Config.t
|
type t = Opentelemetry_client.Client_config.t
|
||||||
(** Configuration.
|
(** Configuration.
|
||||||
|
|
||||||
To build one, use {!make} below. This might be extended with more fields in
|
To build one, use {!make} below. This might be extended with more fields in
|
||||||
|
|
@ -6,7 +6,7 @@ type t = Opentelemetry_client.Config.t
|
||||||
|
|
||||||
val pp : Format.formatter -> t -> unit
|
val pp : Format.formatter -> t -> unit
|
||||||
|
|
||||||
val make : (unit -> t) Opentelemetry_client.Config.make
|
val make : (unit -> t) Opentelemetry_client.Client_config.make
|
||||||
(** Make a configuration {!t}. *)
|
(** Make a configuration {!t}. *)
|
||||||
|
|
||||||
module Env : Opentelemetry_client.Config.ENV
|
module Env : Opentelemetry_client.Client_config.ENV
|
||||||
|
|
|
||||||
|
|
@ -25,48 +25,6 @@ let last_gc_metrics = Atomic.make (Mtime_clock.now ())
|
||||||
|
|
||||||
let timeout_gc_metrics = Mtime.Span.(20 * s)
|
let timeout_gc_metrics = Mtime.Span.(20 * s)
|
||||||
|
|
||||||
(* Cross-domain, thread-safe storage for GC metrics gathered from different fibres. *)
|
|
||||||
module GC_metrics : sig
|
|
||||||
val add : Proto.Metrics.resource_metrics -> unit
|
|
||||||
|
|
||||||
val drain : unit -> Proto.Metrics.resource_metrics list
|
|
||||||
end = struct
|
|
||||||
(* Used to prevent data races across domains *)
|
|
||||||
let mutex = Eio.Mutex.create ()
|
|
||||||
|
|
||||||
let gc_metrics = ref []
|
|
||||||
|
|
||||||
let add m =
|
|
||||||
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
|
|
||||||
gc_metrics := m :: !gc_metrics)
|
|
||||||
|
|
||||||
let drain () =
|
|
||||||
Eio.Mutex.use_rw ~protect:true mutex (fun () ->
|
|
||||||
let metrics = !gc_metrics in
|
|
||||||
gc_metrics := [];
|
|
||||||
metrics)
|
|
||||||
end
|
|
||||||
|
|
||||||
(* capture current GC metrics if {!needs_gc_metrics} is true,
|
|
||||||
or it has been a long time since the last GC metrics collection,
|
|
||||||
and push them into {!gc_metrics} for later collection *)
|
|
||||||
let sample_gc_metrics_if_needed () =
|
|
||||||
let now = Mtime_clock.now () in
|
|
||||||
let alarm = Atomic.compare_and_set needs_gc_metrics true false in
|
|
||||||
let timeout () =
|
|
||||||
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
|
|
||||||
Mtime.Span.compare elapsed timeout_gc_metrics > 0
|
|
||||||
in
|
|
||||||
if alarm || timeout () then (
|
|
||||||
Atomic.set last_gc_metrics now;
|
|
||||||
let l =
|
|
||||||
OT.Metrics.make_resource_metrics
|
|
||||||
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
|
|
||||||
@@ Opentelemetry.GC_metrics.get_metrics ()
|
|
||||||
in
|
|
||||||
GC_metrics.add l
|
|
||||||
)
|
|
||||||
|
|
||||||
type error =
|
type error =
|
||||||
[ `Status of int * Opentelemetry.Proto.Status.status
|
[ `Status of int * Opentelemetry.Proto.Status.status
|
||||||
| `Failure of string
|
| `Failure of string
|
||||||
|
|
@ -282,7 +240,6 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
|
||||||
|
|
||||||
let push_metrics x =
|
let push_metrics x =
|
||||||
let@ () = guard_exn_ "push metrics" in
|
let@ () = guard_exn_ "push metrics" in
|
||||||
sample_gc_metrics_if_needed ();
|
|
||||||
push_to_batch batch_metrics x
|
push_to_batch batch_metrics x
|
||||||
|
|
||||||
let push_logs x =
|
let push_logs x =
|
||||||
|
|
@ -299,8 +256,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
|
||||||
|
|
||||||
let emit_metrics_maybe =
|
let emit_metrics_maybe =
|
||||||
maybe_emit batch_metrics config.url_metrics (fun collected_metrics ->
|
maybe_emit batch_metrics config.url_metrics (fun collected_metrics ->
|
||||||
let gc_metrics = GC_metrics.drain () in
|
collected_metrics |> Signal.Encode.metrics)
|
||||||
gc_metrics @ collected_metrics |> Signal.Encode.metrics)
|
|
||||||
|
|
||||||
let emit_logs_maybe =
|
let emit_logs_maybe =
|
||||||
maybe_emit batch_logs config.url_logs Signal.Encode.logs
|
maybe_emit batch_logs config.url_logs Signal.Encode.logs
|
||||||
|
|
@ -336,7 +292,6 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
|
||||||
if Config.Env.get_debug () then
|
if Config.Env.get_debug () then
|
||||||
Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int);
|
Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int);
|
||||||
run_tick_callbacks ();
|
run_tick_callbacks ();
|
||||||
sample_gc_metrics_if_needed ();
|
|
||||||
emit_all ~force:false
|
emit_all ~force:false
|
||||||
|
|
||||||
let cleanup ~on_done () =
|
let cleanup ~on_done () =
|
||||||
|
|
@ -344,13 +299,12 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
|
||||||
Printf.eprintf "opentelemetry: exiting…\n%!";
|
Printf.eprintf "opentelemetry: exiting…\n%!";
|
||||||
Atomic.set stop true;
|
Atomic.set stop true;
|
||||||
run_tick_callbacks ();
|
run_tick_callbacks ();
|
||||||
sample_gc_metrics_if_needed ();
|
|
||||||
emit_all ~force:true;
|
emit_all ~force:true;
|
||||||
on_done ()
|
on_done ()
|
||||||
end in
|
end in
|
||||||
(module M : EMITTER)
|
(module M : EMITTER)
|
||||||
|
|
||||||
module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct
|
module Backend (Emitter : EMITTER) : Opentelemetry.Exporter.t = struct
|
||||||
open Opentelemetry.Proto
|
open Opentelemetry.Proto
|
||||||
open Opentelemetry.Collector
|
open Opentelemetry.Collector
|
||||||
open Emitter
|
open Emitter
|
||||||
|
|
|
||||||
|
|
@ -34,15 +34,11 @@ let emit_telemetry do_emit = Logs.Tag.(empty |> add emit_telemetry_tag do_emit)
|
||||||
(*****************************************************************************)
|
(*****************************************************************************)
|
||||||
|
|
||||||
(* Log a message to otel with some attrs *)
|
(* Log a message to otel with some attrs *)
|
||||||
let log ?service_name ?(attrs = []) ?(scope = Otel.Scope.get_ambient_scope ())
|
let log ?service_name ?(attrs = []) ?(scope = Otel.Ambient_span.get ()) ~level
|
||||||
~level msg =
|
msg =
|
||||||
let log_level = Logs.level_to_string (Some level) in
|
let log_level = Logs.level_to_string (Some level) in
|
||||||
let span_id =
|
let span_id = Option.map Otel.Span.id scope in
|
||||||
Option.map (fun (scope : Otel.Scope.t) -> scope.span_id) scope
|
let trace_id = Option.map Otel.Span.trace_id scope in
|
||||||
in
|
|
||||||
let trace_id =
|
|
||||||
Option.map (fun (scope : Otel.Scope.t) -> scope.trace_id) scope
|
|
||||||
in
|
|
||||||
let severity = log_level_to_severity level in
|
let severity = log_level_to_severity level in
|
||||||
let log =
|
let log =
|
||||||
Otel.Log_record.make_str ~severity ~log_level ?trace_id ?span_id msg
|
Otel.Log_record.make_str ~severity ~log_level ?trace_id ?span_id msg
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue