diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 2efc50fe..bee44a6f 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -14,89 +14,25 @@ let set_headers = Config.Env.set_headers let n_errors = Atomic.make 0 -let n_dropped = Atomic.make 0 - -(** Something to be sent via HTTP *) -module To_send = struct - open Opentelemetry.Proto - - type t = - | Send_metric of Metrics.resource_metrics list - | Send_trace of Trace.resource_spans list - | Send_logs of Logs.resource_logs list -end - -(** start a thread in the background, running [f()] *) -let start_bg_thread (f : unit -> unit) : Thread.t = - let unix_run () = - let signals = - [ - Sys.sigusr1; - Sys.sigusr2; - Sys.sigterm; - Sys.sigpipe; - Sys.sigalrm; - Sys.sigstop; - ] - in - ignore (Thread.sigmask Unix.SIG_BLOCK signals : _ list); - f () - in - (* no signals on Windows *) - let run () = - if Sys.win32 then - f () - else - unix_run () - in - Thread.create run () - -let str_to_hex (s : string) : string = - Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s) - -module Exporter_impl : sig - val n_bytes_sent : int Atomic.t - - class type t = object - inherit OTEL.Exporter.t - - method shutdown : on_done:(unit -> unit) -> unit -> unit - end - - val create : stop:bool Atomic.t -> config:Config.t -> unit -> t - - val shutdown : t -> on_done:(unit -> unit) -> unit -end = struct - open Opentelemetry.Proto - - let n_bytes_sent : int Atomic.t = Atomic.make 0 - - class type t = object - inherit OTEL.Exporter.t - - method shutdown : on_done:(unit -> unit) -> unit -> unit - end +let n_bytes_sent : int Atomic.t = Atomic.make 0 +module Consumer_impl = struct type state = { + bq: Any_resource.t Bounded_queue.t; (** Queue of incoming workload *) stop: bool Atomic.t; - cleaned: bool Atomic.t; (** True when we cleaned up after closing *) config: Config.t; - encoder_pool: Pbrt.Encoder.t Rpool.t; - send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *) - traces: Proto.Trace.span Batch.t; - logs: Proto.Logs.log_record Batch.t; - metrics: Proto.Metrics.metric Batch.t; - mutable send_threads: Thread.t array; (** Threads that send data via http *) + mutable send_threads: Thread.t array; + (** Threads that send data via http *) + cleaned: bool Atomic.t; (** True when we cleaned up after closing *) + mcond: Util_thread.MCond.t; (** how to wait for the queue *) } - let send_batch_ (self : state) ~force ~mk_to_send (b : _ Batch.t) : unit = - match Batch.pop_if_ready ~force ~now:(Mtime_clock.now ()) b with - | None -> () - | Some l -> - let to_send = mk_to_send l in - Sync_queue.push self.send_q to_send + let shutdown self : unit = + Atomic.set self.stop true; + (* wakeup sleepers *) + Util_thread.MCond.signal self.mcond - let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit = + let send_http_ (self : state) (client : Curl.t) ~url (data : string) : unit = let@ _sc = Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" in @@ -105,12 +41,12 @@ end = struct Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url (String.length data); let headers = - ("Content-Type", "application/x-protobuf") :: config.common.headers + ("Content-Type", "application/x-protobuf") :: self.config.common.headers in match let@ _sc = Self_trace.with_ ~kind:Span_kind_internal "curl.post" - ~attrs:[ "sz", `Int (String.length data); "url", `String url ] + ~attrs:[ "size", `Int (String.length data); "url", `String url ] in Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () with @@ -123,22 +59,13 @@ end = struct @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; if Config.Env.get_debug () then ( - let dec = Pbrt.Decoder.of_string body in - let body = - try - let status = Status.decode_pb_status dec in - Format.asprintf "%a" Status.pp_status status - with _ -> - spf "(could not decode status)\nraw bytes: %s" (str_to_hex body) - in - Printf.eprintf - "opentelemetry: error while sending data to %s:\n code=%d\n %s\n%!" - url code body - ); - () + let err = Export_error.decode_invalid_http_response ~url ~code body in + Export_error.report_err err; + () + ) | exception Sys.Break -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set stop true + shutdown self | Error (code, msg) -> (* TODO: log error _via_ otel? *) Atomic.incr n_errors; @@ -150,182 +77,117 @@ end = struct (* avoid crazy error loop *) Thread.delay 3. - (** Thread that, in a loop, reads from [q] to get the next message to send via - http *) + (** The main loop of a thread that, reads from [bq] to get the next message to + send via http *) let bg_thread_loop (self : state) : unit = Ezcurl.with_client ?set_opts:None @@ fun client -> - let config = self.config in - let stop = self.stop in - let send ~name ~url ~conv (signals : _ list) = + (* we need exactly one encoder per thread *) + let encoder = Pbrt.Encoder.create ~size:2048 () in + + let send ~name ~url ~conv (signals : _ list) : unit = let@ _sp = Self_trace.with_ ~kind:Span_kind_producer name ~attrs:[ "n", `Int (List.length signals) ] in - let msg : string = - (* borrow encoder from buffer pool and turn [signals] into bytes *) - let@ encoder = Rpool.with_resource self.encoder_pool in - conv ?encoder:(Some encoder) signals - in + + let msg : string = conv ?encoder:(Some encoder) signals in + Pbrt.Encoder.reset encoder; + ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); - send_http_ ~stop ~config ~url client msg + send_http_ self client msg ~url; + () in - try - while not (Atomic.get stop) do - let msg = Sync_queue.pop self.send_q in - match msg with - | To_send.Send_trace tr -> - send ~name:"send-traces" ~conv:Signal.Encode.traces - ~url:config.common.url_traces tr - | To_send.Send_metric ms -> - send ~name:"send-metrics" ~conv:Signal.Encode.metrics - ~url:config.common.url_metrics ms - | To_send.Send_logs logs -> - send ~name:"send-logs" ~conv:Signal.Encode.logs - ~url:config.common.url_logs logs - done - with Sync_queue.Closed -> () + while not (Atomic.get self.stop) do + match Bounded_queue.try_pop self.bq with + | `Closed -> shutdown self + | `Empty -> Util_thread.MCond.wait self.mcond + | `Item (Any_resource.R_spans tr) -> + send ~name:"send-traces" ~conv:Signal.Encode.traces + ~url:self.config.common.url_traces tr + | `Item (Any_resource.R_metrics ms) -> + send ~name:"send-metrics" ~conv:Signal.Encode.metrics + ~url:self.config.common.url_metrics ms + | `Item (Any_resource.R_logs logs) -> + send ~name:"send-logs" ~conv:Signal.Encode.logs + ~url:self.config.common.url_logs logs + done - let batch_max_size_ = 200 - - let batch_timeout_ = Mtime.Span.(20 * s) - - let create_state ~stop ~config () : state = - let n_send_threads = max 2 config.Config.bg_threads in - let encoder_pool = - Rpool.create - ~mk_item:(fun () -> Pbrt.Encoder.create ~size:1024 ()) - ~max_size:32 ~clear:Pbrt.Encoder.reset () + let to_consumer (self : state) : _ Consumer.t = + let active () = not (Atomic.get self.stop) in + let tick () = + (* make sure to poll from time to time *) + Util_thread.MCond.signal self.mcond in + let shutdown ~on_done = + shutdown self; + on_done () + in + { tick; active; shutdown } + + let create_state ~stop ~(config : Config.t) ~q () : state = + let n_send_threads = min 100 @@ max 2 config.bg_threads in let self = { stop; config; send_threads = [||]; - send_q = Sync_queue.create (); - encoder_pool; + bq = q; cleaned = Atomic.make false; - traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); - logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); - metrics = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); + mcond = Util_thread.MCond.create (); } in + Util_thread.MCond.wakeup_from_bq self.mcond q; + self.send_threads <- Array.init n_send_threads (fun _i -> - start_bg_thread (fun () -> bg_thread_loop self)); + Util_thread.start_bg_thread (fun () -> bg_thread_loop self)); self - let maybe_send_metrics ~force (self : state) = - send_batch_ self ~force self.metrics ~mk_to_send:(fun metrics -> - let metrics = - Opentelemetry_client.Util_resources.make_resource_metrics metrics - in - To_send.Send_metric [ metrics ]) - - let maybe_send_logs ~force (self : state) = - send_batch_ self ~force self.logs ~mk_to_send:(fun logs -> - let logs = - Opentelemetry_client.Util_resources.make_resource_logs logs - in - To_send.Send_logs [ logs ]) - - let maybe_send_traces ~force (self : state) = - send_batch_ self ~force self.traces ~mk_to_send:(fun spans -> - let traces = - Opentelemetry_client.Util_resources.make_resource_spans spans - in - To_send.Send_trace [ traces ]) - - let[@inline] push_to_batch b e = - if e <> [] then ( - match Batch.push b e with - | `Ok -> () - | `Dropped -> Atomic.incr n_dropped - ) - - let create ~stop ~config () : #t = - let open Opentelemetry_util in - let st = create_state ~stop ~config () in - let ticker = Cb_set.create () in - object (self : #t) - method send_trace spans = - push_to_batch st.traces spans; - maybe_send_traces st ~force:false - - method send_metrics m = - push_to_batch st.metrics m; - maybe_send_metrics st ~force:false - - method send_logs m = - push_to_batch st.logs m; - maybe_send_logs st ~force:false - - method add_on_tick_callback cb = Cb_set.register ticker cb - - method tick () = Cb_set.trigger ticker - - method cleanup ~on_done () : unit = - if not (Atomic.exchange st.cleaned true) then ( - (* flush all signals *) - maybe_send_logs ~force:true st; - maybe_send_metrics ~force:true st; - maybe_send_traces ~force:true st; - - (* close send queues, then wait for all threads *) - Sync_queue.close st.send_q; - Array.iter Thread.join st.send_threads - ); - on_done () - - method shutdown ~on_done () = - Atomic.set st.stop true; - self#cleanup ~on_done () - end - - let shutdown (self : #t) ~on_done : unit = self#shutdown ~on_done () + let create ~stop ~config () : Consumer.any_resource_builder = + { + start_consuming = + (fun q -> + let st = create_state ~stop ~config ~q () in + to_consumer st); + } end -let create_exporter ?(stop = Atomic.make false) - ?(config : Config.t = Config.make ()) () : #OTEL.Exporter.t = - let backend = Exporter_impl.create ~stop ~config () in - (backend :> OTEL.Exporter.t) +let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () : + Opentelemetry_client.Consumer.any_resource_builder = + Consumer_impl.create ~stop ~config () -(** thread that calls [tick()] regularly, to help enforce timeouts *) -let setup_ticker_thread ~stop ~sleep_ms (exp : #OTEL.Exporter.t) () = - let sleep_s = float sleep_ms /. 1000. in - let tick_loop () = - try - while not @@ Atomic.get stop do - Thread.delay sleep_s; - exp#tick () - done - with - | Sync_queue.Closed -> () - | exn -> - (* print and ignore *) - Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!" - (Printexc.to_string exn) +let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t = + let consumer = consumer ?stop ~config () in + let bq = + Bounded_queue_sync.create + ~high_watermark:Bounded_queue.Defaults.high_watermark () in - start_bg_thread tick_loop + + Exporter_queued.create ~q:bq ~consumer () + |> Exporter_add_batching.add_batching ~config:config.common + +let create_backend = create_exporter let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () : unit = - let exporter = Exporter_impl.create ~stop ~config () in - OTEL.Exporter.Main_exporter.set exporter; + let exporter = create_exporter ~stop ~config () in + OTEL.Main_exporter.set exporter; Self_trace.set_enabled config.common.self_trace; if config.ticker_thread then ( (* at most a minute *) let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in - ignore (setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t) + ignore + (Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t) ) let remove_backend () : unit = (* we don't need the callback, this runs in the same thread *) - OTEL.Exporter.Main_exporter.remove () ~on_done:ignore + OTEL.Main_exporter.remove () ~on_done:ignore let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () @@ -337,4 +199,4 @@ let with_setup ?stop ?config ?(enable = true) () f = ) else f () -let[@inline] n_bytes_sent () = Atomic.get Exporter_impl.n_bytes_sent +let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 77b8ea34..cece89df 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -4,7 +4,6 @@ *) open Opentelemetry_atomic -open Opentelemetry_util val get_headers : unit -> (string * string) list @@ -16,9 +15,20 @@ module Config = Config val n_bytes_sent : unit -> int (** Global counter of bytes sent (or attempted to be sent) *) +val consumer : + ?stop:bool Atomic.t -> + ?config:Config.t -> + unit -> + Opentelemetry_client.Consumer.any_resource_builder +(** Consumer that pulls from a queue *) + val create_exporter : ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_backend : + ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +[@@deprecated "use create_exporter"] + val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.