mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-11 04:58:39 -04:00
http clients: carry a description of the export attempt into error message
This commit is contained in:
parent
aa86fc455d
commit
ce3c85869b
6 changed files with 39 additions and 25 deletions
|
|
@ -89,8 +89,8 @@ struct
|
||||||
let cleanup = ignore
|
let cleanup = ignore
|
||||||
|
|
||||||
(* send the content to the remote endpoint/path *)
|
(* send the content to the remote endpoint/path *)
|
||||||
let send (client : t) ~url ~headers:user_headers ~decode (body : string) :
|
let send (client : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, Export_error.t) result =
|
(body : string) : ('a, Export_error.t) result =
|
||||||
Eio.Switch.run @@ fun sw ->
|
Eio.Switch.run @@ fun sw ->
|
||||||
let uri = Uri.of_string url in
|
let uri = Uri.of_string url in
|
||||||
|
|
||||||
|
|
@ -138,7 +138,7 @@ struct
|
||||||
let r =
|
let r =
|
||||||
try
|
try
|
||||||
let status = Status.decode_pb_status dec in
|
let status = Status.decode_pb_status dec in
|
||||||
Error (`Status (code, status))
|
Error (`Status (code, status, attempt_descr))
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
Error
|
Error
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,8 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let cleanup _self = ()
|
let cleanup _self = ()
|
||||||
|
|
||||||
(* send the content to the remote endpoint/path *)
|
(* send the content to the remote endpoint/path *)
|
||||||
let send (_self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (_self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result Lwt.t =
|
(bod : string) : ('a, error) result Lwt.t =
|
||||||
let uri = Uri.of_string url in
|
let uri = Uri.of_string url in
|
||||||
|
|
||||||
let open Cohttp in
|
let open Cohttp in
|
||||||
|
|
@ -74,7 +74,7 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let r =
|
let r =
|
||||||
try
|
try
|
||||||
let status = Status.decode_pb_status dec in
|
let status = Status.decode_pb_status dec in
|
||||||
Error (`Status (code, status))
|
Error (`Status (code, status, attempt_descr))
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
Error
|
Error
|
||||||
|
|
|
||||||
|
|
@ -26,8 +26,8 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let cleanup self = Ezcurl_lwt.delete self
|
let cleanup self = Ezcurl_lwt.delete self
|
||||||
|
|
||||||
(** send the content to the remote endpoint/path *)
|
(** send the content to the remote endpoint/path *)
|
||||||
let send (self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result Lwt.t =
|
(bod : string) : ('a, error) result Lwt.t =
|
||||||
let* r =
|
let* r =
|
||||||
let headers = user_headers in
|
let headers = user_headers in
|
||||||
Ezcurl_lwt.post ~client:self ~headers ~params:[] ~url
|
Ezcurl_lwt.post ~client:self ~headers ~params:[] ~url
|
||||||
|
|
@ -61,7 +61,9 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
in
|
in
|
||||||
Lwt.return r)
|
Lwt.return r)
|
||||||
| Ok { code; body; _ } ->
|
| Ok { code; body; _ } ->
|
||||||
let err = Export_error.decode_invalid_http_response ~url ~code body in
|
let err =
|
||||||
|
Export_error.decode_invalid_http_response ~attempt_descr ~url ~code body
|
||||||
|
in
|
||||||
Lwt.return (Error err)
|
Lwt.return (Error err)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,8 +24,8 @@ module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
|
|
||||||
let cleanup = Ezcurl.delete
|
let cleanup = Ezcurl.delete
|
||||||
|
|
||||||
let send (self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result =
|
(bod : string) : ('a, error) result =
|
||||||
let r =
|
let r =
|
||||||
let headers = user_headers in
|
let headers = user_headers in
|
||||||
Ezcurl.post ~client:self ~headers ~params:[] ~url ~content:(`String bod)
|
Ezcurl.post ~client:self ~headers ~params:[] ~url ~content:(`String bod)
|
||||||
|
|
@ -57,7 +57,8 @@ module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))))
|
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))))
|
||||||
| Ok { code; body; _ } ->
|
| Ok { code; body; _ } ->
|
||||||
let err =
|
let err =
|
||||||
OTELC.Export_error.decode_invalid_http_response ~url ~code body
|
OTELC.Export_error.decode_invalid_http_response ~attempt_descr ~url
|
||||||
|
~code body
|
||||||
in
|
in
|
||||||
Error err
|
Error err
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
(** Error that can occur during export *)
|
(** Error that can occur during export *)
|
||||||
|
|
||||||
|
type attempt_descr = string
|
||||||
|
|
||||||
type t =
|
type t =
|
||||||
[ `Status of int * Opentelemetry.Proto.Status.status
|
[ `Status of int * Opentelemetry.Proto.Status.status * attempt_descr
|
||||||
| `Failure of string
|
| `Failure of string
|
||||||
| `Sysbreak
|
| `Sysbreak
|
||||||
]
|
]
|
||||||
|
|
@ -24,25 +26,27 @@ let report_err : t -> unit = function
|
||||||
message;
|
message;
|
||||||
details;
|
details;
|
||||||
_presence = _;
|
_presence = _;
|
||||||
} ) ->
|
},
|
||||||
|
descr ) ->
|
||||||
Opentelemetry.Self_debug.log Opentelemetry.Self_debug.Error (fun () ->
|
Opentelemetry.Self_debug.log Opentelemetry.Self_debug.Error (fun () ->
|
||||||
let pp_details out l =
|
let pp_details out l =
|
||||||
List.iter
|
List.iter
|
||||||
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
|
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
|
||||||
l
|
l
|
||||||
in
|
in
|
||||||
|
|
||||||
Format.asprintf
|
Format.asprintf
|
||||||
"@[<2>opentelemetry: export failed with@ http code=%d@ status \
|
"@[<2>opentelemetry: export failed with@ http code=%d@ attempt: %s@ \
|
||||||
{@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]"
|
status {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]"
|
||||||
code scode
|
code descr scode
|
||||||
(Bytes.unsafe_to_string message)
|
(Bytes.unsafe_to_string message)
|
||||||
pp_details details)
|
pp_details details)
|
||||||
|
|
||||||
let decode_invalid_http_response ~code ~url (body : string) : t =
|
let decode_invalid_http_response ~attempt_descr ~code ~url (body : string) : t =
|
||||||
try
|
try
|
||||||
let dec = Pbrt.Decoder.of_string body in
|
let dec = Pbrt.Decoder.of_string body in
|
||||||
let status = Opentelemetry.Proto.Status.decode_pb_status dec in
|
let status = Opentelemetry.Proto.Status.decode_pb_status dec in
|
||||||
`Status (code, status)
|
`Status (code, status, attempt_descr)
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
`Failure
|
`Failure
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,14 @@ module type HTTPC = sig
|
||||||
|
|
||||||
val send :
|
val send :
|
||||||
t ->
|
t ->
|
||||||
|
attempt_descr:string ->
|
||||||
url:string ->
|
url:string ->
|
||||||
headers:(string * string) list ->
|
headers:(string * string) list ->
|
||||||
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
|
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
|
||||||
string ->
|
string ->
|
||||||
('a, error) result IO.t
|
('a, error) result IO.t
|
||||||
|
(** Send a HTTP request.
|
||||||
|
@param attempt_descr included in error message if this fails *)
|
||||||
end
|
end
|
||||||
|
|
||||||
module Make
|
module Make
|
||||||
|
|
@ -61,15 +64,19 @@ end = struct
|
||||||
(** Should we retry, based on the HTTP response code? *)
|
(** Should we retry, based on the HTTP response code? *)
|
||||||
let should_retry = function
|
let should_retry = function
|
||||||
| `Failure _ -> true (* Network errors, connection issues *)
|
| `Failure _ -> true (* Network errors, connection issues *)
|
||||||
| `Status (code, _) ->
|
| `Status (code, _, _) ->
|
||||||
(* Retry on server errors, rate limits, timeouts *)
|
(* Retry on server errors, rate limits, timeouts *)
|
||||||
code >= 500 || code = 429 || code = 408
|
code >= 500 || code = 429 || code = 408
|
||||||
| `Sysbreak -> false (* User interrupt, don't retry *)
|
| `Sysbreak -> false (* User interrupt, don't retry *)
|
||||||
|
|
||||||
(** Retry loop over [f()] with exponential backoff *)
|
(** Retry loop over [f()] with exponential backoff *)
|
||||||
let rec retry_loop_ (self : t) attempt delay_ms ~f =
|
let rec retry_loop_ (self : t) attempt delay_ms
|
||||||
|
~(f : attempt_descr:string -> unit -> _ result IO.t) : _ result IO.t =
|
||||||
let open IO in
|
let open IO in
|
||||||
let* result = f () in
|
let attempt_descr =
|
||||||
|
spf "try(%d/%d)" attempt self.config.retry_max_attempts
|
||||||
|
in
|
||||||
|
let* result = f ~attempt_descr () in
|
||||||
match result with
|
match result with
|
||||||
| Ok x -> return (Ok x)
|
| Ok x -> return (Ok x)
|
||||||
| Error err
|
| Error err
|
||||||
|
|
@ -114,14 +121,14 @@ end = struct
|
||||||
~protocol:self.config.protocol res
|
~protocol:self.config.protocol res
|
||||||
in
|
in
|
||||||
|
|
||||||
let do_once () =
|
let do_once ~attempt_descr () =
|
||||||
Httpc.send self.http ~url ~headers ~decode:(`Ret ()) data
|
Httpc.send self.http ~attempt_descr ~url ~headers ~decode:(`Ret ()) data
|
||||||
in
|
in
|
||||||
|
|
||||||
if self.config.retry_max_attempts > 0 then
|
if self.config.retry_max_attempts > 0 then
|
||||||
retry_loop_ self 0 self.config.retry_initial_delay_ms ~f:do_once
|
retry_loop_ self 0 self.config.retry_initial_delay_ms ~f:do_once
|
||||||
else
|
else
|
||||||
do_once ()
|
do_once ~attempt_descr:"single_attempt" ()
|
||||||
end
|
end
|
||||||
|
|
||||||
module C = Generic_consumer.Make (IO) (Notifier) (Sender)
|
module C = Generic_consumer.Make (IO) (Notifier) (Sender)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue