ocurl: uniformize debug; implement missing bits; add max batch size

This commit is contained in:
Simon Cruanes 2023-06-20 10:31:41 -04:00
parent 77e763b336
commit 6e198207dc
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@@ -115,6 +115,9 @@ end = struct
     encode x encoder;
     let data = Pbrt.Encoder.to_string encoder in
     let url = config.Config.url ^ path in
+    if !debug_ || config.debug then
+      Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
+        (String.length data);
     match
       Ezcurl.post ~headers:config.headers ~client ~params:[] ~url
         ~content:(`String data) ()
@@ -122,7 +125,7 @@ end = struct
     | Ok { code; _ } when code >= 200 && code < 300 -> ()
     | Ok { code; body; headers = _; info = _ } ->
       Atomic.incr n_errors;
-      if config.debug then (
+      if !debug_ || config.debug then (
         let dec = Pbrt.Decoder.of_string body in
         let body =
           try
@@ -180,7 +183,7 @@ end = struct
   (** Thread that, in a loop, reads from [q] to get the
       next message to send via http *)
   let bg_thread_loop (self : t) : unit =
-    Ezcurl.with_client @@ fun client ->
+    Ezcurl.with_client ?set_opts:None @@ fun client ->
     let stop = self.stop in
     let config = self.config in
     let encoder = Pbrt.Encoder.create () in
@@ -203,12 +206,15 @@ end = struct
     metrics: Proto.Metrics.resource_metrics Batch.t;
   }

-  let batch_timeout_expired_ ~config ~now (b : _ Batch.t) : bool =
+  let batch_max_size_ = 200
+
+  let should_send_batch_ ~config ~now (b : _ Batch.t) : bool =
     Batch.len b > 0
-    &&
-    let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in
-    let elapsed = Mtime.span now (Batch.time_started b) in
-    Mtime.Span.compare elapsed timeout >= 0
+    && (Batch.len b >= batch_max_size_
+       ||
+       let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in
+       let elapsed = Mtime.span now (Batch.time_started b) in
+       Mtime.Span.compare elapsed timeout >= 0)

   let main_thread_loop (self : t) : unit =
     let local_q = Queue.create () in
@@ -242,18 +248,28 @@ end = struct
       (* read multiple events at once *)
       B_queue.pop_all self.q local_q;

+      let now = Mtime_clock.now () in
+
       (* how to process a single event *)
       let process_ev (ev : Event.t) : unit =
         match ev with
-        | Event.E_metric _ | Event.E_trace _ | Event.E_logs _ -> ()
+        | Event.E_metric m ->
+          Batch.push batches.metrics m;
+          if should_send_batch_ ~config ~now batches.metrics then
+            send_metrics ()
+        | Event.E_trace tr ->
+          Batch.push batches.traces tr;
+          if should_send_batch_ ~config ~now batches.traces then
+            send_traces ()
+        | Event.E_logs logs ->
+          Batch.push batches.logs logs;
+          if should_send_batch_ ~config ~now batches.logs then send_logs ()
        | Event.E_tick ->
          (* check for batches whose timeout expired *)
-          let now = Mtime_clock.now () in
-          if batch_timeout_expired_ ~config ~now batches.metrics then
+          if should_send_batch_ ~config ~now batches.metrics then
            send_metrics ();
-          if batch_timeout_expired_ ~config ~now batches.logs then
-            send_logs ();
-          if batch_timeout_expired_ ~config ~now batches.traces then
+          if should_send_batch_ ~config ~now batches.logs then send_logs ();
+          if should_send_batch_ ~config ~now batches.traces then
            send_traces ()
        | Event.E_flush_all ->
          if Batch.len batches.metrics > 0 then send_metrics ();
@@ -328,7 +344,7 @@ let mk_backend ~stop ~config () : (module Collector.BACKEND) =
   let timeout_sent_metrics = Mtime.Span.(5 * s)

   let signal_emit_gc_metrics () =
-    if !debug_ then
+    if !debug_ || config.debug then
       Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
     Atomic.set needs_gc_metrics true
@@ -398,7 +414,6 @@ let mk_backend ~stop ~config () : (module Collector.BACKEND) =
   (module M)

 let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
-  debug_ := config.debug;
  let ((module B) as backend) = mk_backend ~stop ~config () in
  Opentelemetry.Collector.set_backend backend;
  B.cleanup