From f1ee6141a558f0601f5fe007a7d1f304b62640f6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 5 Dec 2025 22:59:11 -0500 Subject: [PATCH] refactor ocurl, ocurl_lwt, cohttp_lwt clients to use generic consumer --- src/client-cohttp-lwt/dune | 1 + .../opentelemetry_client_cohttp_lwt.ml | 419 ++---------------- .../opentelemetry_client_cohttp_lwt.mli | 14 +- .../opentelemetry_client_ocurl_lwt.ml | 161 +------ .../opentelemetry_client_ocurl.ml | 211 +++------ 5 files changed, 117 insertions(+), 689 deletions(-) diff --git a/src/client-cohttp-lwt/dune b/src/client-cohttp-lwt/dune index 23c36d3a..56b0b4f8 100644 --- a/src/client-cohttp-lwt/dune +++ b/src/client-cohttp-lwt/dune @@ -7,6 +7,7 @@ (libraries opentelemetry opentelemetry-client + opentelemetry-client.lwt lwt cohttp-lwt cohttp-lwt-unix diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 0f823fbc..53cf515b 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -3,10 +3,8 @@ https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md *) -module OT = Opentelemetry module Config = Config -module Signal = Opentelemetry_client.Signal -module Batch = Opentelemetry_client.Batch +open Opentelemetry_client open Opentelemetry open Common_ @@ -14,87 +12,14 @@ let set_headers = Config.Env.set_headers let get_headers = Config.Env.get_headers -external reraise : exn -> 'a = "%reraise" -(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to - use Lwt's latest version *) +type error = Export_error.t -let needs_gc_metrics = Atomic.make false +open struct + module IO = Opentelemetry_client_lwt.Io_lwt +end -let last_gc_metrics = Atomic.make (Mtime_clock.now ()) - -let timeout_gc_metrics = Mtime.Span.(20 * s) - -let gc_metrics = ref [] -(* side channel for GC, appended to {!E_metrics}'s data *) - -(* capture current GC metrics if {!needs_gc_metrics} is true, - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) -let sample_gc_metrics_if_needed () = - let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in - let timeout () = - let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in - Mtime.Span.compare elapsed timeout_gc_metrics > 0 - in - if alarm || timeout () then ( - Atomic.set last_gc_metrics now; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - gc_metrics := l :: !gc_metrics - ) - -type error = - [ `Status of int * Opentelemetry.Proto.Status.status - | `Failure of string - | `Sysbreak - ] - -let n_errors = Atomic.make 0 - -let n_dropped = Atomic.make 0 - -let report_err_ = function - | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status - ( code, - { - Opentelemetry.Proto.Status.code = scode; - message; - details; - _presence = _; - } ) -> - let pp_details out l = - List.iter - (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) - l - in - Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status \ - {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." - code scode - (Bytes.unsafe_to_string message) - pp_details details - -module Httpc : sig - type t - - val create : unit -> t - - val send : - t -> - url:string -> - decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> - string -> - ('a, error) result Lwt.t - - val cleanup : t -> unit -end = struct +module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct + module IO = IO open Opentelemetry.Proto open Lwt.Syntax module Httpc = Cohttp_lwt_unix.Client @@ -176,325 +101,39 @@ end = struct ) end -(** An emitter. This is used by {!Backend} below to forward traces/metrics/… - from the program to whatever collector client we have. *) -module type EMITTER = sig - open Opentelemetry.Proto +module Consumer_impl = + Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) + (Httpc) - val push_trace : Trace.resource_spans list -> unit +let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () - val push_metrics : Metrics.resource_metrics list -> unit - - val push_logs : Logs.resource_logs list -> unit - - val set_on_tick_callbacks : (unit -> unit) Alist.t -> unit - - val tick : unit -> unit - - val cleanup : on_done:(unit -> unit) -> unit -> unit -end - -(* make an emitter. - - exceptions inside should be caught, see - https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = - let open Proto in - let open Lwt.Syntax in - (* local helpers *) - let open struct - let timeout = - if config.batch_timeout_ms > 0 then - Some Mtime.Span.(config.batch_timeout_ms * ms) - else - None - - let batch_traces : Trace.resource_spans Batch.t = - Batch.make ?batch:config.batch_traces ?timeout () - - let batch_metrics : Metrics.resource_metrics Batch.t = - Batch.make ?batch:config.batch_metrics ?timeout () - - let batch_logs : Logs.resource_logs Batch.t = - Batch.make ?batch:config.batch_logs ?timeout () - - let on_tick_cbs_ = Atomic.make (Alist.make ()) - - let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - - let send_http_ (httpc : Httpc.t) ~url data : unit Lwt.t = - let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in - match r with - | Ok () -> Lwt.return () - | Error `Sysbreak -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set stop true; - Lwt.return () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err; - (* avoid crazy error loop *) - Lwt_unix.sleep 3. - - let send_metrics_http client (l : Metrics.resource_metrics list) = - Signal.Encode.metrics l |> send_http_ client ~url:config.url_metrics - - let send_traces_http client (l : Trace.resource_spans list) = - Signal.Encode.traces l |> send_http_ client ~url:config.url_traces - - let send_logs_http client (l : Logs.resource_logs list) = - Signal.Encode.logs l |> send_http_ client ~url:config.url_logs - - (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_metrics with - | None -> Lwt.return false - | Some l -> - let batch = !gc_metrics @ l in - gc_metrics := []; - let+ () = send_metrics_http httpc batch in - true - - let emit_traces_maybe ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_traces with - | None -> Lwt.return false - | Some l -> - let+ () = send_traces_http httpc l in - true - - let emit_logs_maybe ~now ?force httpc : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_logs with - | None -> Lwt.return false - | Some l -> - let+ () = send_logs_http httpc l in - true - - let[@inline] guard_exn_ where f = - try f () - with e -> - let bt = Printexc.get_backtrace () in - Printf.eprintf - "opentelemetry-cohttp-lwt: uncaught exception in %s: %s\n%s\n%!" where - (Printexc.to_string e) bt - - let emit_all_force (httpc : Httpc.t) : unit Lwt.t = - let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc - and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc - and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc in - () - - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let setup_ticker_thread ~tick ~finally () = - let rec tick_thread () = - if Atomic.get stop then ( - finally (); - Lwt.return () - ) else - let* () = Lwt_unix.sleep 0.5 in - let* () = tick () in - tick_thread () - in - Lwt.async tick_thread - end in - let httpc = Httpc.create () in - - let module M = struct - (* we make sure that this is thread-safe, even though we don't have a - background thread. There can still be a ticker thread, and there - can also be several user threads that produce spans and call - the emit functions. *) - - let push_to_batch b e = - match Batch.push b e with - | `Ok -> () - | `Dropped -> Atomic.incr n_errors - - let push_trace e = - let@ () = guard_exn_ "push trace" in - push_to_batch batch_traces e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_traces_maybe ~now httpc in - ()) - - let push_metrics e = - let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); - push_to_batch batch_metrics e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_metrics_maybe ~now httpc in - ()) - - let push_logs e = - let@ () = guard_exn_ "push logs" in - push_to_batch batch_logs e; - let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_logs_maybe ~now httpc in - ()) - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick_ () = - if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - (Printexc.to_string e)) - (Alist.get @@ Atomic.get on_tick_cbs_); - let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now httpc - and+ (_ : bool) = emit_logs_maybe ~now httpc - and+ (_ : bool) = emit_metrics_maybe ~now httpc in - () - - let () = setup_ticker_thread ~tick:tick_ ~finally:ignore () - - (* if called in a blocking context: work in the background *) - let tick () = Lwt.async tick_ - - let cleanup ~on_done () = - if Config.Env.get_debug () then - Printf.eprintf "opentelemetry: exiting…\n%!"; - Lwt.async (fun () -> - let* () = emit_all_force httpc in - Httpc.cleanup httpc; - on_done (); - Lwt.return ()) - end in - (module M) - -module Backend - (Arg : sig - val stop : bool Atomic.t - - val config : Config.t - end) - () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) - - open Opentelemetry.Proto - open Opentelemetry.Collector - - let send_trace : Trace.resource_spans list sender = - { - send = - (fun l ~ret -> - (if Config.Env.get_debug () then - let@ () = Lock.with_lock in - Format.eprintf "send spans %a@." - (Format.pp_print_list Trace.pp_resource_spans) - l); - push_trace l; - ret ()); - } - - let last_sent_metrics = Atomic.make (Mtime_clock.now ()) - - let timeout_sent_metrics = Mtime.Span.(5 * s) - (* send metrics from time to time *) - - let signal_emit_gc_metrics () = - if Config.Env.get_debug () then - Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true - - let additional_metrics () : Metrics.resource_metrics list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now () in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in - - (* there is a possible race condition here, as several threads might update - metrics at the same time. But that's harmless. *) - if add_own_metrics then ( - Atomic.set last_sent_metrics now; - let open OT.Metrics in - [ - make_resource_metrics - [ - sum ~name:"otel.export.dropped" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel.export.errors" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]; - ] - ) else - [] - - let send_metrics : Metrics.resource_metrics list sender = - { - send = - (fun m ~ret -> - (if Config.Env.get_debug () then - let@ () = Lock.with_lock in - Format.eprintf "send metrics %a@." - (Format.pp_print_list Metrics.pp_resource_metrics) - m); - - let m = List.rev_append (additional_metrics ()) m in - push_metrics m; - ret ()); - } - - let send_logs : Logs.resource_logs list sender = - { - send = - (fun m ~ret -> - (if Config.Env.get_debug () then - let@ () = Lock.with_lock in - Format.eprintf "send logs %a@." - (Format.pp_print_list Logs.pp_resource_logs) - m); - - push_logs m; - ret ()); - } -end - -let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = - let module B = - Backend - (struct - let stop = stop - - let config = config - end) - () +let create_exporter ?stop ?(config = Config.make ()) () = + let consumer = create_consumer ?stop ~config () in + let bq = + Bounded_queue_sync.create + ~high_watermark:Bounded_queue.Defaults.high_watermark () in - (module B : OT.Collector.BACKEND) + Exporter_queued.create ~q:bq ~consumer () + |> Exporter_add_batching.add_batching ~config + +let create_backend = create_exporter let setup_ ?stop ?config () : unit = let backend = create_backend ?stop ?config () in - OT.Collector.set_backend backend; + Main_exporter.set backend; () let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () -let remove_backend () : unit Lwt.t = +let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in - OT.Collector.remove_backend ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); + Main_exporter.remove ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); done_fut +let remove_backend = remove_exporter + let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t = if enable then ( @@ -504,10 +143,10 @@ let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t Lwt.catch (fun () -> let* res = f () in - let+ () = remove_backend () in + let+ () = remove_exporter () in res) (fun exn -> - let* () = remove_backend () in - reraise exn) + let* () = remove_exporter () in + Lwt.reraise exn) ) else f () diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index c57d9653..2f12121f 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -12,14 +12,20 @@ val set_headers : (string * string) list -> unit module Config = Config -val create_backend : +val create_consumer : ?stop:bool Atomic.t -> ?config:Config.t -> unit -> - (module Opentelemetry.Collector.BACKEND) -(** Create a new backend using lwt and cohttp + Opentelemetry_client.Consumer.any_resource_builder +(** Consumer that pulls from a queue *) - NOTE [after_cleanup] optional parameter removed @since 0.12 *) +val create_exporter : + ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +(** Create a new backend using lwt and ezcurl-lwt *) + +val create_backend : + ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +[@@deprecated "use create_exporter"] val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 4617754d..50cd834b 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -14,27 +14,13 @@ let get_headers = Config.Env.get_headers type error = Export_error.t -(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *) -let n_errors = Atomic.make 0 - -let report_err_ = Export_error.report_err +open struct + module IO = Opentelemetry_client_lwt.Io_lwt +end (** HTTP client *) -module Httpc : sig - type t - - val create : unit -> t - - val send : - t -> - url:string -> - decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> - string -> - ('a, error) result Lwt.t - - val cleanup : t -> unit -end = struct - open Opentelemetry.Proto +module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct + module IO = IO open Lwt.Syntax type t = Curl.t @@ -43,7 +29,7 @@ end = struct let cleanup self = Ezcurl_core.delete self - (* send the content to the remote endpoint/path *) + (** send the content to the remote endpoint/path *) let send (self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t = let* r = let headers = @@ -86,139 +72,12 @@ end = struct Lwt.return (Error err) end -module Consumer_impl = struct - module CNotifier = Opentelemetry_client_lwt.Notifier - open Lwt.Syntax - - type state = { - stop: bool Atomic.t; - cleaned: bool Atomic.t; (** True when we cleaned up after closing *) - config: Config.t; - q: Any_resource.t Bounded_queue.t; - notify: CNotifier.t; - } - - let shutdown self = - Atomic.set self.stop true; - if not (Atomic.exchange self.cleaned true) then ( - CNotifier.trigger self.notify; - CNotifier.delete self.notify - ) - - let send_http_ (self : state) ~backoff (httpc : Httpc.t) ~url (data : string) - : unit Lwt.t = - let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in - match r with - | Ok () -> - Util_backoff.on_success backoff; - Lwt.return () - | Error `Sysbreak -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set self.stop true; - Lwt.return () - | Error err -> - Atomic.incr n_errors; - let dur_s = Util_backoff.cur_duration_s backoff in - Util_backoff.on_error backoff; - report_err_ err; - Lwt_unix.sleep (dur_s +. Random.float (dur_s /. 10.)) - - let send_metrics_http (st : state) client ~encoder - (l : Proto.Metrics.resource_metrics list) = - let msg = Signal.Encode.metrics ~encoder l in - Pbrt.Encoder.reset encoder; - send_http_ st client msg ~url:st.config.url_metrics - - let send_traces_http st client ~encoder (l : Proto.Trace.resource_spans list) - = - let msg = Signal.Encode.traces ~encoder l in - Pbrt.Encoder.reset encoder; - send_http_ st client msg ~url:st.config.url_traces - - let send_logs_http st client ~encoder (l : Proto.Logs.resource_logs list) = - let msg = Signal.Encode.logs ~encoder l in - Pbrt.Encoder.reset encoder; - send_http_ st client msg ~url:st.config.url_logs - - let tick (self : state) = CNotifier.trigger self.notify - - let start_worker (self : state) : unit = - let client = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - let backoff = Util_backoff.create () in - - (* loop on [q] *) - let rec loop () : unit Lwt.t = - if Atomic.get self.stop then - Lwt.return () - else - let* () = - match Bounded_queue.try_pop self.q with - | `Closed -> - shutdown self; - Lwt.return () - | `Empty -> CNotifier.wait self.notify - | `Item (R_logs logs) -> - send_logs_http self client ~backoff ~encoder logs - | `Item (R_metrics ms) -> - send_metrics_http self client ~encoder ~backoff ms - | `Item (R_spans spans) -> - send_traces_http self client ~encoder ~backoff spans - in - loop () - in - - Lwt.async (fun () -> - Lwt.finalize loop (fun () -> - Httpc.cleanup client; - Lwt.return ())) - - let default_n_workers = 50 - - let create_state ~stop ~config ~q () : state = - let self = - { - stop; - config; - q; - cleaned = Atomic.make false; - notify = CNotifier.create (); - } - in - - (* start workers *) - let n_workers = - min 2 - (max 500 - (Option.value ~default:default_n_workers - config.http_concurrency_level)) - in - for _i = 1 to n_workers do - start_worker self - done; - - self - - let to_consumer (self : state) : Any_resource.t Consumer.t = - let active () = not (Atomic.get self.stop) in - let shutdown ~on_done = - shutdown self; - on_done () - in - let tick () = tick self in - { active; tick; shutdown } - - let consumer ~stop ~config () : Consumer.any_resource_builder = - { - start_consuming = - (fun q -> - let st = create_state ~stop ~config ~q () in - to_consumer st); - } -end +module Consumer_impl = + Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) + (Httpc) let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = - Consumer_impl.consumer ~stop ~config () + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () let create_exporter ?stop ?(config = Config.make ()) () = let consumer = create_consumer ?stop ~config () in diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index edba7a0a..5906ace0 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -12,160 +12,83 @@ let get_headers = Config.Env.get_headers let set_headers = Config.Env.set_headers -let n_errors = Atomic.make 0 - let n_bytes_sent : int Atomic.t = Atomic.make 0 -module Consumer_impl = struct - type state = { - bq: Any_resource.t Bounded_queue.t; (** Queue of incoming workload *) - stop: bool Atomic.t; - config: Config.t; - mutable send_threads: Thread.t array; - (** Threads that send data via http *) - mcond: Util_thread.MCond.t; (** how to wait for the queue *) - } +type error = Export_error.t - let shutdown self : unit = - Atomic.set self.stop true; - (* wakeup sleepers *) - Util_thread.MCond.signal self.mcond +open struct + module Notifier = Notifier_sync - let send_http_ (self : state) (client : Curl.t) ~backoff ~url (data : string) - : unit = - let@ _sc = - Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" - in + module IO : Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a = struct + include Generic_io.Direct_style - (* avoid crazy error loop *) - let sleep_with_backoff () = - let dur_s = Util_backoff.cur_duration_s backoff in - Util_backoff.on_error backoff; - Thread.delay (dur_s +. Random.float (dur_s /. 10.)) - in + let sleep_s = Thread.delay - if Config.Env.get_debug () then - Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url - (String.length data); - let headers = - ("Content-Type", "application/x-protobuf") :: self.config.common.headers - in - match - let@ _sc = - Self_trace.with_ ~kind:Span_kind_internal "curl.post" - ~attrs:[ "size", `Int (String.length data); "url", `String url ] - in - Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () - with - | Ok { code; _ } when code >= 200 && code < 300 -> - Util_backoff.on_success backoff; - if Config.Env.get_debug () then - Printf.eprintf "opentelemetry: got response code=%d\n%!" code - | Ok { code; body; headers = _; info = _ } -> - Atomic.incr n_errors; - Self_trace.add_event _sc - @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; - - if Config.Env.get_debug () then ( - let err = Export_error.decode_invalid_http_response ~url ~code body in - Export_error.report_err err; - () - ); - - sleep_with_backoff () - | exception Sys.Break -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - shutdown self - | Error (code, msg) -> - Atomic.incr n_errors; - - Printf.eprintf - "opentelemetry: export failed:\n %s\n curl code: %s\n url: %s\n%!" - msg (Curl.strerror code) url; - - sleep_with_backoff () - - (** The main loop of a thread that, reads from [bq] to get the next message to - send via http *) - let bg_thread_loop (self : state) : unit = - Ezcurl.with_client ?set_opts:None @@ fun client -> - (* we need exactly one encoder per thread *) - let encoder = Pbrt.Encoder.create ~size:2048 () in - let backoff = Util_backoff.create () in - - let send ~name ~url ~conv ~backoff (signals : _ list) : unit = - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer name - ~attrs:[ "n", `Int (List.length signals) ] - in - - let msg : string = conv ?encoder:(Some encoder) signals in - Pbrt.Encoder.reset encoder; - - ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); - send_http_ self client msg ~backoff ~url; - () - in - while not (Atomic.get self.stop) do - match Bounded_queue.try_pop self.bq with - | `Closed -> shutdown self - | `Empty -> Util_thread.MCond.wait self.mcond - | `Item (Any_resource.R_spans tr) -> - send ~name:"send-traces" ~conv:Signal.Encode.traces ~backoff - ~url:self.config.common.url_traces tr - | `Item (Any_resource.R_metrics ms) -> - send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~backoff - ~url:self.config.common.url_metrics ms - | `Item (Any_resource.R_logs logs) -> - send ~name:"send-logs" ~conv:Signal.Encode.logs ~backoff - ~url:self.config.common.url_logs logs - done - - let to_consumer (self : state) : _ Consumer.t = - let active () = not (Atomic.get self.stop) in - let tick () = - (* make sure to poll from time to time *) - Util_thread.MCond.signal self.mcond - in - let shutdown ~on_done = - shutdown self; - on_done () - in - { tick; active; shutdown } - - let create_state ~stop ~(config : Config.t) ~q () : state = - let n_send_threads = min 100 @@ max 2 config.bg_threads in - - let self = - { - stop; - config; - send_threads = [||]; - bq = q; - mcond = Util_thread.MCond.create (); - } - in - - Util_thread.MCond.wakeup_from_bq self.mcond q; - - self.send_threads <- - Array.init n_send_threads (fun _i -> - Util_thread.start_bg_thread (fun () -> bg_thread_loop self)); - - self - - let create ~stop ~config () : Consumer.any_resource_builder = - { - start_consuming = - (fun q -> - let st = create_state ~stop ~config ~q () in - to_consumer st); - } + let[@inline] spawn f = ignore (Util_thread.start_bg_thread f : Thread.t) + end end +module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct + module IO = IO + + type t = Curl.t + + let create () = Ezcurl.make () + + let cleanup = Ezcurl.delete + + let send (self : t) ~url ~decode (bod : string) : ('a, error) result = + let r = + let headers = + ("Content-Type", "application/x-protobuf") + :: ("Accept", "application/x-protobuf") + :: Config.Env.get_headers () + in + Ezcurl.post ~client:self ~headers ~params:[] ~url ~content:(`String bod) + () + in + match r with + | Error (code, msg) -> + let err = + `Failure + (spf + "sending signals via http POST failed:\n\ + \ %s\n\ + \ curl code: %s\n\ + \ url: %s\n\ + %!" + msg (Curl.strerror code) url) + in + Error err + | Ok { code; body; _ } when code >= 200 && code < 300 -> + (match decode with + | `Ret x -> Ok x + | `Dec f -> + let dec = Pbrt.Decoder.of_string body in + (try Ok (f dec) + with e -> + let bt = Printexc.get_backtrace () in + Error + (`Failure + (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt)))) + | Ok { code; body; _ } -> + let err = Export_error.decode_invalid_http_response ~url ~code body in + Error err +end + +module Consumer_impl = Generic_http_consumer.Make (IO) (Notifier) (Httpc) + let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () : Opentelemetry_client.Consumer.any_resource_builder = - Consumer_impl.create ~stop ~config () + let n_workers = max 2 (min 32 config.bg_threads) in + let ticker_task = + if config.ticker_thread then + Some (float config.ticker_interval_ms /. 1000.) + else + None + in + Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task ~stop + ~config:config.common () let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t = let consumer = consumer ?stop ~config () in