From d4186f64f4321600b0fc2087e3833a58eefc18a5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 20 Dec 2023 15:47:37 -0500 Subject: [PATCH] client ocurl: do self-tracing in cheaper way we don't go through OTEL for self tracing as it can create regular span emission where normally there would be none (emitting a self-tracing span might create a batch, which then has to be sent after the batch timeout, and sending that one creates a new span, etc.) --- src/client-ocurl/config.ml | 27 ++++- src/client-ocurl/config.mli | 4 + .../opentelemetry_client_ocurl.ml | 102 ++++++++++++++++-- 3 files changed, 118 insertions(+), 15 deletions(-) diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index 0abca7e1..9ae54fc3 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -7,20 +7,39 @@ type t = { batch_timeout_ms: int; bg_threads: int; ticker_thread: bool; + self_trace: bool; } let pp out self = let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in let ppheaders = Format.pp_print_list pp_header in - let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } = + let { + debug; + url; + headers; + batch_timeout_ms; + bg_threads; + ticker_thread; + self_trace; + } = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \ - ticker_thread=%B @]}" + ticker_thread=%B;@ self_trace=%B @]}" debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread + self_trace let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) - ?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t = + ?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) + ?(self_trace = true) () : t = let bg_threads = max 2 (min bg_threads 32) in - { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } + { + debug; + url; + headers; + batch_timeout_ms; + bg_threads; + ticker_thread; + self_trace; + } diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 17a7711e..91f6508d 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -18,6 +18,9 @@ type t = private { ticker_thread: bool; (** If true, start a thread that regularly checks if signals should be sent to the collector. Default [true] *) + self_trace: bool; + (** If true, the OTEL library will also emit its own spans. + @since NEXT_RELEASE *) } (** Configuration. @@ -31,6 +34,7 @@ val make : ?batch_timeout_ms:int -> ?bg_threads:int -> ?ticker_thread:bool -> + ?self_trace:bool -> unit -> t (** Make a configuration. diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 41029e8b..de3fc819 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -5,7 +5,6 @@ module OT = Opentelemetry module Config = Config -module Trace' = Opentelemetry.Trace open Opentelemetry include Common_ @@ -18,6 +17,61 @@ let timeout_gc_metrics = Mtime.Span.(20 * s) (** side channel for GC, appended to metrics batch data *) let gc_metrics = AList.make () +(** Side channel for self-tracing *) +let self_spans = AList.make () + +(** Mini tracing module that doesn't go through the + collector (and thus doesn't create new batches, etc.) *) +module Self_trace = struct + let enabled = Atomic.make true + + let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events + + let dummy_trace_id_ = Trace_id.create () + + let dummy_span_id = Span_id.create () + + let with_ ?kind ?(attrs = []) name f = + if Atomic.get enabled then ( + let scope = Scope.get_ambient_scope () in + let parent, trace_id = + match scope with + | None -> None, Trace_id.create () + | Some sc -> Some sc.span_id, sc.trace_id + in + let span_id = Span_id.create () in + let start_time = Timestamp_ns.now_unix_ns () in + + let scope = { Scope.trace_id; span_id; events = []; attrs } in + + let finally () = + let span, _ = + (* TODO: should the attrs passed to with_ go on the Span + (in Span.create) or on the ResourceSpan (in emit)? + (question also applies to Opentelemetry_lwt.Trace.with) *) + Span.create ~trace_id ?parent ?kind ~attrs:scope.attrs ~id:span_id + ~start_time + ~end_time:(Timestamp_ns.now_unix_ns ()) + name + in + AList.add self_spans span + in + let@ () = Scope.with_ambient_scope scope in + Fun.protect ~finally (fun () -> f scope) + ) else ( + (* do nothing *) + let scope = + { + Scope.trace_id = dummy_trace_id_; + span_id = dummy_span_id; + attrs = []; + events = []; + } + in + f scope + ) +end + (** capture current GC metrics if {!needs_gc_metrics} is true or it has been a long time since the last GC metrics collection, and push them into {!gc_metrics} for later collection *) @@ -122,11 +176,13 @@ end = struct let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit = let@ _sc = - Trace'.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" + Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" in let data = - let@ _sc = Trace'.with_ ~kind:Span.Span_kind_internal "encode-proto" in + let@ _sc = + Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" + in Pbrt.Encoder.reset encoder; encode x encoder; Pbrt.Encoder.to_string encoder @@ -148,16 +204,16 @@ end = struct in match let@ _sc = - Trace'.with_ ~kind:Span.Span_kind_internal "curl.post" - ~attrs:[ "sz", `Int (String.length data) ] + Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post" + ~attrs:[ "sz", `Int (String.length data); "url", `String url ] in Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () with | Ok { code; _ } when code >= 200 && code < 300 -> () | Ok { code; body; headers = _; info = _ } -> Atomic.incr n_errors; - Trace'.add_event _sc (fun () -> - Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]); + Self_trace.add_event _sc + @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; if !debug_ || config.debug then ( let dec = Pbrt.Decoder.of_string body in @@ -187,6 +243,11 @@ end = struct let send_logs_http ~stop ~config (client : Curl.t) encoder (l : Logs.resource_logs list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-logs" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Logs_service.default_export_logs_service_request ~resource_logs:l () in @@ -196,6 +257,11 @@ end = struct let send_metrics_http ~stop ~config curl encoder (l : Metrics.resource_metrics list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-metrics" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Metrics_service.default_export_metrics_service_request ~resource_metrics:l () @@ -206,6 +272,11 @@ end = struct let send_traces_http ~stop ~config curl encoder (l : Trace.resource_spans list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-traces" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Trace_service.default_export_trace_service_request ~resource_spans:l () in @@ -264,8 +335,8 @@ end = struct in let send_metrics () = - B_queue.push self.send_q - (To_send.Send_metric (Batch.pop_all batches.metrics)) + let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in + B_queue.push self.send_q (To_send.Send_metric metrics) in let send_logs () = @@ -273,8 +344,16 @@ end = struct in let send_traces () = - B_queue.push self.send_q - (To_send.Send_trace (Batch.pop_all batches.traces)) + let traces = Batch.pop_all batches.traces in + let self_spans = AList.pop_all self_spans in + let traces = + if self_spans != [] then ( + let resource = Opentelemetry.Trace.make_resource_spans self_spans in + [ resource ] :: traces + ) else + traces + in + B_queue.push self.send_q (To_send.Send_trace traces) in try @@ -463,6 +542,7 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () Opentelemetry.Collector.set_backend backend; if config.url <> get_url () then set_url config.url; + Atomic.set Self_trace.enabled config.self_trace; if config.ticker_thread then ( let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in