diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 5b4f0453..b078c5b2 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -257,7 +257,7 @@ module type EMITTER = sig val push_logs : Logs.resource_logs list -> unit - val set_on_tick_callbacks : (unit -> unit) list ref -> unit + val set_on_tick_callbacks : (unit -> unit) AList.t -> unit val tick : unit -> unit @@ -288,7 +288,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let batch_logs : Logs.resource_logs list Batch.t = Batch.make ?batch:config.batch_logs ?timeout () - let on_tick_cbs_ = Atomic.make (ref []) + let on_tick_cbs_ = Atomic.make (AList.make ()) let set_on_tick_callbacks = Atomic.set on_tick_cbs_ @@ -384,7 +384,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = with e -> Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) - !(Atomic.get on_tick_cbs_); + (AList.get @@ Atomic.get on_tick_cbs_); () (* thread that calls [tick()] regularly, to help enforce timeouts *) diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index 0abca7e1..2ea68fb7 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -7,20 +7,42 @@ type t = { batch_timeout_ms: int; bg_threads: int; ticker_thread: bool; + ticker_interval_ms: int; + self_trace: bool; } let pp out self = let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in let ppheaders = Format.pp_print_list pp_header in - let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } = + let { + debug; + url; + headers; + batch_timeout_ms; + bg_threads; + ticker_thread; + ticker_interval_ms; + self_trace; + } = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \ - ticker_thread=%B @]}" + ticker_thread=%B;@ ticker_interval_ms=%d;@ self_trace=%B @]}" debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread + ticker_interval_ms self_trace let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) - ?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t = - let bg_threads = max 2 (min bg_threads 32) in - { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } + ?(batch_timeout_ms = 2_000) ?(bg_threads = 4) ?(ticker_thread = true) + ?(ticker_interval_ms = 500) ?(self_trace = false) () : t = + let bg_threads = max 1 (min bg_threads 32) in + { + debug; + url; + headers; + batch_timeout_ms; + bg_threads; + ticker_thread; + ticker_interval_ms; + self_trace; + } diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 17a7711e..dfc35c08 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -12,12 +12,24 @@ type t = private { (** Number of milliseconds after which we will emit a batch, even incomplete. Note that the batch might take longer than that, because this is - only checked when a new event occurs. Default 500. *) + only checked when a new event occurs or when a tick + is emitted. Default 2_000. *) bg_threads: int; - (** Are there background threads, and how many? Default [4] *) + (** Are there background threads, and how many? Default [4]. + This will be adjusted to be at least [1] and at most [32]. *) ticker_thread: bool; (** If true, start a thread that regularly checks if signals should be sent to the collector. Default [true] *) + ticker_interval_ms: int; + (** Interval for ticker thread, in milliseconds. This is + only useful if [ticker_thread] is [true]. + This will be clamped between [2 ms] and some longer + interval (maximum [60s] currently). + Default 500. + @since NEXT_RELEASE *) + self_trace: bool; + (** If true, the OTEL library will also emit its own spans. Default [false]. + @since NEXT_RELEASE *) } (** Configuration. @@ -31,6 +43,8 @@ val make : ?batch_timeout_ms:int -> ?bg_threads:int -> ?ticker_thread:bool -> + ?ticker_interval_ms:int -> + ?self_trace:bool -> unit -> t (** Make a configuration. diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 29e11fa4..813eec55 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -17,6 +17,33 @@ let timeout_gc_metrics = Mtime.Span.(20 * s) (** side channel for GC, appended to metrics batch data *) let gc_metrics = AList.make () +(** Mini tracing module (disabled if [config.self_trace=false]) *) +module Self_trace = struct + let enabled = Atomic.make true + + let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events + + let dummy_trace_id_ = Trace_id.create () + + let dummy_span_id = Span_id.create () + + let with_ ?kind ?attrs name f = + if Atomic.get enabled then + Opentelemetry.Trace.with_ ?kind ?attrs name f + else ( + (* do nothing *) + let scope = + { + Scope.trace_id = dummy_trace_id_; + span_id = dummy_span_id; + attrs = []; + events = []; + } + in + f scope + ) +end + (** capture current GC metrics if {!needs_gc_metrics} is true or it has been a long time since the last GC metrics collection, and push them into {!gc_metrics} for later collection *) @@ -120,9 +147,19 @@ end = struct let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in + let@ _sc = + Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" + in + + let data = + let@ _sc = + Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" + in + Pbrt.Encoder.reset encoder; + encode x encoder; + Pbrt.Encoder.to_string encoder + in + let url = let url = config.Config.url in if url <> "" && String.get url (String.length url - 1) = '/' then @@ -138,11 +175,18 @@ end = struct ("Content-Type", "application/x-protobuf") :: config.headers in match + let@ _sc = + Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post" + ~attrs:[ "sz", `Int (String.length data); "url", `String url ] + in Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () with | Ok { code; _ } when code >= 200 && code < 300 -> () | Ok { code; body; headers = _; info = _ } -> Atomic.incr n_errors; + Self_trace.add_event _sc + @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; + if !debug_ || config.debug then ( let dec = Pbrt.Decoder.of_string body in let body = @@ -171,6 +215,11 @@ end = struct let send_logs_http ~stop ~config (client : Curl.t) encoder (l : Logs.resource_logs list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-logs" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Logs_service.default_export_logs_service_request ~resource_logs:l () in @@ -180,6 +229,11 @@ end = struct let send_metrics_http ~stop ~config curl encoder (l : Metrics.resource_metrics list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-metrics" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Metrics_service.default_export_metrics_service_request ~resource_metrics:l () @@ -190,6 +244,11 @@ end = struct let send_traces_http ~stop ~config curl encoder (l : Trace.resource_spans list list) : unit = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer "send-traces" + ~attrs:[ "n", `Int (List.length l) ] + in + let x = Trace_service.default_export_trace_service_request ~resource_spans:l () in @@ -248,8 +307,8 @@ end = struct in let send_metrics () = - B_queue.push self.send_q - (To_send.Send_metric (Batch.pop_all batches.metrics)) + let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in + B_queue.push self.send_q (To_send.Send_metric metrics) in let send_logs () = @@ -266,39 +325,36 @@ end = struct (* read multiple events at once *) B_queue.pop_all self.q local_q; - let now = Mtime_clock.now () in + (* are we asked to flush all events? *) + let must_flush_all = ref false in (* how to process a single event *) let process_ev (ev : Event.t) : unit = match ev with - | Event.E_metric m -> - Batch.push batches.metrics m; - if should_send_batch_ ~config ~now batches.metrics then - send_metrics () - | Event.E_trace tr -> - Batch.push batches.traces tr; - if should_send_batch_ ~config ~now batches.traces then - send_traces () - | Event.E_logs logs -> - Batch.push batches.logs logs; - if should_send_batch_ ~config ~now batches.logs then send_logs () + | Event.E_metric m -> Batch.push batches.metrics m + | Event.E_trace tr -> Batch.push batches.traces tr + | Event.E_logs logs -> Batch.push batches.logs logs | Event.E_tick -> - (* check for batches whose timeout expired *) - if should_send_batch_ ~config ~now batches.metrics then - send_metrics (); - if should_send_batch_ ~config ~now batches.logs then send_logs (); - if should_send_batch_ ~config ~now batches.traces then - send_traces () - | Event.E_flush_all -> - if Batch.len batches.metrics > 0 then send_metrics (); - if Batch.len batches.logs > 0 then send_logs (); - if Batch.len batches.traces > 0 then send_traces () + (* the only impact of "tick" is that it wakes us up regularly *) + () + | Event.E_flush_all -> must_flush_all := true in - while not (Queue.is_empty local_q) do - let ev = Queue.pop local_q in - process_ev ev - done + Queue.iter process_ev local_q; + Queue.clear local_q; + + if !must_flush_all then ( + if Batch.len batches.metrics > 0 then send_metrics (); + if Batch.len batches.logs > 0 then send_logs (); + if Batch.len batches.traces > 0 then send_traces () + ) else ( + let now = Mtime_clock.now () in + if should_send_batch_ ~config ~now batches.metrics then + send_metrics (); + + if should_send_batch_ ~config ~now batches.traces then send_traces (); + if should_send_batch_ ~config ~now batches.logs then send_logs () + ) done with B_queue.Closed -> () @@ -418,15 +474,14 @@ let create_backend ?(stop = Atomic.make false) ret ()); } - let on_tick_cbs_ = Atomic.make (ref []) + let on_tick_cbs_ = Atomic.make (AList.make ()) let set_on_tick_callbacks = Atomic.set on_tick_cbs_ let tick () = sample_gc_metrics_if_needed (); Backend_impl.send_event backend Event.E_tick; - let l = Atomic.get on_tick_cbs_ in - List.iter (fun f -> f ()) !l + List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_) let cleanup () = Backend_impl.shutdown backend end in @@ -451,9 +506,11 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () Opentelemetry.Collector.set_backend backend; if config.url <> get_url () then set_url config.url; + Atomic.set Self_trace.enabled config.self_trace; if config.ticker_thread then ( - let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in + (* at most a minute *) + let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) ); diff --git a/src/client-ocurl/AList.ml b/src/core/AList.ml similarity index 94% rename from src/client-ocurl/AList.ml rename to src/core/AList.ml index 9b7d47df..faec8000 100644 --- a/src/client-ocurl/AList.ml +++ b/src/core/AList.ml @@ -4,6 +4,8 @@ type 'a t = 'a list Atomic.t let make () = Atomic.make [] +let get = Atomic.get + let add self x = while let old = Atomic.get self in diff --git a/src/client-ocurl/AList.mli b/src/core/AList.mli similarity index 72% rename from src/client-ocurl/AList.mli rename to src/core/AList.mli index b4c718dd..4799dd6b 100644 --- a/src/client-ocurl/AList.mli +++ b/src/core/AList.mli @@ -2,6 +2,9 @@ type 'a t +val get : 'a t -> 'a list +(** Snapshot *) + val make : unit -> 'a t val add : 'a t -> 'a -> unit diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index cc5186e2..e8568863 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -6,6 +6,10 @@ module Lock = Lock module Rand_bytes = Rand_bytes (** Generation of random identifiers. *) +module AList = AList +(** Atomic list, for internal usage + @since NEXT_RELEASE *) + open struct let[@inline] result_bind x f = match x with @@ -82,7 +86,7 @@ module Collector = struct (** Should be called regularly for background processing, timeout checks, etc. *) - val set_on_tick_callbacks : (unit -> unit) list ref -> unit + val set_on_tick_callbacks : (unit -> unit) AList.t -> unit (** Give the collector the list of callbacks to be executed when [tick()] is called. Each such callback should be short and reentrant. Depending on the collector's implementation, it might be @@ -158,7 +162,7 @@ module Collector = struct (* hidden *) open struct - let on_tick_cbs_ = ref [] + let on_tick_cbs_ = AList.make () let backend : backend option ref = ref None end @@ -194,7 +198,7 @@ module Collector = struct let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 () - let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_ + let[@inline] on_tick f = AList.add on_tick_cbs_ f (** Do background work. Call this regularly if the collector doesn't already have a ticker thread or internal timer. *) @@ -1134,7 +1138,8 @@ end These metrics are emitted after each GC collection. *) module GC_metrics : sig val basic_setup : unit -> unit - (** Setup a hook that will emit GC statistics regularly *) + (** Setup a hook that will emit GC statistics on every tick (assuming + a ticker thread) *) val get_runtime_attributes : unit -> Span.key_value list (** Get OCaml name and version runtime attributes *) @@ -1154,13 +1159,12 @@ end = struct let get_runtime_attributes () = Lazy.force runtime_attributes let basic_setup () = - (* emit metrics when GC is called *) - let on_gc () = + let on_tick () = match Collector.get_backend () with | None -> () | Some (module C) -> C.signal_emit_gc_metrics () in - ignore (Gc.create_alarm on_gc : Gc.alarm) + Collector.on_tick on_tick let bytes_per_word = Sys.word_size / 8 diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 179ddc78..cf40e2ef 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -124,7 +124,7 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; let config = - Opentelemetry_client_ocurl.Config.make ~debug:!debug + Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true ?bg_threads: (let n = !n_bg_threads in if n = 0 then