mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
use backoff in ocurl clients
This commit is contained in:
parent
64c7125838
commit
1a0ba5fc9e
2 changed files with 36 additions and 24 deletions
|
|
@ -12,10 +12,6 @@ let set_headers = Config.Env.set_headers
|
||||||
|
|
||||||
let get_headers = Config.Env.get_headers
|
let get_headers = Config.Env.get_headers
|
||||||
|
|
||||||
external reraise : exn -> 'a = "%reraise"
|
|
||||||
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
|
|
||||||
use Lwt's latest version *)
|
|
||||||
|
|
||||||
type error = Export_error.t
|
type error = Export_error.t
|
||||||
|
|
||||||
(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *)
|
(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *)
|
||||||
|
|
@ -109,21 +105,23 @@ module Consumer_impl = struct
|
||||||
CNotifier.delete self.notify
|
CNotifier.delete self.notify
|
||||||
)
|
)
|
||||||
|
|
||||||
let send_http_ (self : state) (httpc : Httpc.t) ~url (data : string) :
|
let send_http_ (self : state) ~backoff (httpc : Httpc.t) ~url (data : string)
|
||||||
unit Lwt.t =
|
: unit Lwt.t =
|
||||||
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
|
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
|
||||||
match r with
|
match r with
|
||||||
| Ok () -> Lwt.return ()
|
| Ok () ->
|
||||||
|
Util_backoff.on_success backoff;
|
||||||
|
Lwt.return ()
|
||||||
| Error `Sysbreak ->
|
| Error `Sysbreak ->
|
||||||
Printf.eprintf "ctrl-c captured, stopping\n%!";
|
Printf.eprintf "ctrl-c captured, stopping\n%!";
|
||||||
Atomic.set self.stop true;
|
Atomic.set self.stop true;
|
||||||
Lwt.return ()
|
Lwt.return ()
|
||||||
| Error err ->
|
| Error err ->
|
||||||
(* TODO: log error _via_ otel? *)
|
|
||||||
Atomic.incr n_errors;
|
Atomic.incr n_errors;
|
||||||
|
let dur_s = Util_backoff.cur_duration_s backoff in
|
||||||
|
Util_backoff.on_error backoff;
|
||||||
report_err_ err;
|
report_err_ err;
|
||||||
(* avoid crazy error loop *)
|
Lwt_unix.sleep (dur_s +. Random.float (dur_s /. 10.))
|
||||||
Lwt_unix.sleep 3.
|
|
||||||
|
|
||||||
let send_metrics_http (st : state) client ~encoder
|
let send_metrics_http (st : state) client ~encoder
|
||||||
(l : Proto.Metrics.resource_metrics list) =
|
(l : Proto.Metrics.resource_metrics list) =
|
||||||
|
|
@ -147,6 +145,7 @@ module Consumer_impl = struct
|
||||||
let start_worker (self : state) : unit =
|
let start_worker (self : state) : unit =
|
||||||
let client = Httpc.create () in
|
let client = Httpc.create () in
|
||||||
let encoder = Pbrt.Encoder.create () in
|
let encoder = Pbrt.Encoder.create () in
|
||||||
|
let backoff = Util_backoff.create () in
|
||||||
|
|
||||||
(* loop on [q] *)
|
(* loop on [q] *)
|
||||||
let rec loop () : unit Lwt.t =
|
let rec loop () : unit Lwt.t =
|
||||||
|
|
@ -159,9 +158,12 @@ module Consumer_impl = struct
|
||||||
shutdown self;
|
shutdown self;
|
||||||
Lwt.return ()
|
Lwt.return ()
|
||||||
| `Empty -> CNotifier.wait self.notify
|
| `Empty -> CNotifier.wait self.notify
|
||||||
| `Item (R_logs logs) -> send_logs_http self client ~encoder logs
|
| `Item (R_logs logs) ->
|
||||||
| `Item (R_metrics ms) -> send_metrics_http self client ~encoder ms
|
send_logs_http self client ~backoff ~encoder logs
|
||||||
| `Item (R_spans spans) -> send_traces_http self client ~encoder spans
|
| `Item (R_metrics ms) ->
|
||||||
|
send_metrics_http self client ~encoder ~backoff ms
|
||||||
|
| `Item (R_spans spans) ->
|
||||||
|
send_traces_http self client ~encoder ~backoff spans
|
||||||
in
|
in
|
||||||
loop ()
|
loop ()
|
||||||
in
|
in
|
||||||
|
|
@ -255,6 +257,6 @@ let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
|
||||||
res)
|
res)
|
||||||
(fun exn ->
|
(fun exn ->
|
||||||
let* () = remove_backend () in
|
let* () = remove_backend () in
|
||||||
reraise exn)
|
Lwt.reraise exn)
|
||||||
) else
|
) else
|
||||||
f ()
|
f ()
|
||||||
|
|
|
||||||
|
|
@ -31,11 +31,19 @@ module Consumer_impl = struct
|
||||||
(* wakeup sleepers *)
|
(* wakeup sleepers *)
|
||||||
Util_thread.MCond.signal self.mcond
|
Util_thread.MCond.signal self.mcond
|
||||||
|
|
||||||
let send_http_ (self : state) (client : Curl.t) ~url (data : string) : unit =
|
let send_http_ (self : state) (client : Curl.t) ~backoff ~url (data : string)
|
||||||
|
: unit =
|
||||||
let@ _sc =
|
let@ _sc =
|
||||||
Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http"
|
Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http"
|
||||||
in
|
in
|
||||||
|
|
||||||
|
(* avoid crazy error loop *)
|
||||||
|
let sleep_with_backoff () =
|
||||||
|
let dur_s = Util_backoff.cur_duration_s backoff in
|
||||||
|
Util_backoff.on_error backoff;
|
||||||
|
Thread.delay (dur_s +. Random.float (dur_s /. 10.))
|
||||||
|
in
|
||||||
|
|
||||||
if Config.Env.get_debug () then
|
if Config.Env.get_debug () then
|
||||||
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
|
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
|
||||||
(String.length data);
|
(String.length data);
|
||||||
|
|
@ -50,6 +58,7 @@ module Consumer_impl = struct
|
||||||
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
|
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
|
||||||
with
|
with
|
||||||
| Ok { code; _ } when code >= 200 && code < 300 ->
|
| Ok { code; _ } when code >= 200 && code < 300 ->
|
||||||
|
Util_backoff.on_success backoff;
|
||||||
if Config.Env.get_debug () then
|
if Config.Env.get_debug () then
|
||||||
Printf.eprintf "opentelemetry: got response code=%d\n%!" code
|
Printf.eprintf "opentelemetry: got response code=%d\n%!" code
|
||||||
| Ok { code; body; headers = _; info = _ } ->
|
| Ok { code; body; headers = _; info = _ } ->
|
||||||
|
|
@ -61,20 +70,20 @@ module Consumer_impl = struct
|
||||||
let err = Export_error.decode_invalid_http_response ~url ~code body in
|
let err = Export_error.decode_invalid_http_response ~url ~code body in
|
||||||
Export_error.report_err err;
|
Export_error.report_err err;
|
||||||
()
|
()
|
||||||
)
|
);
|
||||||
|
|
||||||
|
sleep_with_backoff ()
|
||||||
| exception Sys.Break ->
|
| exception Sys.Break ->
|
||||||
Printf.eprintf "ctrl-c captured, stopping\n%!";
|
Printf.eprintf "ctrl-c captured, stopping\n%!";
|
||||||
shutdown self
|
shutdown self
|
||||||
| Error (code, msg) ->
|
| Error (code, msg) ->
|
||||||
(* TODO: log error _via_ otel? *)
|
|
||||||
Atomic.incr n_errors;
|
Atomic.incr n_errors;
|
||||||
|
|
||||||
Printf.eprintf
|
Printf.eprintf
|
||||||
"opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!"
|
"opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!"
|
||||||
msg (Curl.strerror code) url;
|
msg (Curl.strerror code) url;
|
||||||
|
|
||||||
(* avoid crazy error loop *)
|
sleep_with_backoff ()
|
||||||
Thread.delay 3.
|
|
||||||
|
|
||||||
(** The main loop of a thread that, reads from [bq] to get the next message to
|
(** The main loop of a thread that, reads from [bq] to get the next message to
|
||||||
send via http *)
|
send via http *)
|
||||||
|
|
@ -82,8 +91,9 @@ module Consumer_impl = struct
|
||||||
Ezcurl.with_client ?set_opts:None @@ fun client ->
|
Ezcurl.with_client ?set_opts:None @@ fun client ->
|
||||||
(* we need exactly one encoder per thread *)
|
(* we need exactly one encoder per thread *)
|
||||||
let encoder = Pbrt.Encoder.create ~size:2048 () in
|
let encoder = Pbrt.Encoder.create ~size:2048 () in
|
||||||
|
let backoff = Util_backoff.create () in
|
||||||
|
|
||||||
let send ~name ~url ~conv (signals : _ list) : unit =
|
let send ~name ~url ~conv ~backoff (signals : _ list) : unit =
|
||||||
let@ _sp =
|
let@ _sp =
|
||||||
Self_trace.with_ ~kind:Span_kind_producer name
|
Self_trace.with_ ~kind:Span_kind_producer name
|
||||||
~attrs:[ "n", `Int (List.length signals) ]
|
~attrs:[ "n", `Int (List.length signals) ]
|
||||||
|
|
@ -93,7 +103,7 @@ module Consumer_impl = struct
|
||||||
Pbrt.Encoder.reset encoder;
|
Pbrt.Encoder.reset encoder;
|
||||||
|
|
||||||
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
|
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
|
||||||
send_http_ self client msg ~url;
|
send_http_ self client msg ~backoff ~url;
|
||||||
()
|
()
|
||||||
in
|
in
|
||||||
while not (Atomic.get self.stop) do
|
while not (Atomic.get self.stop) do
|
||||||
|
|
@ -101,13 +111,13 @@ module Consumer_impl = struct
|
||||||
| `Closed -> shutdown self
|
| `Closed -> shutdown self
|
||||||
| `Empty -> Util_thread.MCond.wait self.mcond
|
| `Empty -> Util_thread.MCond.wait self.mcond
|
||||||
| `Item (Any_resource.R_spans tr) ->
|
| `Item (Any_resource.R_spans tr) ->
|
||||||
send ~name:"send-traces" ~conv:Signal.Encode.traces
|
send ~name:"send-traces" ~conv:Signal.Encode.traces ~backoff
|
||||||
~url:self.config.common.url_traces tr
|
~url:self.config.common.url_traces tr
|
||||||
| `Item (Any_resource.R_metrics ms) ->
|
| `Item (Any_resource.R_metrics ms) ->
|
||||||
send ~name:"send-metrics" ~conv:Signal.Encode.metrics
|
send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~backoff
|
||||||
~url:self.config.common.url_metrics ms
|
~url:self.config.common.url_metrics ms
|
||||||
| `Item (Any_resource.R_logs logs) ->
|
| `Item (Any_resource.R_logs logs) ->
|
||||||
send ~name:"send-logs" ~conv:Signal.Encode.logs
|
send ~name:"send-logs" ~conv:Signal.Encode.logs ~backoff
|
||||||
~url:self.config.common.url_logs logs
|
~url:self.config.common.url_logs logs
|
||||||
done
|
done
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue