From 1a0ba5fc9e8de9693dc7ee0d376507419dc3c8f6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 5 Dec 2025 22:05:30 -0500 Subject: [PATCH] use backoff in ocurl clients --- .../opentelemetry_client_ocurl_lwt.ml | 30 ++++++++++--------- .../opentelemetry_client_ocurl.ml | 30 ++++++++++++------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 720f2dd7..4617754d 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -12,10 +12,6 @@ let set_headers = Config.Env.set_headers let get_headers = Config.Env.get_headers -external reraise : exn -> 'a = "%reraise" -(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to - use Lwt's latest version *) - type error = Export_error.t (* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) @@ -109,21 +105,23 @@ module Consumer_impl = struct CNotifier.delete self.notify ) - let send_http_ (self : state) (httpc : Httpc.t) ~url (data : string) : - unit Lwt.t = + let send_http_ (self : state) ~backoff (httpc : Httpc.t) ~url (data : string) + : unit Lwt.t = let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with - | Ok () -> Lwt.return () + | Ok () -> + Util_backoff.on_success backoff; + Lwt.return () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; Atomic.set self.stop true; Lwt.return () | Error err -> - (* TODO: log error _via_ otel? *) Atomic.incr n_errors; + let dur_s = Util_backoff.cur_duration_s backoff in + Util_backoff.on_error backoff; report_err_ err; - (* avoid crazy error loop *) - Lwt_unix.sleep 3. + Lwt_unix.sleep (dur_s +. Random.float (dur_s /. 10.)) let send_metrics_http (st : state) client ~encoder (l : Proto.Metrics.resource_metrics list) = @@ -147,6 +145,7 @@ module Consumer_impl = struct let start_worker (self : state) : unit = let client = Httpc.create () in let encoder = Pbrt.Encoder.create () in + let backoff = Util_backoff.create () in (* loop on [q] *) let rec loop () : unit Lwt.t = @@ -159,9 +158,12 @@ module Consumer_impl = struct shutdown self; Lwt.return () | `Empty -> CNotifier.wait self.notify - | `Item (R_logs logs) -> send_logs_http self client ~encoder logs - | `Item (R_metrics ms) -> send_metrics_http self client ~encoder ms - | `Item (R_spans spans) -> send_traces_http self client ~encoder spans + | `Item (R_logs logs) -> + send_logs_http self client ~backoff ~encoder logs + | `Item (R_metrics ms) -> + send_metrics_http self client ~encoder ~backoff ms + | `Item (R_spans spans) -> + send_traces_http self client ~encoder ~backoff spans in loop () in @@ -255,6 +257,6 @@ let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t res) (fun exn -> let* () = remove_backend () in - reraise exn) + Lwt.reraise exn) ) else f () diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 5db14538..edba7a0a 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -31,11 +31,19 @@ module Consumer_impl = struct (* wakeup sleepers *) Util_thread.MCond.signal self.mcond - let send_http_ (self : state) (client : Curl.t) ~url (data : string) : unit = + let send_http_ (self : state) (client : Curl.t) ~backoff ~url (data : string) + : unit = let@ _sc = Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" in + (* avoid crazy error loop *) + let sleep_with_backoff () = + let dur_s = Util_backoff.cur_duration_s backoff in + Util_backoff.on_error backoff; + Thread.delay (dur_s +. Random.float (dur_s /. 10.)) + in + if Config.Env.get_debug () then Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url (String.length data); @@ -50,6 +58,7 @@ module Consumer_impl = struct Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () with | Ok { code; _ } when code >= 200 && code < 300 -> + Util_backoff.on_success backoff; if Config.Env.get_debug () then Printf.eprintf "opentelemetry: got response code=%d\n%!" code | Ok { code; body; headers = _; info = _ } -> @@ -61,20 +70,20 @@ module Consumer_impl = struct let err = Export_error.decode_invalid_http_response ~url ~code body in Export_error.report_err err; () - ) + ); + + sleep_with_backoff () | exception Sys.Break -> Printf.eprintf "ctrl-c captured, stopping\n%!"; shutdown self | Error (code, msg) -> - (* TODO: log error _via_ otel? *) Atomic.incr n_errors; Printf.eprintf "opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!" msg (Curl.strerror code) url; - (* avoid crazy error loop *) - Thread.delay 3. + sleep_with_backoff () (** The main loop of a thread that, reads from [bq] to get the next message to send via http *) @@ -82,8 +91,9 @@ module Consumer_impl = struct Ezcurl.with_client ?set_opts:None @@ fun client -> (* we need exactly one encoder per thread *) let encoder = Pbrt.Encoder.create ~size:2048 () in + let backoff = Util_backoff.create () in - let send ~name ~url ~conv (signals : _ list) : unit = + let send ~name ~url ~conv ~backoff (signals : _ list) : unit = let@ _sp = Self_trace.with_ ~kind:Span_kind_producer name ~attrs:[ "n", `Int (List.length signals) ] @@ -93,7 +103,7 @@ module Consumer_impl = struct Pbrt.Encoder.reset encoder; ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); - send_http_ self client msg ~url; + send_http_ self client msg ~backoff ~url; () in while not (Atomic.get self.stop) do @@ -101,13 +111,13 @@ module Consumer_impl = struct | `Closed -> shutdown self | `Empty -> Util_thread.MCond.wait self.mcond | `Item (Any_resource.R_spans tr) -> - send ~name:"send-traces" ~conv:Signal.Encode.traces + send ~name:"send-traces" ~conv:Signal.Encode.traces ~backoff ~url:self.config.common.url_traces tr | `Item (Any_resource.R_metrics ms) -> - send ~name:"send-metrics" ~conv:Signal.Encode.metrics + send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~backoff ~url:self.config.common.url_metrics ms | `Item (Any_resource.R_logs logs) -> - send ~name:"send-logs" ~conv:Signal.Encode.logs + send ~name:"send-logs" ~conv:Signal.Encode.logs ~backoff ~url:self.config.common.url_logs logs done