diff --git a/src/client-ocurl-lwt/dune b/src/client-ocurl-lwt/dune index 64fb6217..41dd39a9 100644 --- a/src/client-ocurl-lwt/dune +++ b/src/client-ocurl-lwt/dune @@ -8,6 +8,7 @@ opentelemetry opentelemetry.atomic opentelemetry-client + opentelemetry-client.lwt pbrt mtime mtime.clock.os diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 0041e2af..b31160ed 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -3,9 +3,8 @@ https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md *) -module OT = Opentelemetry +module Config = Config open Opentelemetry -open Opentelemetry_util open Opentelemetry_client open Common_ @@ -17,40 +16,14 @@ external reraise : exn -> 'a = "%reraise" (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to use Lwt's latest version *) -type error = - [ `Status of int * Opentelemetry.Proto.Status.status - | `Failure of string - | `Sysbreak - ] +type error = Export_error.t +(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) let n_errors = Atomic.make 0 -let n_dropped = Atomic.make 0 - -let report_err_ = function - | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status - ( code, - { - Opentelemetry.Proto.Status.code = scode; - message; - details; - _presence = _; - } ) -> - let pp_details out l = - List.iter - (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) - l - in - Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status \ - {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." - code scode - (Bytes.unsafe_to_string message) - pp_details details +let report_err_ = Export_error.report_err +(** HTTP client *) module Httpc : sig type t @@ -68,24 +41,22 @@ end = struct open Opentelemetry.Proto open Lwt.Syntax - type t = unit + type t = Curl.t - let create () : t = () + let create () : t = Ezcurl_core.make () - let cleanup _self = () - - (* FIXME: absolutely need some rate limiting somewhere, ideally as early - as possible so we can measure how many resources we drop *) + let cleanup self = Ezcurl_core.delete self (* send the content to the remote endpoint/path *) - let send (_self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t = + let send (self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t = let* r = let headers = ("Content-Type", "application/x-protobuf") :: ("Accept", "application/x-protobuf") :: Config.Env.get_headers () in - Ezcurl_lwt.post ~headers ~params:[] ~url ~content:(`String bod) () + Ezcurl_lwt.post ~client:self ~headers ~params:[] ~url + ~content:(`String bod) () in match r with | Error (code, msg) -> @@ -115,53 +86,36 @@ end = struct in Lwt.return r) | Ok { code; body; _ } -> - let dec = Pbrt.Decoder.of_string body in - - let r = - try - let status = Status.decode_pb_status dec in - Error (`Status (code, status)) - with e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf - "httpc: decoding of status (url=%S, code=%d) failed with:\n\ - %s\n\ - status: %S\n\ - %s" - url code (Printexc.to_string e) body bt)) - in - Lwt.return r + let err = Export_error.decode_invalid_http_response ~url ~code body in + Lwt.return (Error err) end -module Exporter_impl = struct +module Consumer_impl = struct + module CNotifier = Opentelemetry_client_lwt.Notifier open Lwt.Syntax - let[@inline] push_to_batch b e = - if e <> [] then ( - match Batch.push b e with - | `Ok -> () - | `Dropped -> Atomic.incr n_dropped - ) - type state = { stop: bool Atomic.t; cleaned: bool Atomic.t; (** True when we cleaned up after closing *) config: Config.t; - encoder_pool: Pbrt.Encoder.t Rpool.t; - traces: Proto.Trace.span Batch.t; - logs: Proto.Logs.log_record Batch.t; - metrics: Proto.Metrics.metric Batch.t; + q: Any_resource.t Bounded_queue.t; + notify: CNotifier.t; } - let send_http_ (st : state) (httpc : Httpc.t) ~url data : unit Lwt.t = + let shutdown self = + if not (Atomic.exchange self.stop true) then ( + CNotifier.trigger self.notify; + CNotifier.delete self.notify + ) + + let send_http_ (self : state) (httpc : Httpc.t) ~url (data : string) : + unit Lwt.t = let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with | Ok () -> Lwt.return () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set st.stop true; + Atomic.set self.stop true; Lwt.return () | Error err -> (* TODO: log error _via_ otel? *) @@ -170,179 +124,113 @@ module Exporter_impl = struct (* avoid crazy error loop *) Lwt_unix.sleep 3. - let send_metrics_http (st : state) client + let send_metrics_http (st : state) client ~encoder (l : Proto.Metrics.resource_metrics list) = - let msg = - let@ encoder = Rpool.with_resource st.encoder_pool in - Signal.Encode.metrics ~encoder l - in + let msg = Signal.Encode.metrics ~encoder l in + Pbrt.Encoder.reset encoder; send_http_ st client msg ~url:st.config.url_metrics - let send_traces_http st client (l : Proto.Trace.resource_spans list) = - let msg = - let@ encoder = Rpool.with_resource st.encoder_pool in - Signal.Encode.traces ~encoder l - in + let send_traces_http st client ~encoder (l : Proto.Trace.resource_spans list) + = + let msg = Signal.Encode.traces ~encoder l in + Pbrt.Encoder.reset encoder; send_http_ st client msg ~url:st.config.url_traces - let send_logs_http st client (l : Proto.Logs.resource_logs list) = - let msg = - let@ encoder = Rpool.with_resource st.encoder_pool in - Signal.Encode.logs ~encoder l - in + let send_logs_http st client ~encoder (l : Proto.Logs.resource_logs list) = + let msg = Signal.Encode.logs ~encoder l in + Pbrt.Encoder.reset encoder; send_http_ st client msg ~url:st.config.url_logs - (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe (st : state) ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now st.metrics with - | None -> Lwt.return false - | Some l -> - let res = Util_resources.make_resource_metrics l in - let+ () = send_metrics_http st httpc [ res ] in - true + let tick (self : state) = CNotifier.trigger self.notify - let emit_traces_maybe st ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now st.traces with - | None -> Lwt.return false - | Some l -> - let res = Util_resources.make_resource_spans l in - let+ () = send_traces_http st httpc [ res ] in - true + let start_worker (self : state) : unit = + let client = Httpc.create () in + let encoder = Pbrt.Encoder.create () in - let emit_logs_maybe st ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now st.logs with - | None -> Lwt.return false - | Some l -> - let res = Util_resources.make_resource_logs l in - let+ () = send_logs_http st httpc [ res ] in - true - - let emit_all_force st (httpc : Httpc.t) : unit Lwt.t = - let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe st ~now ~force:true httpc - and+ (_ : bool) = emit_logs_maybe st ~now ~force:true httpc - and+ (_ : bool) = emit_metrics_maybe st ~now ~force:true httpc in - () - - let[@inline] guard_exn_ where f = - try f () - with e -> - let bt = Printexc.get_backtrace () in - Printf.eprintf - "opentelemetry-ocurl-lwt: uncaught exception in %s: %s\n%s\n%!" where - (Printexc.to_string e) bt - - (* Lwt task that calls [tick()] regularly, to help enforce timeouts *) - let setup_ticker_ st ~tick ~finally () = - let rec tick_loop () = - if Atomic.get st.stop then ( - finally (); + (* loop on [q] *) + let rec loop () : unit Lwt.t = + if Atomic.get self.stop then Lwt.return () - ) else - let* () = Lwt_unix.sleep 0.5 in - let* () = tick () in - tick_loop () - in - Lwt.async tick_loop - - (* make an emitter. - - exceptions inside should be caught, see - https://opentelemetry.io/docs/reference/specification/error-handling/ *) - let create ~stop ~(config : Config.t) () : OT.Exporter.t = - let open Proto in - let encoder_pool = - Rpool.create - ~mk_item:(fun () -> Pbrt.Encoder.create ~size:1024 ()) - ~max_size:32 ~clear:Pbrt.Encoder.reset () - in - - (* local helpers *) - let timeout = - if config.batch_timeout_ms > 0 then - Some Mtime.Span.(config.batch_timeout_ms * ms) else - None + let* () = + match Bounded_queue.try_pop self.q with + | `Closed -> + shutdown self; + Lwt.return () + | `Empty -> CNotifier.wait self.notify + | `Item (R_logs logs) -> send_logs_http self client ~encoder logs + | `Item (R_metrics ms) -> send_metrics_http self client ~encoder ms + | `Item (R_spans spans) -> send_traces_http self client ~encoder spans + in + loop () in - let st = + Lwt.async (fun () -> + Lwt.finalize loop (fun () -> + Httpc.cleanup client; + Lwt.return ())) + + let default_n_workers = 50 + + let create_state ~stop ~config ~q () : state = + let self = { stop; config; + q; cleaned = Atomic.make false; - encoder_pool; - traces = Batch.make ?batch:config.batch_traces ?timeout (); - metrics = Batch.make ?batch:config.batch_metrics ?timeout (); - logs = Batch.make ?batch:config.batch_logs ?timeout (); + notify = CNotifier.create (); } in - let httpc = Httpc.create () in - let ticker = Cb_set.create () in - let tick_ () = - if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (Thread.id @@ Thread.self ()); - Cb_set.trigger ticker; - let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe st ~now httpc - and+ (_ : bool) = emit_logs_maybe st ~now httpc - and+ (_ : bool) = emit_metrics_maybe st ~now httpc in - () + (* start workers *) + let n_workers = + min 2 + (max 500 + (Option.value ~default:default_n_workers + config.http_concurrency_level)) in + for _i = 1 to n_workers do + start_worker self + done; - setup_ticker_ st ~tick:tick_ ~finally:ignore (); + self - (* we make sure that this is thread-safe, even though we don't have a - background thread. There can still be a ticker thread, and there - can also be several user threads that produce spans and call - the emit functions. *) - object - method send_trace e = - let@ () = guard_exn_ "push trace" in - push_to_batch st.traces e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_traces_maybe st ~now httpc in - ()) + let to_consumer (self : state) : Any_resource.t Consumer.t = + let active () = not (Atomic.get self.stop) in + let shutdown ~on_done = + shutdown self; + on_done () + in + let tick () = tick self in + { active; tick; shutdown } - method send_metrics e = - let@ () = guard_exn_ "push metrics" in - push_to_batch st.metrics e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_metrics_maybe st ~now httpc in - ()) - - method send_logs e = - let@ () = guard_exn_ "push logs" in - push_to_batch st.logs e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_logs_maybe st ~now httpc in - ()) - - method add_on_tick_callback f = Cb_set.register ticker f - - (* if called in a blocking context: work in the background *) - method tick () = Lwt.async tick_ - - method cleanup ~on_done () = - if Config.Env.get_debug () then - Printf.eprintf "opentelemetry: exiting…\n%!"; - Lwt.async (fun () -> - let* () = emit_all_force st httpc in - Httpc.cleanup httpc; - on_done (); - Lwt.return ()) - end + let consumer ~stop ~config () : Consumer.any_resource_builder = + { + start_consuming = + (fun q -> + let st = create_state ~stop ~config ~q () in + to_consumer st); + } end -let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = - Exporter_impl.create ~stop ~config () +let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = + Consumer_impl.consumer ~stop ~config () + +let create_exporter ?stop ?(config = Config.make ()) () = + let consumer = create_consumer ?stop ~config () in + let bq = + Bounded_queue_sync.create + ~high_watermark:Bounded_queue.Defaults.high_watermark () + in + Exporter_queued.create ~q:bq ~consumer () + |> Exporter_add_batching.add_batching ~config + +let create_backend = create_exporter let setup_ ?stop ?config () : unit = let exp = create_backend ?stop ?config () in - OT.Exporter.Main_exporter.set exp; + Main_exporter.set exp; () let setup ?stop ?config ?(enable = true) () = @@ -350,9 +238,7 @@ let setup ?stop ?config ?(enable = true) () = let remove_backend () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in - OT.Exporter.Main_exporter.remove - ~on_done:(fun () -> Lwt.wakeup_later done_u ()) - (); + Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); done_fut let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli index 0e02d495..713ea70a 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli @@ -12,10 +12,21 @@ val set_headers : (string * string) list -> unit module Config = Config -val create_backend : +val create_consumer : + ?stop:bool Atomic.t -> + ?config:Config.t -> + unit -> + Opentelemetry_client.Consumer.any_resource_builder +(** Consumer that pulls from a queue *) + +val create_exporter : ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t (** Create a new backend using lwt and ezcurl-lwt *) +val create_backend : + ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +[@@deprecated "use create_exporter"] + val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.