From 6e198207dc08e4deb350feabf45ec2682cf73275 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 20 Jun 2023 10:31:41 -0400 Subject: [PATCH] ocurl: uniformize `debug`; implement missing bits; add max batch size --- .../opentelemetry_client_ocurl.ml | 45 ++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 00aea025..81319741 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -115,6 +115,9 @@ end = struct encode x encoder; let data = Pbrt.Encoder.to_string encoder in let url = config.Config.url ^ path in + if !debug_ || config.debug then + Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url + (String.length data); match Ezcurl.post ~headers:config.headers ~client ~params:[] ~url ~content:(`String data) () @@ -122,7 +125,7 @@ end = struct | Ok { code; _ } when code >= 200 && code < 300 -> () | Ok { code; body; headers = _; info = _ } -> Atomic.incr n_errors; - if config.debug then ( + if !debug_ || config.debug then ( let dec = Pbrt.Decoder.of_string body in let body = try @@ -180,7 +183,7 @@ end = struct (** Thread that, in a loop, reads from [q] to get the next message to send via http *) let bg_thread_loop (self : t) : unit = - Ezcurl.with_client @@ fun client -> + Ezcurl.with_client ?set_opts:None @@ fun client -> let stop = self.stop in let config = self.config in let encoder = Pbrt.Encoder.create () in @@ -203,12 +206,15 @@ end = struct metrics: Proto.Metrics.resource_metrics Batch.t; } - let batch_timeout_expired_ ~config ~now (b : _ Batch.t) : bool = + let batch_max_size_ = 200 + + let should_send_batch_ ~config ~now (b : _ Batch.t) : bool = Batch.len b > 0 - && - let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in - let elapsed = Mtime.span now (Batch.time_started b) in - Mtime.Span.compare elapsed timeout >= 0 + && (Batch.len b >= batch_max_size_ + || + let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in + let elapsed = Mtime.span now (Batch.time_started b) in + Mtime.Span.compare elapsed timeout >= 0) let main_thread_loop (self : t) : unit = let local_q = Queue.create () in @@ -242,18 +248,28 @@ end = struct (* read multiple events at once *) B_queue.pop_all self.q local_q; + let now = Mtime_clock.now () in + (* how to process a single event *) let process_ev (ev : Event.t) : unit = match ev with - | Event.E_metric _ | Event.E_trace _ | Event.E_logs _ -> () + | Event.E_metric m -> + Batch.push batches.metrics m; + if should_send_batch_ ~config ~now batches.metrics then + send_metrics () + | Event.E_trace tr -> + Batch.push batches.traces tr; + if should_send_batch_ ~config ~now batches.traces then + send_traces () + | Event.E_logs logs -> + Batch.push batches.logs logs; + if should_send_batch_ ~config ~now batches.logs then send_logs () | Event.E_tick -> (* check for batches whose timeout expired *) - let now = Mtime_clock.now () in - if batch_timeout_expired_ ~config ~now batches.metrics then + if should_send_batch_ ~config ~now batches.metrics then send_metrics (); - if batch_timeout_expired_ ~config ~now batches.logs then - send_logs (); - if batch_timeout_expired_ ~config ~now batches.traces then + if should_send_batch_ ~config ~now batches.logs then send_logs (); + if should_send_batch_ ~config ~now batches.traces then send_traces () | Event.E_flush_all -> if Batch.len batches.metrics > 0 then send_metrics (); @@ -328,7 +344,7 @@ let mk_backend ~stop ~config () : (module Collector.BACKEND) = let timeout_sent_metrics = Mtime.Span.(5 * s) let signal_emit_gc_metrics () = - if !debug_ then + if !debug_ || config.debug then Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; Atomic.set needs_gc_metrics true @@ -398,7 +414,6 @@ let mk_backend ~stop ~config () : (module Collector.BACKEND) = (module M) let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () = - debug_ := config.debug; let ((module B) as backend) = mk_backend ~stop ~config () in Opentelemetry.Collector.set_backend backend; B.cleanup