mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-11 13:08:35 -04:00
client ocurl: do self-tracing in cheaper way
we don't go through OTEL for self tracing as it can create regular span emission where normally there would be none (emitting a self-tracing span might create a batch, which then has to be sent after the batch timeout, and sending that one creates a new span, etc.)
This commit is contained in:
parent
bd8b483e81
commit
d4186f64f4
3 changed files with 118 additions and 15 deletions
|
|
@ -7,20 +7,39 @@ type t = {
|
||||||
batch_timeout_ms: int;
|
batch_timeout_ms: int;
|
||||||
bg_threads: int;
|
bg_threads: int;
|
||||||
ticker_thread: bool;
|
ticker_thread: bool;
|
||||||
|
self_trace: bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
let pp out self =
|
let pp out self =
|
||||||
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
|
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
|
||||||
let ppheaders = Format.pp_print_list pp_header in
|
let ppheaders = Format.pp_print_list pp_header in
|
||||||
let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } =
|
let {
|
||||||
|
debug;
|
||||||
|
url;
|
||||||
|
headers;
|
||||||
|
batch_timeout_ms;
|
||||||
|
bg_threads;
|
||||||
|
ticker_thread;
|
||||||
|
self_trace;
|
||||||
|
} =
|
||||||
self
|
self
|
||||||
in
|
in
|
||||||
Format.fprintf out
|
Format.fprintf out
|
||||||
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
|
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
|
||||||
ticker_thread=%B @]}"
|
ticker_thread=%B;@ self_trace=%B @]}"
|
||||||
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
|
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
|
||||||
|
self_trace
|
||||||
|
|
||||||
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
|
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
|
||||||
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t =
|
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true)
|
||||||
|
?(self_trace = true) () : t =
|
||||||
let bg_threads = max 2 (min bg_threads 32) in
|
let bg_threads = max 2 (min bg_threads 32) in
|
||||||
{ debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread }
|
{
|
||||||
|
debug;
|
||||||
|
url;
|
||||||
|
headers;
|
||||||
|
batch_timeout_ms;
|
||||||
|
bg_threads;
|
||||||
|
ticker_thread;
|
||||||
|
self_trace;
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,9 @@ type t = private {
|
||||||
ticker_thread: bool;
|
ticker_thread: bool;
|
||||||
(** If true, start a thread that regularly checks if signals should
|
(** If true, start a thread that regularly checks if signals should
|
||||||
be sent to the collector. Default [true] *)
|
be sent to the collector. Default [true] *)
|
||||||
|
self_trace: bool;
|
||||||
|
(** If true, the OTEL library will also emit its own spans.
|
||||||
|
@since NEXT_RELEASE *)
|
||||||
}
|
}
|
||||||
(** Configuration.
|
(** Configuration.
|
||||||
|
|
||||||
|
|
@ -31,6 +34,7 @@ val make :
|
||||||
?batch_timeout_ms:int ->
|
?batch_timeout_ms:int ->
|
||||||
?bg_threads:int ->
|
?bg_threads:int ->
|
||||||
?ticker_thread:bool ->
|
?ticker_thread:bool ->
|
||||||
|
?self_trace:bool ->
|
||||||
unit ->
|
unit ->
|
||||||
t
|
t
|
||||||
(** Make a configuration.
|
(** Make a configuration.
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
|
|
||||||
module OT = Opentelemetry
|
module OT = Opentelemetry
|
||||||
module Config = Config
|
module Config = Config
|
||||||
module Trace' = Opentelemetry.Trace
|
|
||||||
open Opentelemetry
|
open Opentelemetry
|
||||||
include Common_
|
include Common_
|
||||||
|
|
||||||
|
|
@ -18,6 +17,61 @@ let timeout_gc_metrics = Mtime.Span.(20 * s)
|
||||||
(** side channel for GC, appended to metrics batch data *)
|
(** side channel for GC, appended to metrics batch data *)
|
||||||
let gc_metrics = AList.make ()
|
let gc_metrics = AList.make ()
|
||||||
|
|
||||||
|
(** Side channel for self-tracing *)
|
||||||
|
let self_spans = AList.make ()
|
||||||
|
|
||||||
|
(** Mini tracing module that doesn't go through the
|
||||||
|
collector (and thus doesn't create new batches, etc.) *)
|
||||||
|
module Self_trace = struct
|
||||||
|
let enabled = Atomic.make true
|
||||||
|
|
||||||
|
let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events
|
||||||
|
|
||||||
|
let dummy_trace_id_ = Trace_id.create ()
|
||||||
|
|
||||||
|
let dummy_span_id = Span_id.create ()
|
||||||
|
|
||||||
|
let with_ ?kind ?(attrs = []) name f =
|
||||||
|
if Atomic.get enabled then (
|
||||||
|
let scope = Scope.get_ambient_scope () in
|
||||||
|
let parent, trace_id =
|
||||||
|
match scope with
|
||||||
|
| None -> None, Trace_id.create ()
|
||||||
|
| Some sc -> Some sc.span_id, sc.trace_id
|
||||||
|
in
|
||||||
|
let span_id = Span_id.create () in
|
||||||
|
let start_time = Timestamp_ns.now_unix_ns () in
|
||||||
|
|
||||||
|
let scope = { Scope.trace_id; span_id; events = []; attrs } in
|
||||||
|
|
||||||
|
let finally () =
|
||||||
|
let span, _ =
|
||||||
|
(* TODO: should the attrs passed to with_ go on the Span
|
||||||
|
(in Span.create) or on the ResourceSpan (in emit)?
|
||||||
|
(question also applies to Opentelemetry_lwt.Trace.with) *)
|
||||||
|
Span.create ~trace_id ?parent ?kind ~attrs:scope.attrs ~id:span_id
|
||||||
|
~start_time
|
||||||
|
~end_time:(Timestamp_ns.now_unix_ns ())
|
||||||
|
name
|
||||||
|
in
|
||||||
|
AList.add self_spans span
|
||||||
|
in
|
||||||
|
let@ () = Scope.with_ambient_scope scope in
|
||||||
|
Fun.protect ~finally (fun () -> f scope)
|
||||||
|
) else (
|
||||||
|
(* do nothing *)
|
||||||
|
let scope =
|
||||||
|
{
|
||||||
|
Scope.trace_id = dummy_trace_id_;
|
||||||
|
span_id = dummy_span_id;
|
||||||
|
attrs = [];
|
||||||
|
events = [];
|
||||||
|
}
|
||||||
|
in
|
||||||
|
f scope
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
(** capture current GC metrics if {!needs_gc_metrics} is true
|
(** capture current GC metrics if {!needs_gc_metrics} is true
|
||||||
or it has been a long time since the last GC metrics collection,
|
or it has been a long time since the last GC metrics collection,
|
||||||
and push them into {!gc_metrics} for later collection *)
|
and push them into {!gc_metrics} for later collection *)
|
||||||
|
|
@ -122,11 +176,13 @@ end = struct
|
||||||
let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit
|
let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit
|
||||||
=
|
=
|
||||||
let@ _sc =
|
let@ _sc =
|
||||||
Trace'.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
|
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
|
||||||
in
|
in
|
||||||
|
|
||||||
let data =
|
let data =
|
||||||
let@ _sc = Trace'.with_ ~kind:Span.Span_kind_internal "encode-proto" in
|
let@ _sc =
|
||||||
|
Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
|
||||||
|
in
|
||||||
Pbrt.Encoder.reset encoder;
|
Pbrt.Encoder.reset encoder;
|
||||||
encode x encoder;
|
encode x encoder;
|
||||||
Pbrt.Encoder.to_string encoder
|
Pbrt.Encoder.to_string encoder
|
||||||
|
|
@ -148,16 +204,16 @@ end = struct
|
||||||
in
|
in
|
||||||
match
|
match
|
||||||
let@ _sc =
|
let@ _sc =
|
||||||
Trace'.with_ ~kind:Span.Span_kind_internal "curl.post"
|
Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post"
|
||||||
~attrs:[ "sz", `Int (String.length data) ]
|
~attrs:[ "sz", `Int (String.length data); "url", `String url ]
|
||||||
in
|
in
|
||||||
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
|
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
|
||||||
with
|
with
|
||||||
| Ok { code; _ } when code >= 200 && code < 300 -> ()
|
| Ok { code; _ } when code >= 200 && code < 300 -> ()
|
||||||
| Ok { code; body; headers = _; info = _ } ->
|
| Ok { code; body; headers = _; info = _ } ->
|
||||||
Atomic.incr n_errors;
|
Atomic.incr n_errors;
|
||||||
Trace'.add_event _sc (fun () ->
|
Self_trace.add_event _sc
|
||||||
Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]);
|
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];
|
||||||
|
|
||||||
if !debug_ || config.debug then (
|
if !debug_ || config.debug then (
|
||||||
let dec = Pbrt.Decoder.of_string body in
|
let dec = Pbrt.Decoder.of_string body in
|
||||||
|
|
@ -187,6 +243,11 @@ end = struct
|
||||||
let send_logs_http ~stop ~config (client : Curl.t) encoder
|
let send_logs_http ~stop ~config (client : Curl.t) encoder
|
||||||
(l : Logs.resource_logs list list) : unit =
|
(l : Logs.resource_logs list list) : unit =
|
||||||
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
||||||
|
let@ _sp =
|
||||||
|
Self_trace.with_ ~kind:Span_kind_producer "send-logs"
|
||||||
|
~attrs:[ "n", `Int (List.length l) ]
|
||||||
|
in
|
||||||
|
|
||||||
let x =
|
let x =
|
||||||
Logs_service.default_export_logs_service_request ~resource_logs:l ()
|
Logs_service.default_export_logs_service_request ~resource_logs:l ()
|
||||||
in
|
in
|
||||||
|
|
@ -196,6 +257,11 @@ end = struct
|
||||||
let send_metrics_http ~stop ~config curl encoder
|
let send_metrics_http ~stop ~config curl encoder
|
||||||
(l : Metrics.resource_metrics list list) : unit =
|
(l : Metrics.resource_metrics list list) : unit =
|
||||||
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
||||||
|
let@ _sp =
|
||||||
|
Self_trace.with_ ~kind:Span_kind_producer "send-metrics"
|
||||||
|
~attrs:[ "n", `Int (List.length l) ]
|
||||||
|
in
|
||||||
|
|
||||||
let x =
|
let x =
|
||||||
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
|
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
|
||||||
()
|
()
|
||||||
|
|
@ -206,6 +272,11 @@ end = struct
|
||||||
let send_traces_http ~stop ~config curl encoder
|
let send_traces_http ~stop ~config curl encoder
|
||||||
(l : Trace.resource_spans list list) : unit =
|
(l : Trace.resource_spans list list) : unit =
|
||||||
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
|
||||||
|
let@ _sp =
|
||||||
|
Self_trace.with_ ~kind:Span_kind_producer "send-traces"
|
||||||
|
~attrs:[ "n", `Int (List.length l) ]
|
||||||
|
in
|
||||||
|
|
||||||
let x =
|
let x =
|
||||||
Trace_service.default_export_trace_service_request ~resource_spans:l ()
|
Trace_service.default_export_trace_service_request ~resource_spans:l ()
|
||||||
in
|
in
|
||||||
|
|
@ -264,8 +335,8 @@ end = struct
|
||||||
in
|
in
|
||||||
|
|
||||||
let send_metrics () =
|
let send_metrics () =
|
||||||
B_queue.push self.send_q
|
let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in
|
||||||
(To_send.Send_metric (Batch.pop_all batches.metrics))
|
B_queue.push self.send_q (To_send.Send_metric metrics)
|
||||||
in
|
in
|
||||||
|
|
||||||
let send_logs () =
|
let send_logs () =
|
||||||
|
|
@ -273,8 +344,16 @@ end = struct
|
||||||
in
|
in
|
||||||
|
|
||||||
let send_traces () =
|
let send_traces () =
|
||||||
B_queue.push self.send_q
|
let traces = Batch.pop_all batches.traces in
|
||||||
(To_send.Send_trace (Batch.pop_all batches.traces))
|
let self_spans = AList.pop_all self_spans in
|
||||||
|
let traces =
|
||||||
|
if self_spans != [] then (
|
||||||
|
let resource = Opentelemetry.Trace.make_resource_spans self_spans in
|
||||||
|
[ resource ] :: traces
|
||||||
|
) else
|
||||||
|
traces
|
||||||
|
in
|
||||||
|
B_queue.push self.send_q (To_send.Send_trace traces)
|
||||||
in
|
in
|
||||||
|
|
||||||
try
|
try
|
||||||
|
|
@ -463,6 +542,7 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
|
||||||
Opentelemetry.Collector.set_backend backend;
|
Opentelemetry.Collector.set_backend backend;
|
||||||
|
|
||||||
if config.url <> get_url () then set_url config.url;
|
if config.url <> get_url () then set_url config.url;
|
||||||
|
Atomic.set Self_trace.enabled config.self_trace;
|
||||||
|
|
||||||
if config.ticker_thread then (
|
if config.ticker_thread then (
|
||||||
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in
|
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue