use backoff in ocurl clients

This commit is contained in:
Simon Cruanes 2025-12-05 22:05:30 -05:00
parent 3053b20676
commit 583372ddda
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 36 additions and 24 deletions

View file

@ -12,10 +12,6 @@ let set_headers = Config.Env.set_headers
let get_headers = Config.Env.get_headers let get_headers = Config.Env.get_headers
external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
use Lwt's latest version *)
type error = Export_error.t type error = Export_error.t
(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) (* TODO: emit this in a metric in [tick()] if self tracing is enabled? *)
@ -109,21 +105,23 @@ module Consumer_impl = struct
CNotifier.delete self.notify CNotifier.delete self.notify
) )
let send_http_ (self : state) (httpc : Httpc.t) ~url (data : string) : let send_http_ (self : state) ~backoff (httpc : Httpc.t) ~url (data : string)
unit Lwt.t = : unit Lwt.t =
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
match r with match r with
| Ok () -> Lwt.return () | Ok () ->
Util_backoff.on_success backoff;
Lwt.return ()
| Error `Sysbreak -> | Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!"; Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set self.stop true; Atomic.set self.stop true;
Lwt.return () Lwt.return ()
| Error err -> | Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors; Atomic.incr n_errors;
let dur_s = Util_backoff.cur_duration_s backoff in
Util_backoff.on_error backoff;
report_err_ err; report_err_ err;
(* avoid crazy error loop *) Lwt_unix.sleep (dur_s +. Random.float (dur_s /. 10.))
Lwt_unix.sleep 3.
let send_metrics_http (st : state) client ~encoder let send_metrics_http (st : state) client ~encoder
(l : Proto.Metrics.resource_metrics list) = (l : Proto.Metrics.resource_metrics list) =
@ -147,6 +145,7 @@ module Consumer_impl = struct
let start_worker (self : state) : unit = let start_worker (self : state) : unit =
let client = Httpc.create () in let client = Httpc.create () in
let encoder = Pbrt.Encoder.create () in let encoder = Pbrt.Encoder.create () in
let backoff = Util_backoff.create () in
(* loop on [q] *) (* loop on [q] *)
let rec loop () : unit Lwt.t = let rec loop () : unit Lwt.t =
@ -159,9 +158,12 @@ module Consumer_impl = struct
shutdown self; shutdown self;
Lwt.return () Lwt.return ()
| `Empty -> CNotifier.wait self.notify | `Empty -> CNotifier.wait self.notify
| `Item (R_logs logs) -> send_logs_http self client ~encoder logs | `Item (R_logs logs) ->
| `Item (R_metrics ms) -> send_metrics_http self client ~encoder ms send_logs_http self client ~backoff ~encoder logs
| `Item (R_spans spans) -> send_traces_http self client ~encoder spans | `Item (R_metrics ms) ->
send_metrics_http self client ~encoder ~backoff ms
| `Item (R_spans spans) ->
send_traces_http self client ~encoder ~backoff spans
in in
loop () loop ()
in in
@ -255,6 +257,6 @@ let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
res) res)
(fun exn -> (fun exn ->
let* () = remove_backend () in let* () = remove_backend () in
reraise exn) Lwt.reraise exn)
) else ) else
f () f ()

View file

@ -31,11 +31,19 @@ module Consumer_impl = struct
(* wakeup sleepers *) (* wakeup sleepers *)
Util_thread.MCond.signal self.mcond Util_thread.MCond.signal self.mcond
let send_http_ (self : state) (client : Curl.t) ~url (data : string) : unit = let send_http_ (self : state) (client : Curl.t) ~backoff ~url (data : string)
: unit =
let@ _sc = let@ _sc =
Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http"
in in
(* avoid crazy error loop *)
let sleep_with_backoff () =
let dur_s = Util_backoff.cur_duration_s backoff in
Util_backoff.on_error backoff;
Thread.delay (dur_s +. Random.float (dur_s /. 10.))
in
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
(String.length data); (String.length data);
@ -50,6 +58,7 @@ module Consumer_impl = struct
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with with
| Ok { code; _ } when code >= 200 && code < 300 -> | Ok { code; _ } when code >= 200 && code < 300 ->
Util_backoff.on_success backoff;
if Config.Env.get_debug () then if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: got response code=%d\n%!" code Printf.eprintf "opentelemetry: got response code=%d\n%!" code
| Ok { code; body; headers = _; info = _ } -> | Ok { code; body; headers = _; info = _ } ->
@ -61,20 +70,20 @@ module Consumer_impl = struct
let err = Export_error.decode_invalid_http_response ~url ~code body in let err = Export_error.decode_invalid_http_response ~url ~code body in
Export_error.report_err err; Export_error.report_err err;
() ()
) );
sleep_with_backoff ()
| exception Sys.Break -> | exception Sys.Break ->
Printf.eprintf "ctrl-c captured, stopping\n%!"; Printf.eprintf "ctrl-c captured, stopping\n%!";
shutdown self shutdown self
| Error (code, msg) -> | Error (code, msg) ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors; Atomic.incr n_errors;
Printf.eprintf Printf.eprintf
"opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!" "opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!"
msg (Curl.strerror code) url; msg (Curl.strerror code) url;
(* avoid crazy error loop *) sleep_with_backoff ()
Thread.delay 3.
(** The main loop of a thread that, reads from [bq] to get the next message to (** The main loop of a thread that, reads from [bq] to get the next message to
send via http *) send via http *)
@ -82,8 +91,9 @@ module Consumer_impl = struct
Ezcurl.with_client ?set_opts:None @@ fun client -> Ezcurl.with_client ?set_opts:None @@ fun client ->
(* we need exactly one encoder per thread *) (* we need exactly one encoder per thread *)
let encoder = Pbrt.Encoder.create ~size:2048 () in let encoder = Pbrt.Encoder.create ~size:2048 () in
let backoff = Util_backoff.create () in
let send ~name ~url ~conv (signals : _ list) : unit = let send ~name ~url ~conv ~backoff (signals : _ list) : unit =
let@ _sp = let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer name Self_trace.with_ ~kind:Span_kind_producer name
~attrs:[ "n", `Int (List.length signals) ] ~attrs:[ "n", `Int (List.length signals) ]
@ -93,7 +103,7 @@ module Consumer_impl = struct
Pbrt.Encoder.reset encoder; Pbrt.Encoder.reset encoder;
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
send_http_ self client msg ~url; send_http_ self client msg ~backoff ~url;
() ()
in in
while not (Atomic.get self.stop) do while not (Atomic.get self.stop) do
@ -101,13 +111,13 @@ module Consumer_impl = struct
| `Closed -> shutdown self | `Closed -> shutdown self
| `Empty -> Util_thread.MCond.wait self.mcond | `Empty -> Util_thread.MCond.wait self.mcond
| `Item (Any_resource.R_spans tr) -> | `Item (Any_resource.R_spans tr) ->
send ~name:"send-traces" ~conv:Signal.Encode.traces send ~name:"send-traces" ~conv:Signal.Encode.traces ~backoff
~url:self.config.common.url_traces tr ~url:self.config.common.url_traces tr
| `Item (Any_resource.R_metrics ms) -> | `Item (Any_resource.R_metrics ms) ->
send ~name:"send-metrics" ~conv:Signal.Encode.metrics send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~backoff
~url:self.config.common.url_metrics ms ~url:self.config.common.url_metrics ms
| `Item (Any_resource.R_logs logs) -> | `Item (Any_resource.R_logs logs) ->
send ~name:"send-logs" ~conv:Signal.Encode.logs send ~name:"send-logs" ~conv:Signal.Encode.logs ~backoff
~url:self.config.common.url_logs logs ~url:self.config.common.url_logs logs
done done