mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-05-05 17:04:52 -04:00
Compare commits
3 commits
aa86fc455d
...
54dc4d0ef7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54dc4d0ef7 | ||
|
|
31e9812b4f | ||
|
|
ce3c85869b |
9 changed files with 54 additions and 32 deletions
|
|
@ -214,7 +214,7 @@
|
||||||
(mtime
|
(mtime
|
||||||
(>= "1.4"))
|
(>= "1.4"))
|
||||||
ca-certs
|
ca-certs
|
||||||
mirage-crypto-rng-eio
|
mirage-crypto-rng
|
||||||
ambient-context-eio
|
ambient-context-eio
|
||||||
(opentelemetry
|
(opentelemetry
|
||||||
(= :version))
|
(= :version))
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ depends: [
|
||||||
"ocaml" {>= "5.00"}
|
"ocaml" {>= "5.00"}
|
||||||
"mtime" {>= "1.4"}
|
"mtime" {>= "1.4"}
|
||||||
"ca-certs"
|
"ca-certs"
|
||||||
"mirage-crypto-rng-eio"
|
"mirage-crypto-rng"
|
||||||
"ambient-context-eio"
|
"ambient-context-eio"
|
||||||
"opentelemetry" {= version}
|
"opentelemetry" {= version}
|
||||||
"opentelemetry-client" {= version}
|
"opentelemetry-client" {= version}
|
||||||
|
|
|
||||||
|
|
@ -89,8 +89,8 @@ struct
|
||||||
let cleanup = ignore
|
let cleanup = ignore
|
||||||
|
|
||||||
(* send the content to the remote endpoint/path *)
|
(* send the content to the remote endpoint/path *)
|
||||||
let send (client : t) ~url ~headers:user_headers ~decode (body : string) :
|
let send (client : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, Export_error.t) result =
|
(body : string) : ('a, Export_error.t) result =
|
||||||
Eio.Switch.run @@ fun sw ->
|
Eio.Switch.run @@ fun sw ->
|
||||||
let uri = Uri.of_string url in
|
let uri = Uri.of_string url in
|
||||||
|
|
||||||
|
|
@ -138,7 +138,7 @@ struct
|
||||||
let r =
|
let r =
|
||||||
try
|
try
|
||||||
let status = Status.decode_pb_status dec in
|
let status = Status.decode_pb_status dec in
|
||||||
Error (`Status (code, status))
|
Error (`Status (code, status, attempt_descr))
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
Error
|
Error
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,8 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let cleanup _self = ()
|
let cleanup _self = ()
|
||||||
|
|
||||||
(* send the content to the remote endpoint/path *)
|
(* send the content to the remote endpoint/path *)
|
||||||
let send (_self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (_self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result Lwt.t =
|
(bod : string) : ('a, error) result Lwt.t =
|
||||||
let uri = Uri.of_string url in
|
let uri = Uri.of_string url in
|
||||||
|
|
||||||
let open Cohttp in
|
let open Cohttp in
|
||||||
|
|
@ -74,7 +74,7 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let r =
|
let r =
|
||||||
try
|
try
|
||||||
let status = Status.decode_pb_status dec in
|
let status = Status.decode_pb_status dec in
|
||||||
Error (`Status (code, status))
|
Error (`Status (code, status, attempt_descr))
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
Error
|
Error
|
||||||
|
|
|
||||||
|
|
@ -26,8 +26,8 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
let cleanup self = Ezcurl_lwt.delete self
|
let cleanup self = Ezcurl_lwt.delete self
|
||||||
|
|
||||||
(** send the content to the remote endpoint/path *)
|
(** send the content to the remote endpoint/path *)
|
||||||
let send (self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result Lwt.t =
|
(bod : string) : ('a, error) result Lwt.t =
|
||||||
let* r =
|
let* r =
|
||||||
let headers = user_headers in
|
let headers = user_headers in
|
||||||
Ezcurl_lwt.post ~client:self ~headers ~params:[] ~url
|
Ezcurl_lwt.post ~client:self ~headers ~params:[] ~url
|
||||||
|
|
@ -61,7 +61,9 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
in
|
in
|
||||||
Lwt.return r)
|
Lwt.return r)
|
||||||
| Ok { code; body; _ } ->
|
| Ok { code; body; _ } ->
|
||||||
let err = Export_error.decode_invalid_http_response ~url ~code body in
|
let err =
|
||||||
|
Export_error.decode_invalid_http_response ~attempt_descr ~url ~code body
|
||||||
|
in
|
||||||
Lwt.return (Error err)
|
Lwt.return (Error err)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,8 +24,8 @@ module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
|
|
||||||
let cleanup = Ezcurl.delete
|
let cleanup = Ezcurl.delete
|
||||||
|
|
||||||
let send (self : t) ~url ~headers:user_headers ~decode (bod : string) :
|
let send (self : t) ~attempt_descr ~url ~headers:user_headers ~decode
|
||||||
('a, error) result =
|
(bod : string) : ('a, error) result =
|
||||||
let r =
|
let r =
|
||||||
let headers = user_headers in
|
let headers = user_headers in
|
||||||
Ezcurl.post ~client:self ~headers ~params:[] ~url ~content:(`String bod)
|
Ezcurl.post ~client:self ~headers ~params:[] ~url ~content:(`String bod)
|
||||||
|
|
@ -57,7 +57,8 @@ module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||||
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))))
|
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))))
|
||||||
| Ok { code; body; _ } ->
|
| Ok { code; body; _ } ->
|
||||||
let err =
|
let err =
|
||||||
OTELC.Export_error.decode_invalid_http_response ~url ~code body
|
OTELC.Export_error.decode_invalid_http_response ~attempt_descr ~url
|
||||||
|
~code body
|
||||||
in
|
in
|
||||||
Error err
|
Error err
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
(** Error that can occur during export *)
|
(** Error that can occur during export *)
|
||||||
|
|
||||||
|
type attempt_descr = string
|
||||||
|
|
||||||
type t =
|
type t =
|
||||||
[ `Status of int * Opentelemetry.Proto.Status.status
|
[ `Status of int * Opentelemetry.Proto.Status.status * attempt_descr
|
||||||
| `Failure of string
|
| `Failure of string
|
||||||
| `Sysbreak
|
| `Sysbreak
|
||||||
]
|
]
|
||||||
|
|
@ -10,12 +12,18 @@ let str_to_hex (s : string) : string =
|
||||||
Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s)
|
Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s)
|
||||||
|
|
||||||
(** Report the error on stderr. *)
|
(** Report the error on stderr. *)
|
||||||
let report_err : t -> unit = function
|
let report_err ~level:(provided_level : [ `Debug | `Auto ]) (err : t) : unit =
|
||||||
|
let compute_level lvl =
|
||||||
|
match provided_level with
|
||||||
|
| `Debug -> Opentelemetry.Self_debug.Debug
|
||||||
|
| `Auto -> lvl
|
||||||
|
in
|
||||||
|
match err with
|
||||||
| `Sysbreak ->
|
| `Sysbreak ->
|
||||||
Opentelemetry.Self_debug.log Opentelemetry.Self_debug.Info (fun () ->
|
Opentelemetry.Self_debug.log (compute_level Info) (fun () ->
|
||||||
"opentelemetry: ctrl-c captured, stopping")
|
"opentelemetry: ctrl-c captured, stopping")
|
||||||
| `Failure msg ->
|
| `Failure msg ->
|
||||||
Opentelemetry.Self_debug.log Opentelemetry.Self_debug.Error (fun () ->
|
Opentelemetry.Self_debug.log (compute_level Error) (fun () ->
|
||||||
Printf.sprintf "opentelemetry: export failed: %s" msg)
|
Printf.sprintf "opentelemetry: export failed: %s" msg)
|
||||||
| `Status
|
| `Status
|
||||||
( code,
|
( code,
|
||||||
|
|
@ -24,25 +32,27 @@ let report_err : t -> unit = function
|
||||||
message;
|
message;
|
||||||
details;
|
details;
|
||||||
_presence = _;
|
_presence = _;
|
||||||
} ) ->
|
},
|
||||||
Opentelemetry.Self_debug.log Opentelemetry.Self_debug.Error (fun () ->
|
descr ) ->
|
||||||
|
Opentelemetry.Self_debug.log (compute_level Error) (fun () ->
|
||||||
let pp_details out l =
|
let pp_details out l =
|
||||||
List.iter
|
List.iter
|
||||||
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
|
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
|
||||||
l
|
l
|
||||||
in
|
in
|
||||||
|
|
||||||
Format.asprintf
|
Format.asprintf
|
||||||
"@[<2>opentelemetry: export failed with@ http code=%d@ status \
|
"@[<2>opentelemetry: export failed with@ http code=%d@ attempt: %s@ \
|
||||||
{@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]"
|
status {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]"
|
||||||
code scode
|
code descr scode
|
||||||
(Bytes.unsafe_to_string message)
|
(Bytes.unsafe_to_string message)
|
||||||
pp_details details)
|
pp_details details)
|
||||||
|
|
||||||
let decode_invalid_http_response ~code ~url (body : string) : t =
|
let decode_invalid_http_response ~attempt_descr ~code ~url (body : string) : t =
|
||||||
try
|
try
|
||||||
let dec = Pbrt.Decoder.of_string body in
|
let dec = Pbrt.Decoder.of_string body in
|
||||||
let status = Opentelemetry.Proto.Status.decode_pb_status dec in
|
let status = Opentelemetry.Proto.Status.decode_pb_status dec in
|
||||||
`Status (code, status)
|
`Status (code, status, attempt_descr)
|
||||||
with e ->
|
with e ->
|
||||||
let bt = Printexc.get_backtrace () in
|
let bt = Printexc.get_backtrace () in
|
||||||
`Failure
|
`Failure
|
||||||
|
|
|
||||||
|
|
@ -128,7 +128,7 @@ end = struct
|
||||||
IO.return ()
|
IO.return ()
|
||||||
| Error err ->
|
| Error err ->
|
||||||
Atomic.incr n_errors;
|
Atomic.incr n_errors;
|
||||||
Export_error.report_err err;
|
Export_error.report_err ~level:`Auto err;
|
||||||
(* avoid crazy error loop *)
|
(* avoid crazy error loop *)
|
||||||
let dur_s = Util_net_backoff.on_error backoff in
|
let dur_s = Util_net_backoff.on_error backoff in
|
||||||
IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
|
IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,14 @@ module type HTTPC = sig
|
||||||
|
|
||||||
val send :
|
val send :
|
||||||
t ->
|
t ->
|
||||||
|
attempt_descr:string ->
|
||||||
url:string ->
|
url:string ->
|
||||||
headers:(string * string) list ->
|
headers:(string * string) list ->
|
||||||
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
|
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
|
||||||
string ->
|
string ->
|
||||||
('a, error) result IO.t
|
('a, error) result IO.t
|
||||||
|
(** Send a HTTP request.
|
||||||
|
@param attempt_descr included in error message if this fails *)
|
||||||
end
|
end
|
||||||
|
|
||||||
module Make
|
module Make
|
||||||
|
|
@ -61,20 +64,26 @@ end = struct
|
||||||
(** Should we retry, based on the HTTP response code? *)
|
(** Should we retry, based on the HTTP response code? *)
|
||||||
let should_retry = function
|
let should_retry = function
|
||||||
| `Failure _ -> true (* Network errors, connection issues *)
|
| `Failure _ -> true (* Network errors, connection issues *)
|
||||||
| `Status (code, _) ->
|
| `Status (code, _, _) ->
|
||||||
(* Retry on server errors, rate limits, timeouts *)
|
(* Retry on server errors, rate limits, timeouts *)
|
||||||
code >= 500 || code = 429 || code = 408
|
code >= 500 || code = 429 || code = 408
|
||||||
| `Sysbreak -> false (* User interrupt, don't retry *)
|
| `Sysbreak -> false (* User interrupt, don't retry *)
|
||||||
|
|
||||||
(** Retry loop over [f()] with exponential backoff *)
|
(** Retry loop over [f()] with exponential backoff *)
|
||||||
let rec retry_loop_ (self : t) attempt delay_ms ~f =
|
let rec retry_loop_ (self : t) attempt delay_ms
|
||||||
|
~(f : attempt_descr:string -> unit -> _ result IO.t) : _ result IO.t =
|
||||||
let open IO in
|
let open IO in
|
||||||
let* result = f () in
|
let attempt_descr =
|
||||||
|
spf "try(%d/%d)" attempt self.config.retry_max_attempts
|
||||||
|
in
|
||||||
|
let* result = f ~attempt_descr () in
|
||||||
match result with
|
match result with
|
||||||
| Ok x -> return (Ok x)
|
| Ok x -> return (Ok x)
|
||||||
| Error err
|
| Error err
|
||||||
when should_retry err && attempt < self.config.retry_max_attempts ->
|
when should_retry err && attempt < self.config.retry_max_attempts ->
|
||||||
let delay_s = delay_ms /. 1000. in
|
let delay_s = delay_ms /. 1000. in
|
||||||
|
Export_error.report_err ~level:`Debug err;
|
||||||
|
|
||||||
let* () = sleep_s delay_s in
|
let* () = sleep_s delay_s in
|
||||||
let next_delay =
|
let next_delay =
|
||||||
min self.config.retry_max_delay_ms
|
min self.config.retry_max_delay_ms
|
||||||
|
|
@ -114,14 +123,14 @@ end = struct
|
||||||
~protocol:self.config.protocol res
|
~protocol:self.config.protocol res
|
||||||
in
|
in
|
||||||
|
|
||||||
let do_once () =
|
let do_once ~attempt_descr () =
|
||||||
Httpc.send self.http ~url ~headers ~decode:(`Ret ()) data
|
Httpc.send self.http ~attempt_descr ~url ~headers ~decode:(`Ret ()) data
|
||||||
in
|
in
|
||||||
|
|
||||||
if self.config.retry_max_attempts > 0 then
|
if self.config.retry_max_attempts > 0 then
|
||||||
retry_loop_ self 0 self.config.retry_initial_delay_ms ~f:do_once
|
retry_loop_ self 0 self.config.retry_initial_delay_ms ~f:do_once
|
||||||
else
|
else
|
||||||
do_once ()
|
do_once ~attempt_descr:"single_attempt" ()
|
||||||
end
|
end
|
||||||
|
|
||||||
module C = Generic_consumer.Make (IO) (Notifier) (Sender)
|
module C = Generic_consumer.Make (IO) (Notifier) (Sender)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue