From d7da4c44438b4da7405c21bc691d97343e8c6efb Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Dec 2025 20:06:45 -0500 Subject: [PATCH] feat client: overhaul of bounded queue; generic_consumer --- src/client/any_resource.ml | 7 + src/client/bounded_queue.ml | 138 +++++++++----- src/client/bounded_queue_sync.ml | 6 +- src/client/consumer.ml | 22 ++- src/client/dune | 1 - src/client/exporter_queued.ml | 43 ++--- src/client/exporter_stdout.ml | 12 +- src/client/generic_consumer.ml | 220 ++++++++++++++++++++++ src/client/generic_consumer_exporter.ml | 9 +- src/client/generic_http_consumer.ml | 234 ++++-------------------- src/client/generic_notifier.ml | 2 +- src/client/io_sync.ml | 5 + src/client/io_sync.mli | 2 + src/client/lwt/notifier_lwt.ml | 4 +- src/client/notifier_sync.ml | 4 +- src/client/signal.ml | 6 + src/client/signal.mli | 2 + 17 files changed, 429 insertions(+), 288 deletions(-) create mode 100644 src/client/generic_consumer.ml create mode 100644 src/client/io_sync.ml create mode 100644 src/client/io_sync.mli diff --git a/src/client/any_resource.ml b/src/client/any_resource.ml index a85d425c..327ae11c 100644 --- a/src/client/any_resource.ml +++ b/src/client/any_resource.ml @@ -1,3 +1,4 @@ +open Common_ open Opentelemetry.Proto (** A resource *) @@ -31,3 +32,9 @@ let of_metrics ?service_name ?attrs m : t = let of_metrics_or_empty ?service_name ?attrs ms = of_x_or_empty ?service_name ?attrs ~f:of_metrics ms + +let of_signal_l ?service_name ?attrs (s : OTEL.Any_signal_l.t) : t = + match s with + | Logs logs -> of_logs ?service_name ?attrs logs + | Spans sp -> of_spans ?service_name ?attrs sp + | Metrics ms -> of_metrics ?service_name ?attrs ms diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index 7419fd59..709fcead 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -12,24 +12,101 @@ type 'a pop_result = | `Item of 'a ] -type 'a t = { - push: 'a list -> unit; - (** Push items. 
This might discard some of them. - @raise Closed if the queue is closed. *) - num_discarded: unit -> int; (** How many items were discarded? *) - on_non_empty: (unit -> unit) -> unit; - (** [on_non_empty f] registers [f] to be called whenever the queue - transitions from empty to non-empty. *) - try_pop: unit -> 'a pop_result; (** Try to pop an item right now. *) - close: unit -> unit; - (** Close the queue. Items currently in the queue will still be accessible - to consumers until the queue is emptied out. Idempotent. *) - closed: unit -> bool; - (** Is the queue closed {b for writing}. Consumers should only use - [try_pop] because a queue that's closed-for-writing might still - contain straggler items that need to be consumed. +module Common = struct + type t = { + closed: unit -> bool; + (** Is the queue closed {b for writing}. Consumers should only use + [try_pop] because a queue that's closed-for-writing might still + contain straggler items that need to be consumed. - This should be as fast and cheap as possible. *) + This should be as fast and cheap as possible. *) + num_discarded: unit -> int; (** How many items were discarded? *) + } + + let[@inline] num_discarded self = self.num_discarded () + + let[@inline] closed (self : t) : bool = self.closed () +end + +(** Receiving side *) +module Recv = struct + type 'a t = { + on_non_empty: (unit -> unit) -> unit; + (** [on_non_empty f] registers [f] to be called whenever the queue + transitions from empty to non-empty. *) + try_pop: unit -> 'a pop_result; (** Try to pop an item right now. 
*) + common: Common.t; + } + + let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop () + + let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f + + let[@inline] closed (self : _ t) : bool = self.common.closed () + + let[@inline] num_discarded self = self.common.num_discarded () + + let map (type a b) (f : a -> b) (self : a t) : b t = + { + self with + try_pop = + (fun () -> + match self.try_pop () with + | (`Closed | `Empty) as r -> r + | `Item x -> `Item (f x)); + } +end + +(** Sending side *) +module Send = struct + type 'a t = { + push: 'a list -> unit; + (** Push items. This might discard some of them. + @raise Closed if the queue is closed. *) + close: unit -> unit; + (** Close the queue. Items currently in the queue will still be + accessible to consumers until the queue is emptied out. Idempotent. + *) + common: Common.t; + } + + let[@inline] push (self : _ t) x : unit = self.push x + + let[@inline] close (self : _ t) : unit = self.close () + + let[@inline] closed (self : _ t) : bool = self.common.closed () + + let[@inline] num_discarded self = self.common.num_discarded () + + let map (type a b) (f : a list -> b list) (self : b t) : a t = + { + self with + push = + (fun xs -> + match f xs with + | [] -> () + | ys -> self.push ys); + } + + (** Turn the writing end of the queue into an emitter. + @param close_queue_on_close + if true, closing the emitter will close the queue *) + let to_emitter ~close_queue_on_close (self : 'a t) : + 'a Opentelemetry_emitter.Emitter.t = + let closed () = closed self in + let enabled () = not (closed ()) in + let emit x = if x <> [] then push self x in + let tick ~now:_ = () in + + (* NOTE: we cannot actually flush, only close. 
Emptying the queue is + fundamentally asynchronous because it's done by consumers *) + let flush_and_close () = if close_queue_on_close then close self in + { closed; enabled; emit; tick; flush_and_close } +end + +type 'a t = { + send: 'a Send.t; + recv: 'a Recv.t; } (** A bounded queue, with multiple producers and potentially multiple consumers. @@ -37,33 +114,6 @@ type 'a t = { to be depending on the context (e.g. a Lwt-specific queue implementation will consume only from the Lwt thread). *) -let[@inline] push (self : _ t) x : unit = self.push x - -let[@inline] num_discarded self = self.num_discarded () - -let[@inline] try_pop (self : _ t) : _ pop_result = self.try_pop () - -let[@inline] on_non_empty (self : _ t) f = self.on_non_empty f - -let[@inline] close (self : _ t) : unit = self.close () - -let[@inline] closed (self : _ t) : bool = self.closed () - -(** Turn the writing end of the queue into an emitter. - @param close_queue_on_close - if true, closing the emitter will close the queue *) -let to_emitter ~close_queue_on_close (self : 'a t) : - 'a Opentelemetry_emitter.Emitter.t = - let closed () = self.closed () in - let enabled () = not (closed ()) in - let emit x = if x <> [] then push self x in - let tick ~now:_ = () in - - (* NOTE: we cannot actually flush, only close. 
Emptying the queue is - fundamentally asynchronous because it's done by consumers *) - let flush_and_close () = if close_queue_on_close then close self in - { closed; enabled; emit; tick; flush_and_close } - module Defaults = struct (** The default high watermark *) let high_watermark : int = 2048 diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml index f8bf507d..f8778ea9 100644 --- a/src/client/bounded_queue_sync.ml +++ b/src/client/bounded_queue_sync.ml @@ -113,7 +113,11 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t = (* waiters will want to know *) Cb_set.trigger self.on_non_empty in - { BQ.push; num_discarded; try_pop; on_non_empty; close; closed } + let common = { BQ.Common.closed; num_discarded } in + { + BQ.send = { push; close; common }; + recv = { try_pop; on_non_empty; common }; + } let create ~high_watermark () : _ BQ.t = let st = diff --git a/src/client/consumer.ml b/src/client/consumer.ml index e22af2f0..48b5008f 100644 --- a/src/client/consumer.ml +++ b/src/client/consumer.ml @@ -2,7 +2,7 @@ open Common_ -type 'a t = { +type t = { active: unit -> Aswitch.t; shutdown: unit -> unit; (** Shutdown the consumer as soon as possible. 
[active] will be turned off @@ -14,11 +14,11 @@ type 'a t = { } (** A consumer for signals of type ['a] *) -type 'a consumer = 'a t +type consumer = t -let[@inline] active (self : _ t) : Aswitch.t = self.active () +let[@inline] active (self : t) : Aswitch.t = self.active () -let[@inline] shutdown (self : _ t) : unit = self.shutdown () +let[@inline] shutdown (self : t) : unit = self.shutdown () let[@inline] self_metrics self : _ list = self.self_metrics () @@ -26,13 +26,23 @@ let[@inline] self_metrics self : _ list = self.self_metrics () let on_stop self f = Aswitch.on_turn_off (self.active ()) f module Builder = struct - type 'a t = { start_consuming: 'a Bounded_queue.t -> 'a consumer } + type 'a t = { start_consuming: 'a Bounded_queue.Recv.t -> consumer } (** A builder that will create a consumer for a given queue, start the consumer so it starts consuming from the queue, and return the consumer. *) let start_consuming (self : _ t) bq = self.start_consuming bq + + let map (type a b) (f : a -> b) (self : b t) : a t = + { + start_consuming = + (fun q -> + let q = Bounded_queue.Recv.map f q in + self.start_consuming q); + } end +type any_signal_l_builder = OTEL.Any_signal_l.t Builder.t + type any_resource_builder = Any_resource.t Builder.t -(** The type that's useful for OTEL backends *) +(** The type that's useful for HTTP backends *) diff --git a/src/client/dune b/src/client/dune index 5b3f6fde..c1c61af4 100644 --- a/src/client/dune +++ b/src/client/dune @@ -9,7 +9,6 @@ opentelemetry.proto opentelemetry.domain pbrt - saturn mtime mtime.clock.os) (synopsis diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 969988d1..490c2320 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -8,23 +8,22 @@ module BQ_emitters = struct queue because we need to flush_and_close the other emitters first. The bounded queue is a shared resource. 
*) - let logs_emitter_of_bq ?service_name ?attrs - (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = - Bounded_queue.to_emitter q ~close_queue_on_close:false - |> Opentelemetry_emitter.Emitter.flat_map - (Any_resource.of_logs_or_empty ?service_name ?attrs) + let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : + OTEL.Logger.t = + Bounded_queue.Send.to_emitter q ~close_queue_on_close:false + |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty - let spans_emitter_of_bq ?service_name ?attrs - (q : Any_resource.t Bounded_queue.t) : OTEL.Tracer.t = - Bounded_queue.to_emitter q ~close_queue_on_close:false + let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : + OTEL.Tracer.t = + Bounded_queue.Send.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map - (Any_resource.of_spans_or_empty ?service_name ?attrs) + OTEL.Any_signal_l.of_spans_or_empty - let metrics_emitter_of_bq ?service_name ?attrs - (q : Any_resource.t Bounded_queue.t) : OTEL.Metrics_emitter.t = - Bounded_queue.to_emitter q ~close_queue_on_close:false + let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : + OTEL.Metrics_emitter.t = + Bounded_queue.Send.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map - (Any_resource.of_metrics_or_empty ?service_name ?attrs) + OTEL.Any_signal_l.of_metrics_or_empty end (** Pair a queue with a consumer to build an exporter. @@ -33,20 +32,16 @@ end bounded queue; while the consumer takes them from the queue to forward them somewhere else, store them, etc. 
@param resource_attributes attributes added to every "resource" batch *) -let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) - ~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t = +let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) + ~(consumer : Consumer.any_signal_l_builder) () : OTEL.Exporter.t = let open Opentelemetry_emitter in let shutdown_started = Atomic.make false in let active, trigger = Aswitch.create () in - let consumer = consumer.start_consuming q in + let consumer = consumer.start_consuming q.recv in - let emit_spans = - BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q - in - let emit_logs = BQ_emitters.logs_emitter_of_bq ~attrs:resource_attributes q in - let emit_metrics = - BQ_emitters.metrics_emitter_of_bq ~attrs:resource_attributes q - in + let emit_spans = BQ_emitters.spans_emitter_of_bq q.send in + let emit_logs = BQ_emitters.logs_emitter_of_bq q.send in + let emit_metrics = BQ_emitters.metrics_emitter_of_bq q.send in let tick_set = Cb_set.create () in let tick () = Cb_set.trigger tick_set in @@ -61,7 +56,7 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) (* first, prevent further pushes to the queue. Consumer workers can still drain it. *) - Bounded_queue.close q; + Bounded_queue.Send.close q.send; (* shutdown consumer; once it's down it'll turn our switch off too *) Aswitch.link (Consumer.active consumer) trigger; diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index af2f1a61..6cab03aa 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -8,7 +8,7 @@ open struct let pp_span out (sp : OTEL.Span.t) = let open OTEL in Format.fprintf out - "@[<2>SPAN@ trace_id: %a@ span_id: %a@ name: %S@ start: %a@ end: %a@]@." 
+ "@[<2>SPAN {@ trace_id: %a@ span_id: %a@ name: %S@ start: %a@ end: %a@]}" Trace_id.pp (Trace_id.of_bytes sp.trace_id) Span_id.pp @@ -16,6 +16,12 @@ open struct sp.name Timestamp_ns.pp_debug sp.start_time_unix_nano Timestamp_ns.pp_debug sp.end_time_unix_nano + let pp_log out l = + Format.fprintf out "@[<2>LOG %a@]" Proto.Logs.pp_log_record l + + let pp_metric out m = + Format.fprintf out "@[<2>METRICS %a@]" Proto.Metrics.pp_metric m + let pp_vlist mutex pp out l = if l != [] then ( let@ () = Util_mutex.protect mutex in @@ -55,8 +61,8 @@ let stdout : OTEL.Exporter.t = in let emit_spans = mk_emitter pp_span in - let emit_logs = mk_emitter Proto.Logs.pp_log_record in - let emit_metrics = mk_emitter Proto.Metrics.pp_metric in + let emit_logs = mk_emitter pp_log in + let emit_metrics = mk_emitter pp_metric in let shutdown () = Emitter.flush_and_close emit_spans; diff --git a/src/client/generic_consumer.ml b/src/client/generic_consumer.ml new file mode 100644 index 00000000..adacca4c --- /dev/null +++ b/src/client/generic_consumer.ml @@ -0,0 +1,220 @@ +open Common_ + +type error = Export_error.t + +(** Number of errors met during export *) +let n_errors = Atomic.make 0 + +module type IO = Generic_io.S_WITH_CONCURRENCY + +module type SENDER = sig + module IO : IO + + type t + + type config + + val create : config:config -> unit -> t + + val cleanup : t -> unit + + val send : t -> OTEL.Any_signal_l.t -> (unit, error) result IO.t +end + +module Make + (IO : IO) + (Notifier : Generic_notifier.S with type 'a IO.t = 'a IO.t) + (Sender : SENDER with type 'a IO.t = 'a IO.t) : sig + val consumer : + sender_config:Sender.config -> + n_workers:int -> + ticker_task:float option -> + unit -> + Consumer.any_signal_l_builder + (** Make a consumer builder, ie. a builder function that will take a bounded + queue of signals, and start a consumer to process these signals and send + them somewhere using HTTP. 
*) +end = struct + module Proto = Opentelemetry_proto + open IO + + type config = { + n_workers: int; + ticker_task: float option; + } + + type status = + | Active + | Shutting_down + | Stopped + + type state = { + active: Aswitch.t; (** Public facing switch *) + q: OTEL.Any_signal_l.t Bounded_queue.Recv.t; + status: status Atomic.t; + (** Internal status, including the shutting down process *) + notify: Notifier.t; + n_workers: int Atomic.t; (** Current number of workers *) + active_trigger: Aswitch.trigger; + config: config; + sender_config: Sender.config; + } + + let shutdown self : unit = + let old_status = + Util_atomic.update_cas self.status @@ fun status -> + match status with + | Stopped -> status, status + | Shutting_down -> status, status + | Active -> status, Shutting_down + in + + match old_status with + | Stopped -> () + | Shutting_down -> + (* last worker to stop will call [on_done] *) + () + | Active -> + (* notify potentially asleep workers *) + Notifier.trigger self.notify; + Notifier.delete self.notify + + let tick (self : state) = + if Aswitch.is_on self.active then Notifier.trigger self.notify + + (** Shutdown one worker, when the queue is closed *) + let shutdown_worker (self : state) : unit = + (* let tid = Thread.id @@ Thread.self () in + Printf.eprintf "worker %d: shutting down\n%!" tid; *) + if Atomic.fetch_and_add self.n_workers (-1) = 1 then ( + (* we were the last worker *) + (* Printf.eprintf "worker %d: last one!\n%!" 
tid; *) + Atomic.set self.status Stopped; + Aswitch.turn_off self.active_trigger + ) + + let send_signals (self : state) (sender : Sender.t) ~backoff + (sigs : OTEL.Any_signal_l.t) : unit IO.t = + let* r = Sender.send sender sigs in + match r with + | Ok () -> + Util_net_backoff.on_success backoff; + IO.return () + | Error `Sysbreak -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + shutdown self; + IO.return () + | Error err -> + Atomic.incr n_errors; + Export_error.report_err err; + (* avoid crazy error loop *) + let dur_s = Util_net_backoff.on_error backoff in + IO.sleep_s (dur_s +. Random.float (dur_s /. 10.)) + + let start_worker (self : state) : unit = + let sender = Sender.create ~config:self.sender_config () in + let backoff = Util_net_backoff.create () in + + (* loop on [q] *) + let rec loop () : unit IO.t = + (* first look at the queue, to drain it *) + match Bounded_queue.Recv.try_pop self.q with + | `Closed -> + (* this worker shuts down, others might still be busy *) + shutdown_worker self; + IO.return () + | `Item sigs -> + let* () = send_signals ~backoff self sender sigs in + loop () + | `Empty -> + (* Printf.eprintf "worker %d: empty queue\n%!" 
tid; *) + (match Atomic.get self.status with + | Stopped -> + assert false + (* shouldn't happen without us going through [Shutting_down] *) + | Shutting_down -> + shutdown_worker self; + IO.return () + | Active -> + let* () = Notifier.wait self.notify in + loop ()) + in + + IO.spawn (fun () -> + IO.protect loop ~finally:(fun () -> + Sender.cleanup sender; + IO.return ())) + + let start_ticker (self : state) ~(interval_s : float) : unit = + let rec loop () : unit IO.t = + match Atomic.get self.status with + | Stopped | Shutting_down -> IO.return () + | Active -> + let* () = IO.sleep_s interval_s in + if Aswitch.is_on self.active then tick self; + loop () + in + IO.spawn loop + + let create_state ~sender_config ~n_workers ~ticker_task ~q () : state = + let active, active_trigger = Aswitch.create () in + let config = { n_workers; ticker_task } in + let self = + { + active; + active_trigger; + status = Atomic.make Active; + n_workers = Atomic.make 0; + q; + notify = Notifier.create (); + config; + sender_config; + } + in + + (* start workers; the requested count is clamped to the range [2, 500] *) + let n_workers = max 2 (min 500 self.config.n_workers) in + + ignore (Atomic.fetch_and_add self.n_workers n_workers : int); + for _i = 1 to n_workers do + start_worker self + done; + + Notifier.register_bounded_queue self.notify q; + + (* start ticker *) + (match self.config.ticker_task with + | None -> () + | Some interval_s -> start_ticker self ~interval_s); + + self + + let self_metrics (self : state) : OTEL.Metrics.t list = + let open OTEL.Metrics in + let now = Mtime_clock.now () in + [ + sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true + [ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ]; + sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" + ~is_monotonic:true + [ + int ~now:(Mtime.to_uint64_ns now) + (Bounded_queue.Recv.num_discarded self.q); + ]; + ] + + let to_consumer (self : state) : Consumer.t = + let shutdown () = shutdown self in + let tick () = tick self in + let self_metrics () = 
self_metrics self in + { active = (fun () -> self.active); tick; shutdown; self_metrics } + + let consumer ~sender_config ~n_workers ~ticker_task () : + Consumer.any_signal_l_builder = + { + start_consuming = + (fun q -> + let st = create_state ~sender_config ~n_workers ~ticker_task ~q () in + to_consumer st); + } +end diff --git a/src/client/generic_consumer_exporter.ml b/src/client/generic_consumer_exporter.ml index 65ff639e..d06dd492 100644 --- a/src/client/generic_consumer_exporter.ml +++ b/src/client/generic_consumer_exporter.ml @@ -26,7 +26,7 @@ end = struct active: Aswitch.t; (** Public facing switch *) active_trigger: Aswitch.trigger; status: status Atomic.t; (** Internal state, including shutdown *) - q: OTEL.Any_signal_l.t Bounded_queue.t; + q: OTEL.Any_signal_l.t Bounded_queue.Recv.t; notify: Notifier.t; exp: OTEL.Exporter.t; } @@ -62,7 +62,7 @@ end = struct let start_worker (self : state) : unit = (* loop on [q] *) let rec loop () : unit IO.t = - match Bounded_queue.try_pop self.q with + match Bounded_queue.Recv.try_pop self.q with | `Closed -> shutdown_worker self; IO.return () @@ -118,11 +118,12 @@ end = struct sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" ~is_monotonic:true [ - int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q); + int ~now:(Mtime.to_uint64_ns now) + (Bounded_queue.Recv.num_discarded self.q); ]; ] - let to_consumer (self : state) : _ Consumer.t = + let to_consumer (self : state) : Consumer.t = let shutdown () = shutdown self in let tick () = tick self in let self_metrics () = self_metrics self in diff --git a/src/client/generic_http_consumer.ml b/src/client/generic_http_consumer.ml index b557249a..7b502a07 100644 --- a/src/client/generic_http_consumer.ml +++ b/src/client/generic_http_consumer.ml @@ -14,14 +14,14 @@ module type HTTPC = sig val create : unit -> t + val cleanup : t -> unit + val send : t -> url:string -> decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> string -> ('a, error) result 
IO.t - - val cleanup : t -> unit end module Make @@ -33,7 +33,7 @@ module Make ticker_task:float option -> config:Client_config.t -> unit -> - Consumer.any_resource_builder + Consumer.any_signal_l_builder (** Make a consumer builder, ie. a builder function that will take a bounded queue of signals, and start a consumer to process these signals and send them somewhere using HTTP. @@ -44,218 +44,52 @@ end = struct module Proto = Opentelemetry_proto open IO - type other_config = { - override_n_workers: int option; - ticker_task: float option; - } + module Sender : + Generic_consumer.SENDER + with module IO = IO + and type config = Client_config.t = struct + module IO = IO - type status = - | Active - | Shutting_down - | Stopped + type t = { + config: Client_config.t; + encoder: Pbrt.Encoder.t; + http: Httpc.t; + } - type state = { - active: Aswitch.t; (** Public facing switch *) - active_trigger: Aswitch.trigger; - status: status Atomic.t; - (** Internal status, including the shutting down process *) - config: Client_config.t; - other_config: other_config; - q: Any_resource.t Bounded_queue.t; - notify: Notifier.t; - n_workers: int Atomic.t; (** Current number of workers *) - } + type config = Client_config.t - let shutdown self : unit = - let old_status = - Util_atomic.update_cas self.status @@ fun status -> - match status with - | Stopped -> status, status - | Shutting_down -> status, status - | Active -> status, Shutting_down - in + let create ~config () : t = + { config; http = Httpc.create (); encoder = Pbrt.Encoder.create () } - match old_status with - | Stopped -> () - | Shutting_down -> - (* last worker to stop will call [on_done] *) - () - | Active -> - (* notify potentially asleep workers *) - Notifier.trigger self.notify; - Notifier.delete self.notify + let cleanup self = Httpc.cleanup self.http - let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string) - : unit IO.t = - let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in - match r 
with - | Ok () -> - Util_net_backoff.on_success backoff; - IO.return () - | Error `Sysbreak -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - shutdown self; - IO.return () - | Error err -> - Atomic.incr n_errors; - Export_error.report_err err; - (* avoid crazy error loop *) - let dur_s = Util_net_backoff.on_error backoff in - IO.sleep_s (dur_s +. Random.float (dur_s /. 10.)) + let send (self : t) (sigs : OTEL.Any_signal_l.t) : (unit, error) result IO.t + = + let res = Any_resource.of_signal_l sigs in + let url = + match res with + | R_logs _ -> self.config.url_logs + | R_spans _ -> self.config.url_traces + | R_metrics _ -> self.config.url_metrics + in + let data = Signal.Encode.any ~encoder:self.encoder res in + Httpc.send self.http ~url ~decode:(`Ret ()) data + end - let send_metrics_http (st : state) client ~encoder ~backoff - (l : Proto.Metrics.resource_metrics list) = - let msg = Signal.Encode.metrics ~encoder l in - send_http_ st client msg ~backoff ~url:st.config.url_metrics - - let send_traces_http st client ~encoder ~backoff - (l : Proto.Trace.resource_spans list) = - let msg = Signal.Encode.traces ~encoder l in - send_http_ st client msg ~backoff ~url:st.config.url_traces - - let send_logs_http st client ~encoder ~backoff - (l : Proto.Logs.resource_logs list) = - let msg = Signal.Encode.logs ~encoder l in - send_http_ st client msg ~backoff ~url:st.config.url_logs - - let tick (self : state) = - if Aswitch.is_on self.active then Notifier.trigger self.notify - - (** Shutdown one worker, when the queue is closed *) - let shutdown_worker (self : state) : unit = - (* let tid = Thread.id @@ Thread.self () in - Printf.eprintf "worker %d: shutting down\n%!" tid; *) - if Atomic.fetch_and_add self.n_workers (-1) = 1 then ( - (* we were the last worker *) - (* Printf.eprintf "worker %d: last one!\n%!" 
tid; *) - Atomic.set self.status Stopped; - Aswitch.turn_off self.active_trigger - ) - - let start_worker (self : state) : unit = - let client = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - let backoff = Util_net_backoff.create () in - - (* loop on [q] *) - let rec loop () : unit IO.t = - (* first look at the queue, to drain it *) - match Bounded_queue.try_pop self.q with - | `Closed -> - (* this worker shuts down, others might still be busy *) - shutdown_worker self; - IO.return () - | `Item (R_logs logs) -> - let* () = send_logs_http self client ~encoder ~backoff logs in - loop () - | `Item (R_metrics ms) -> - let* () = send_metrics_http self client ~encoder ~backoff ms in - loop () - | `Item (R_spans spans) -> - let* () = send_traces_http self client ~encoder ~backoff spans in - loop () - | `Empty -> - (* Printf.eprintf "worker %d: empty queue\n%!" tid; *) - (match Atomic.get self.status with - | Stopped -> - assert false - (* shouldn't happen without us going through [Shutting_down] *) - | Shutting_down -> - shutdown_worker self; - IO.return () - | Active -> - let* () = Notifier.wait self.notify in - loop ()) - in - - IO.spawn (fun () -> - IO.protect loop ~finally:(fun () -> - Httpc.cleanup client; - IO.return ())) - - let start_ticker (self : state) ~(interval_s : float) : unit = - let rec loop () : unit IO.t = - match Atomic.get self.status with - | Stopped | Shutting_down -> IO.return () - | Active -> - let* () = IO.sleep_s interval_s in - if Aswitch.is_on self.active then tick self; - loop () - in - IO.spawn loop + module C = Generic_consumer.Make (IO) (Notifier) (Sender) let default_n_workers = 50 - let create_state ?override_n_workers ~ticker_task ~config ~q () : state = - let active, active_trigger = Aswitch.create () in - let other_config = { override_n_workers; ticker_task } in - let self = - { - active; - active_trigger; - status = Atomic.make Active; - config; - other_config; - q; - notify = Notifier.create (); - n_workers = 
Atomic.make 0; - } - in - - (* start workers *) + let consumer ?override_n_workers ~ticker_task ~(config : Client_config.t) () : + Consumer.any_signal_l_builder = let n_workers = - min 2 - (max 500 - (match - ( self.other_config.override_n_workers, - self.config.http_concurrency_level ) - with + max 2 (* clamp the worker count to the range [2, 500] *) + (min 500 + (match override_n_workers, config.http_concurrency_level with | Some n, _ -> n | None, Some n -> n | None, None -> default_n_workers)) in - ignore (Atomic.fetch_and_add self.n_workers n_workers : int); - for _i = 1 to n_workers do - start_worker self - done; - - Notifier.register_bounded_queue self.notify q; - - (* start ticker *) - (match self.other_config.ticker_task with - | None -> () - | Some interval_s -> start_ticker self ~interval_s); - - self - - let self_metrics (self : state) : OTEL.Metrics.t list = - let open OTEL.Metrics in - let now = Mtime_clock.now () in - [ - sum ~name:"otel-ocaml.export.discarded-by-bounded-queue" - ~is_monotonic:true - [ - int ~now:(Mtime.to_uint64_ns now) (Bounded_queue.num_discarded self.q); - ]; - sum ~name:"otel-ocaml.export.errors" ~is_monotonic:true - [ int ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors) ]; - ] - - let to_consumer (self : state) : Any_resource.t Consumer.t = - let shutdown () = shutdown self in - let tick () = tick self in - let self_metrics () = self_metrics self in - { active = (fun () -> self.active); tick; shutdown; self_metrics } - - let consumer ?override_n_workers ~ticker_task ~config () : - Consumer.any_resource_builder = - { - start_consuming = - (fun q -> - let st = - create_state ?override_n_workers ~ticker_task ~config ~q () - in - to_consumer st); - } + C.consumer ~sender_config:config ~n_workers ~ticker_task () end diff --git a/src/client/generic_notifier.ml b/src/client/generic_notifier.ml index 0d3ea1d3..8005982c 100644 --- a/src/client/generic_notifier.ml +++ b/src/client/generic_notifier.ml @@ -13,5 +13,5 @@ module type S = sig val wait : t -> unit IO.t - val register_bounded_queue : t -> _ 
Bounded_queue.t -> unit + val register_bounded_queue : t -> _ Bounded_queue.Recv.t -> unit end diff --git a/src/client/io_sync.ml b/src/client/io_sync.ml new file mode 100644 index 00000000..884c8fd1 --- /dev/null +++ b/src/client/io_sync.ml @@ -0,0 +1,5 @@ +include Generic_io.Direct_style + +let sleep_s = Thread.delay + +let[@inline] spawn f = ignore (Util_thread.start_bg_thread f : Thread.t) diff --git a/src/client/io_sync.mli b/src/client/io_sync.mli new file mode 100644 index 00000000..303a099e --- /dev/null +++ b/src/client/io_sync.mli @@ -0,0 +1,2 @@ +include Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a +(** Generic IO with [spawn] starting a background thread *) diff --git a/src/client/lwt/notifier_lwt.ml b/src/client/lwt/notifier_lwt.ml index 83d8dfbe..9ec178ae 100644 --- a/src/client/lwt/notifier_lwt.ml +++ b/src/client/lwt/notifier_lwt.ml @@ -36,5 +36,5 @@ let trigger (self : t) : unit = let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond -let register_bounded_queue (self : t) (q : _ Bounded_queue.t) : unit = - Bounded_queue.on_non_empty q (fun () -> trigger self) +let register_bounded_queue (self : t) (q : _ Bounded_queue.Recv.t) : unit = + Bounded_queue.Recv.on_non_empty q (fun () -> trigger self) diff --git a/src/client/notifier_sync.ml b/src/client/notifier_sync.ml index f7590705..e62505ad 100644 --- a/src/client/notifier_sync.ml +++ b/src/client/notifier_sync.ml @@ -17,5 +17,5 @@ let wait self = Mutex.unlock self.mutex (** Ensure we get signalled when the queue goes from empty to non-empty *) -let register_bounded_queue (self : t) (bq : _ Bounded_queue.t) : unit = - Bounded_queue.on_non_empty bq (fun () -> trigger self) +let register_bounded_queue (self : t) (bq : _ Bounded_queue.Recv.t) : unit = + Bounded_queue.Recv.on_non_empty bq (fun () -> trigger self) diff --git a/src/client/signal.ml b/src/client/signal.ml index 92dfce2c..010d3b93 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -77,6 +77,12 @@ module Encode = 
struct ~ctor:(fun r -> Trace_service.make_export_trace_service_request ~resource_spans:r ()) ~enc:Trace_service.encode_pb_export_trace_service_request + + let any ?encoder (r : Any_resource.t) : string = + match r with + | R_logs l -> logs ?encoder l + | R_spans sp -> traces ?encoder sp + | R_metrics ms -> metrics ?encoder ms end module Decode = struct diff --git a/src/client/signal.mli b/src/client/signal.mli index a10f9e00..8bc8a5a3 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -48,6 +48,8 @@ module Encode : sig (** [traces ts] is a protobuf encoded string of the traces [ts] @param encoder provide an encoder state to reuse *) + + val any : ?encoder:Pbrt.Encoder.t -> Any_resource.t -> string end (** Decode signals from protobuf encoded strings, received over the wire *)