Refactor client configuration

Enabling sharing all common configuration logic
This commit is contained in:
Shon Feder 2025-06-09 21:12:44 -04:00
parent 885d0b6a75
commit d62f680fc3
No known key found for this signature in database
14 changed files with 393 additions and 413 deletions

View file

@ -5,55 +5,3 @@ let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf let spf = Printf.sprintf
let tid () = Thread.id @@ Thread.self () let tid () = Thread.id @@ Thread.self ()
let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let default_url = "http://localhost:4318"
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value
let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"
let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"
let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url
let parse_headers s =
let parse_header s =
match String.split_on_char '=' s with
| [ key; value ] -> key, value
| _ -> failwith "Unexpected format for header"
in
String.split_on_char ',' s |> List.map parse_header
let default_headers = []
let headers =
ref
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
with _ -> default_headers)
let get_headers () = !headers
let set_headers s = headers := s

View file

@ -1,79 +1,7 @@
open Common_ type t = Client.Config.t
type t = { module Env = Client.Config.Env ()
debug: bool;
url_traces: string;
url_metrics: string;
url_logs: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
}
let pp out self : unit = let pp = Client.Config.pp
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url_traces=%S;@ url_metrics=%S;@ url_logs=%S;@ \
headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \
batch_timeout_ms=%d; @]}"
debug url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces
ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms
let make ?(debug = !debug_) ?url ?url_traces ?url_metrics ?url_logs let make = Env.make (fun common () -> common)
?(headers = get_headers ()) ?(batch_traces = Some 400)
?(batch_metrics = Some 20) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) () : t =
let url_traces, url_metrics, url_logs =
let base_url =
let base_url =
match get_url_from_env () with
| None -> Option.value url ~default:default_url
| Some url -> remove_trailing_slash url
in
remove_trailing_slash base_url
in
let url_traces =
match get_url_traces_from_env () with
| None -> Option.value url_traces ~default:(base_url ^ "/v1/traces")
| Some url -> url
in
let url_metrics =
match get_url_metrics_from_env () with
| None -> Option.value url_metrics ~default:(base_url ^ "/v1/metrics")
| Some url -> url
in
let url_logs =
match get_url_logs_from_env () with
| None -> Option.value url_logs ~default:(base_url ^ "/v1/logs")
| Some url -> url
in
url_traces, url_metrics, url_logs
in
{
debug;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
batch_timeout_ms;
batch_logs;
}

View file

@ -1,78 +1,12 @@
type t = private { type t = Client.Config.t
debug: bool;
url_traces: string; (** Url to send traces *)
url_metrics: string; (** Url to send metrics*)
url_logs: string; (** Url to send logs *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most) [i]
items. If [None], there is no batching.
Note that traces and metrics are batched separately. Default
[Some 400]. *)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately. Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details. Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete. Note that the batch might take longer than that, because
this is only checked when a new event occurs. Default 500. *)
}
(** Configuration. (** Configuration.
To build one, use {!make} below. This might be extended with more fields in To build one, use {!make} below. This might be extended with more fields in
the future. *) the future. *)
val make :
?debug:bool ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?headers:(string * string) list ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
unit ->
t
(** Make a configuration.
@param thread
if true and [bg_threads] is not provided, we will pick a number of bg
threads. Otherwise the number of [bg_threads] superseeds this option.
@param url
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal
type.
@param url_traces
url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics
url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs
url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
used as-is without any modification. *)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit
val make : (unit -> t) Client.Config.make
(** Make a configuration {!t}. *)
module Env : Client.Config.Env

View file

@ -6,6 +6,7 @@
(pps lwt_ppx)) (pps lwt_ppx))
(libraries (libraries
opentelemetry opentelemetry
opentelemetry.client
lwt lwt
cohttp-lwt cohttp-lwt
cohttp-lwt-unix cohttp-lwt-unix

View file

@ -6,7 +6,11 @@
module OT = Opentelemetry module OT = Opentelemetry
module Config = Config module Config = Config
open Opentelemetry open Opentelemetry
include Common_ open Common_
let set_headers = Config.Env.set_headers
let get_headers = Config.Env.get_headers
external reraise : exn -> 'a = "%reraise" external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
@ -98,7 +102,7 @@ end = struct
let uri = Uri.of_string url in let uri = Uri.of_string url in
let open Cohttp in let open Cohttp in
let headers = Header.(add_list (init ()) !headers) in let headers = Header.(add_list (init ()) (Config.Env.get_headers ())) in
let headers = let headers =
Header.(add headers "Content-Type" "application/x-protobuf") Header.(add headers "Content-Type" "application/x-protobuf")
in in
@ -312,7 +316,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Metrics_service.default_export_metrics_service_request Metrics_service.default_export_metrics_service_request
~resource_metrics:l () ~resource_metrics:l ()
in in
let url = config.Config.url_metrics in let url = config.url_metrics in
send_http_ curl encoder ~url send_http_ curl encoder ~url
~encode:Metrics_service.encode_pb_export_metrics_service_request x ~encode:Metrics_service.encode_pb_export_metrics_service_request x
@ -321,7 +325,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let x = let x =
Trace_service.default_export_trace_service_request ~resource_spans:l () Trace_service.default_export_trace_service_request ~resource_spans:l ()
in in
let url = config.Config.url_traces in let url = config.url_traces in
send_http_ curl encoder ~url send_http_ curl encoder ~url
~encode:Trace_service.encode_pb_export_trace_service_request x ~encode:Trace_service.encode_pb_export_trace_service_request x
@ -330,7 +334,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let x = let x =
Logs_service.default_export_logs_service_request ~resource_logs:l () Logs_service.default_export_logs_service_request ~resource_logs:l ()
in in
let url = config.Config.url_logs in let url = config.url_logs in
send_http_ curl encoder ~url send_http_ curl encoder ~url
~encode:Logs_service.encode_pb_export_logs_service_request x ~encode:Logs_service.encode_pb_export_logs_service_request x
@ -374,7 +378,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
() ()
let tick_common_ () = let tick_common_ () =
if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ()); if Config.Env.get_debug () then
Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed (); sample_gc_metrics_if_needed ();
List.iter List.iter
(fun f -> (fun f ->
@ -449,7 +454,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let tick () = Lwt.async tick_ let tick () = Lwt.async tick_
let cleanup ~on_done () = let cleanup ~on_done () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () -> Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc; Httpc.cleanup httpc;
@ -474,7 +480,7 @@ module Backend
{ {
send = send =
(fun l ~ret -> (fun l ~ret ->
(if !debug_ then (if Config.Env.get_debug () then
let@ () = Lock.with_lock in let@ () = Lock.with_lock in
Format.eprintf "send spans %a@." Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans) (Format.pp_print_list Trace.pp_resource_spans)
@ -489,7 +495,7 @@ module Backend
(* send metrics from time to time *) (* send metrics from time to time *)
let signal_emit_gc_metrics () = let signal_emit_gc_metrics () =
if !debug_ then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true Atomic.set needs_gc_metrics true
@ -531,7 +537,7 @@ module Backend
{ {
send = send =
(fun m ~ret -> (fun m ~ret ->
(if !debug_ then (if Config.Env.get_debug () then
let@ () = Lock.with_lock in let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@." Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics) (Format.pp_print_list Metrics.pp_resource_metrics)
@ -546,7 +552,7 @@ module Backend
{ {
send = send =
(fun m ~ret -> (fun m ~ret ->
(if !debug_ then (if Config.Env.get_debug () then
let@ () = Lock.with_lock in let@ () = Lock.with_lock in
Format.eprintf "send logs %a@." Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs) (Format.pp_print_list Logs.pp_resource_logs)
@ -558,8 +564,6 @@ module Backend
end end
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
debug_ := config.debug;
let module B = let module B =
Backend Backend
(struct (struct

View file

@ -6,55 +6,3 @@ let spf = Printf.sprintf
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let tid () = Thread.id @@ Thread.self () let tid () = Thread.id @@ Thread.self ()
let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let default_url = "http://localhost:4318"
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value
let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"
let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"
let get_url_logs_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url
let parse_headers s =
let parse_header s =
match String.split_on_char '=' s with
| [ key; value ] -> key, value
| _ -> failwith "Unexpected format for header"
in
String.split_on_char ',' s |> List.map parse_header
let default_headers = []
let headers =
ref
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
with _ -> default_headers)
let get_headers () = !headers
let set_headers s = headers := s

View file

@ -1,83 +1,30 @@
open Common_
type t = { type t = {
debug: bool;
url_traces: string;
url_metrics: string;
url_logs: string;
headers: (string * string) list;
batch_timeout_ms: int;
bg_threads: int; bg_threads: int;
(** Are there background threads, and how many? Default [4]. This will be
adjusted to be at least [1] and at most [32]. *)
ticker_thread: bool; ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should be
sent to the collector. Default [true] *)
ticker_interval_ms: int; ticker_interval_ms: int;
self_trace: bool; (** Interval for ticker thread, in milliseconds. This is only useful if
[ticker_thread] is [true]. This will be clamped between [2 ms] and
some longer interval (maximum [60s] currently). Default 500.
@since 0.7 *)
common: Client.Config.t;
(** Common configuration options
@since 0.12*)
} }
let pp out self = let pp fmt _ = Format.pp_print_string fmt "TODO"
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
url_traces;
url_metrics;
url_logs;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ url_traces=%S;@ url_metrics=%S;@ url_logs=%S;@ \
headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B;@ \
ticker_interval_ms=%d;@ self_trace=%B @]}"
debug url_traces url_metrics url_logs ppheaders headers batch_timeout_ms
bg_threads ticker_thread ticker_interval_ms self_trace
let make ?(debug = !debug_) ?url ?url_traces ?url_metrics ?url_logs module Env = Client.Config.Env ()
?(headers = get_headers ()) ?(batch_timeout_ms = 2_000) ?(bg_threads = 4)
?(ticker_thread = true) ?(ticker_interval_ms = 500) ?(self_trace = false) ()
: t =
let bg_threads = max 1 (min bg_threads 32) in
let url_traces, url_metrics, url_logs = let make =
let base_url = Env.make
let base_url = (fun
match get_url_from_env () with common
| None -> Option.value url ~default:default_url ?(bg_threads = 4)
| Some url -> remove_trailing_slash url ?(ticker_thread = true)
in ?(ticker_interval_ms = 500)
remove_trailing_slash base_url ()
in -> { bg_threads; ticker_thread; ticker_interval_ms; common })
let url_traces =
match get_url_traces_from_env () with
| None -> Option.value url_traces ~default:(base_url ^ "/v1/traces")
| Some url -> url
in
let url_metrics =
match get_url_metrics_from_env () with
| None -> Option.value url_metrics ~default:(base_url ^ "/v1/metrics")
| Some url -> url
in
let url_logs =
match get_url_logs_from_env () with
| None -> Option.value url_logs ~default:(base_url ^ "/v1/logs")
| Some url -> url
in
url_traces, url_metrics, url_logs
in
{
debug;
url_traces;
url_metrics;
url_logs;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
}

View file

@ -1,18 +1,6 @@
(** Configuration for the ocurl backend *) (** Configuration for the ocurl backend *)
type t = private { type t = {
debug: bool;
url_traces: string; (** Url to send traces *)
url_metrics: string; (** Url to send metrics*)
url_logs: string; (** Url to send logs *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete. Note that the batch might take longer than that, because
this is only checked when a new event occurs or when a tick is
emitted. Default 2_000. *)
bg_threads: int; bg_threads: int;
(** Are there background threads, and how many? Default [4]. This will be (** Are there background threads, and how many? Default [4]. This will be
adjusted to be at least [1] and at most [32]. *) adjusted to be at least [1] and at most [32]. *)
@ -24,56 +12,24 @@ type t = private {
[ticker_thread] is [true]. This will be clamped between [2 ms] and [ticker_thread] is [true]. This will be clamped between [2 ms] and
some longer interval (maximum [60s] currently). Default 500. some longer interval (maximum [60s] currently). Default 500.
@since 0.7 *) @since 0.7 *)
self_trace: bool; common: Client.Config.t;
(** If true, the OTEL library will also emit its own spans. Default (** Common configuration options
[false]. @since NEXT_RELEASE*)
@since 0.7 *)
} }
(** Configuration. (** Configuration.
To build one, use {!make} below. This might be extended with more fields in To build one, use {!make} below. This might be extended with more fields in
the future. *) the future. *)
val pp : Format.formatter -> t -> unit
val make : val make :
?debug:bool -> (?bg_threads:int ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?headers:(string * string) list ->
?batch_timeout_ms:int ->
?bg_threads:int ->
?ticker_thread:bool -> ?ticker_thread:bool ->
?ticker_interval_ms:int -> ?ticker_interval_ms:int ->
?self_trace:bool ->
unit -> unit ->
t t)
(** Make a configuration. Client.Config.make
(** Make a configuration {!t}. *)
@param url module Env : Client.Config.Env
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal
type.
@param url_traces
url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics
url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs
url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
used as-is without any modification. *)
val pp : Format.formatter -> t -> unit

View file

@ -4,6 +4,7 @@
(libraries (libraries
opentelemetry opentelemetry
opentelemetry.atomic opentelemetry.atomic
opentelemetry.client
curl curl
pbrt pbrt
threads threads

View file

@ -8,6 +8,10 @@ module Config = Config
open Opentelemetry open Opentelemetry
include Common_ include Common_
let get_headers = Config.Env.get_headers
let set_headers = Config.Env.set_headers
let needs_gc_metrics = Atomic.make false let needs_gc_metrics = Atomic.make false
let last_gc_metrics = Atomic.make (Mtime_clock.now ()) let last_gc_metrics = Atomic.make (Mtime_clock.now ())
@ -147,7 +151,8 @@ end = struct
mutable send_threads: Thread.t array; (** Threads that send data via http *) mutable send_threads: Thread.t array; (** Threads that send data via http *)
} }
let send_http_ ~stop ~config (client : Curl.t) encoder ~url ~encode x : unit = let send_http_ ~stop ~(config : Config.t) (client : Curl.t) encoder ~url
~encode x : unit =
let@ _sc = let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
in in
@ -161,11 +166,11 @@ end = struct
Pbrt.Encoder.to_string encoder Pbrt.Encoder.to_string encoder
in in
if !debug_ || config.Config.debug then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
(String.length data); (String.length data);
let headers = let headers =
("Content-Type", "application/x-protobuf") :: config.headers ("Content-Type", "application/x-protobuf") :: config.common.headers
in in
match match
let@ _sc = let@ _sc =
@ -175,14 +180,14 @@ end = struct
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with with
| Ok { code; _ } when code >= 200 && code < 300 -> | Ok { code; _ } when code >= 200 && code < 300 ->
if !debug_ || config.debug then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: got response code=%d\n%!" code Printf.eprintf "opentelemetry: got response code=%d\n%!" code
| Ok { code; body; headers = _; info = _ } -> | Ok { code; body; headers = _; info = _ } ->
Atomic.incr n_errors; Atomic.incr n_errors;
Self_trace.add_event _sc Self_trace.add_event _sc
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];
if !debug_ || config.debug then ( if Config.Env.get_debug () then (
let dec = Pbrt.Decoder.of_string body in let dec = Pbrt.Decoder.of_string body in
let body = let body =
try try
@ -221,7 +226,7 @@ end = struct
let x = let x =
Logs_service.default_export_logs_service_request ~resource_logs:l () Logs_service.default_export_logs_service_request ~resource_logs:l ()
in in
send_http_ ~stop ~config client encoder ~url:config.Config.url_logs send_http_ ~stop ~config client encoder ~url:config.Config.common.url_logs
~encode:Logs_service.encode_pb_export_logs_service_request x ~encode:Logs_service.encode_pb_export_logs_service_request x
let send_metrics_http ~stop ~config curl encoder let send_metrics_http ~stop ~config curl encoder
@ -236,7 +241,7 @@ end = struct
Metrics_service.default_export_metrics_service_request ~resource_metrics:l Metrics_service.default_export_metrics_service_request ~resource_metrics:l
() ()
in in
send_http_ ~stop ~config curl encoder ~url:config.Config.url_metrics send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_metrics
~encode:Metrics_service.encode_pb_export_metrics_service_request x ~encode:Metrics_service.encode_pb_export_metrics_service_request x
let send_traces_http ~stop ~config curl encoder let send_traces_http ~stop ~config curl encoder
@ -250,7 +255,7 @@ end = struct
let x = let x =
Trace_service.default_export_trace_service_request ~resource_spans:l () Trace_service.default_export_trace_service_request ~resource_spans:l ()
in in
send_http_ ~stop ~config curl encoder ~url:config.Config.url_traces send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_traces
~encode:Trace_service.encode_pb_export_trace_service_request x ~encode:Trace_service.encode_pb_export_trace_service_request x
let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev
@ -287,7 +292,7 @@ end = struct
(Batch.len b > 0 || side != []) (Batch.len b > 0 || side != [])
&& (Batch.len b >= batch_max_size_ && (Batch.len b >= batch_max_size_
|| ||
let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in let timeout = Mtime.Span.(config.Config.common.batch_timeout_ms * ms) in
let elapsed = Mtime.span now (Batch.time_started b) in let elapsed = Mtime.span now (Batch.time_started b) in
Mtime.Span.compare elapsed timeout >= 0) Mtime.Span.compare elapsed timeout >= 0)
@ -423,7 +428,7 @@ let create_backend ?(stop = Atomic.make false)
let timeout_sent_metrics = Mtime.Span.(5 * s) let timeout_sent_metrics = Mtime.Span.(5 * s)
let signal_emit_gc_metrics () = let signal_emit_gc_metrics () =
if !debug_ || config.debug then if config.common.debug then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true Atomic.set needs_gc_metrics true
@ -508,7 +513,7 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
let backend = create_backend ~stop ~config () in let backend = create_backend ~stop ~config () in
Opentelemetry.Collector.set_backend backend; Opentelemetry.Collector.set_backend backend;
Atomic.set Self_trace.enabled config.self_trace; Atomic.set Self_trace.enabled config.common.self_trace;
if config.ticker_thread then ( if config.ticker_thread then (
(* at most a minute *) (* at most a minute *)

304
src/client/client.ml Normal file
View file

@ -0,0 +1,304 @@
(** Utilities for writing clients
These are used for implementing e.g., the [opentelemetry-client-cohttp-lwt]
and [opentelemetry-client-ocurl] packages package. *)
(** Constructing and managing the configuration needed in common by all clients
*)
module Config : sig
type t = private {
debug: bool;
url_traces: string; (** Url to send traces *)
url_metrics: string; (** Url to send metrics*)
url_logs: string; (** Url to send logs *)
headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately. Default
[Some 400]. *)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately. Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details. Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete. Note that the batch might take longer than that, because
this is only checked when a new event occurs or when a tick is
emitted. Default 2_000. *)
self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default
[false].
@since 0.7 *)
}
(** Configuration.
To build one, use {!make} below. This might be extended with more fields
in the future. *)
val pp : Format.formatter -> t -> unit
type 'k make =
?debug:bool ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?headers:(string * string) list ->
?batch_timeout_ms:int ->
?self_trace:bool ->
'k
(** A function that gathers all the values needed to construct a {!t}, and
produces a ['k]. ['k] is typically a continuation used to construct a
configuration that includes a {!t}.
@param url
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal
type.
@param url_traces
url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics
url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_logs
url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url
is used as-is without any modification. *)
(** Construct, inspect, and update {!t} configurations, drawing defaults from
the environment and encapsulating state *)
module type Env = sig
val get_debug : unit -> bool
val set_debug : bool -> unit
val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
val make : (t -> 'a) -> 'a make
(** [make f] is a {!type:make} function that will give [f] a safely
constructed {!t}.
Typically this is used to extend the constructor for {!t} with new
optional arguments.
E.g., we can construct a configuration that includes a {!t} alongside a
more specific field like so:
{[
type extended_confg =
{ new_field: string
; common: t
}
let make : (?new_field -> unit) make =
Env.make (fun common ?new_field () -> {new_field; common})
let _example : extended_config =
make ~new_field:"foo" ~url_traces:"foo/bar" ~debug:true ()
]}
As a special case, we can get the simple constructor function for {!t}
with [Env.make (fun common () -> common)] *)
end
(** A generative functor that produces a state-space that can read
configuration values from the environment, provide stateful configuration
setting and accessing operations, and a way to make a new {!t}
configuration record *)
module Env : functor () -> Env
end = struct
type t = {
debug: bool;
url_traces: string;
url_metrics: string;
url_logs: string;
headers: (string * string) list;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
self_trace: bool;
}
let pp out (self : t) : unit =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in
let {
debug;
self_trace;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
} =
self
in
Format.fprintf out
"{@[ debug=%B;@ self_trace=%B; url_traces=%S;@ url_metrics=%S;@ \
url_logs=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d @]}"
debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt
batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms
type 'k make =
?debug:bool ->
?url:string ->
?url_traces:string ->
?url_metrics:string ->
?url_logs:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?headers:(string * string) list ->
?batch_timeout_ms:int ->
?self_trace:bool ->
'k
module type Env = sig
val get_debug : unit -> bool
val set_debug : bool -> unit
val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit
val make : (t -> 'a) -> 'a make
end
module Env () : Env = struct
let debug_ =
ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with
| Some ("1" | "true") -> true
| _ -> false)
let get_debug () = !debug_
let set_debug b = debug_ := b
let default_url = "http://localhost:4318"
let make_get_from_env env_name =
let value = ref None in
fun () ->
match !value with
| None ->
value := Sys.getenv_opt env_name;
!value
| Some value -> Some value
let get_url_from_env = make_get_from_env "OTEL_EXPORTER_OTLP_ENDPOINT"
let get_url_traces_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
let get_url_metrics_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"
let get_url_logs_from_env =
make_get_from_env "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
let remove_trailing_slash url =
if url <> "" && String.get url (String.length url - 1) = '/' then
String.sub url 0 (String.length url - 1)
else
url
let parse_headers s =
let parse_header s =
match String.split_on_char '=' s with
| [ key; value ] -> key, value
| _ -> failwith "Unexpected format for header"
in
String.split_on_char ',' s |> List.map parse_header
let default_headers = []
let headers =
ref
(try parse_headers (Sys.getenv "OTEL_EXPORTER_OTLP_HEADERS")
with _ -> default_headers)
let get_headers () = !headers
let set_headers s = headers := s
let make k ?(debug = get_debug ()) ?url ?url_traces ?url_metrics ?url_logs
?(batch_traces = Some 400) ?(batch_metrics = Some 20)
?(batch_logs = Some 400) ?(headers = get_headers ())
?(batch_timeout_ms = 2_000) ?(self_trace = false) =
(* Ensure the state is synced, in case these values are passed in explicitly *)
set_debug debug;
set_headers headers;
let url_traces, url_metrics, url_logs =
let base_url =
let base_url =
match get_url_from_env () with
| None -> Option.value url ~default:default_url
| Some url -> remove_trailing_slash url
in
remove_trailing_slash base_url
in
let url_traces =
match get_url_traces_from_env () with
| None -> Option.value url_traces ~default:(base_url ^ "/v1/traces")
| Some url -> url
in
let url_metrics =
match get_url_metrics_from_env () with
| None -> Option.value url_metrics ~default:(base_url ^ "/v1/metrics")
| Some url -> url
in
let url_logs =
match get_url_logs_from_env () with
| None -> Option.value url_logs ~default:(base_url ^ "/v1/logs")
| Some url -> url
in
url_traces, url_metrics, url_logs
in
k
{
debug;
url_traces;
url_metrics;
url_logs;
headers;
batch_traces;
batch_metrics;
batch_logs;
batch_timeout_ms;
self_trace;
}
end
end

4
src/client/dune Normal file
View file

@ -0,0 +1,4 @@
(library
(name client)
(public_name opentelemetry.client)
(synopsis "Common types and logic shared between client implementations"))

View file

@ -1,10 +1,10 @@
open Opentelemetry_client_cohttp_lwt open Opentelemetry_client_cohttp_lwt
let test_urls ~name config = let test_urls ~name (config : Config.t) =
Printf.printf "--- %s ---\n" name; Printf.printf "--- %s ---\n" name;
Printf.printf "url_traces = %s\n" config.Config.url_traces; Printf.printf "url_traces = %s\n" config.url_traces;
Printf.printf "url_metrics = %s\n" config.Config.url_metrics; Printf.printf "url_metrics = %s\n" config.url_metrics;
Printf.printf "url_logs = %s\n" config.Config.url_logs; Printf.printf "url_logs = %s\n" config.url_logs;
print_endline "------\n" print_endline "------\n"
let default_url () = let default_url () =

View file

@ -2,9 +2,9 @@ open Opentelemetry_client_ocurl
let test_urls ~name config = let test_urls ~name config =
Printf.printf "--- %s ---\n" name; Printf.printf "--- %s ---\n" name;
Printf.printf "url_traces = %s\n" config.Config.url_traces; Printf.printf "url_traces = %s\n" config.Config.common.url_traces;
Printf.printf "url_metrics = %s\n" config.Config.url_metrics; Printf.printf "url_metrics = %s\n" config.Config.common.url_metrics;
Printf.printf "url_logs = %s\n" config.Config.url_logs; Printf.printf "url_logs = %s\n" config.Config.common.url_logs;
print_endline "------\n" print_endline "------\n"
let default_url () = let default_url () =