From f1437a842f8c430137ae82c2b87332b3a832a142 Mon Sep 17 00:00:00 2001
From: Simon Cruanes
Date: Fri, 5 Dec 2025 22:57:23 -0500
Subject: [PATCH] feat client: generic consumer, notifier, etc.

---
 src/client/generic_http_consumer.ml           | 226 ++++++++++++++++++
 src/client/generic_io.ml                      |  32 +++
 src/client/generic_notifier.ml                |  18 ++
 src/client/lwt/io_lwt.ml                      |  11 +
 src/client/lwt/io_lwt.mli                     |   1 +
 .../lwt/{notifier.ml => notifier_lwt.ml}      |   2 +
 src/client/lwt/notifier_lwt.mli               |   1 +
 src/client/notifier_sync.ml                   |   8 +
 src/client/notifier_sync.mli                  |   1 +
 src/client/signal.ml                          |   4 +-
 10 files changed, 303 insertions(+), 1 deletion(-)
 create mode 100644 src/client/generic_http_consumer.ml
 create mode 100644 src/client/generic_io.ml
 create mode 100644 src/client/generic_notifier.ml
 create mode 100644 src/client/lwt/io_lwt.ml
 create mode 100644 src/client/lwt/io_lwt.mli
 rename src/client/lwt/{notifier.ml => notifier_lwt.ml} (97%)
 create mode 100644 src/client/lwt/notifier_lwt.mli
 create mode 100644 src/client/notifier_sync.ml
 create mode 100644 src/client/notifier_sync.mli

diff --git a/src/client/generic_http_consumer.ml b/src/client/generic_http_consumer.ml
new file mode 100644
index 00000000..f9c3ba84
--- /dev/null
+++ b/src/client/generic_http_consumer.ml
@@ -0,0 +1,226 @@
+type error = Export_error.t
+
+(* TODO: emit this in a metric in [tick()] if self tracing is enabled? *)
+
+(** Number of errors met during export *)
+let n_errors = Atomic.make 0
+
+module type IO = Generic_io.S_WITH_CONCURRENCY
+
+(** HTTP client abstraction used by the workers to send encoded payloads. *)
+module type HTTPC = sig
+  module IO : IO
+
+  type t
+
+  val create : unit -> t
+
+  val send :
+    t ->
+    url:string ->
+    decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
+    string ->
+    ('a, error) result IO.t
+
+  val cleanup : t -> unit
+end
+
+module Make
+    (IO : IO)
+    (Notifier : Generic_notifier.S with type 'a IO.t = 'a IO.t)
+    (Httpc : HTTPC with type 'a IO.t = 'a IO.t) : sig
+  val consumer :
+    ?override_n_workers:int ->
+    ticker_task:float option ->
+    stop:bool Atomic.t ->
+    config:Client_config.t ->
+    unit ->
+    Consumer.any_resource_builder
+  (** Create a consumer.
+      @param override_n_workers
+        number of workers to start; takes precedence over
+        [config.http_concurrency_level]. The count is clamped to [2, 500].
+      @param stop
+        shared stop variable, set to true to stop this (and maybe other tasks)
+      @param ticker_task
+        controls whether we start a task to call [tick] at the given interval in
+        seconds, or [None] to not start such a task at all. *)
+end = struct
+  module Proto = Opentelemetry_proto
+  open IO
+
+  type other_config = {
+    override_n_workers: int option;
+    ticker_task: float option;
+  }
+
+  type state = {
+    stop: bool Atomic.t;
+    cleaned: bool Atomic.t;  (** True when we cleaned up after closing *)
+    config: Client_config.t;
+    other_config: other_config;
+    q: Any_resource.t Bounded_queue.t;
+    notify: Notifier.t;
+  }
+
+  (** Set [stop], then trigger and delete the notifier (at most once). *)
+  let shutdown self =
+    Atomic.set self.stop true;
+    if not (Atomic.exchange self.cleaned true) then (
+      Notifier.trigger self.notify;
+      Notifier.delete self.notify
+    )
+
+  (** Send [data] to [url]. On [`Sysbreak], set [stop]. On any other error,
+      bump [n_errors], report it, and sleep for the current backoff duration
+      plus up to 10% of random jitter. *)
+  let send_http_ (self : state) (httpc : Httpc.t) ~backoff ~url (data : string)
+      : unit IO.t =
+    let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
+    match r with
+    | Ok () ->
+      Util_backoff.on_success backoff;
+      IO.return ()
+    | Error `Sysbreak ->
+      Printf.eprintf "ctrl-c captured, stopping\n%!";
+      Atomic.set self.stop true;
+      IO.return ()
+    | Error err ->
+      Atomic.incr n_errors;
+      Export_error.report_err err;
+      (* avoid crazy error loop *)
+      let dur_s = Util_backoff.cur_duration_s backoff in
+      Util_backoff.on_error backoff;
+      IO.sleep_s (dur_s +. Random.float (dur_s /. 10.))
+
+  let send_metrics_http (st : state) client ~encoder ~backoff
+      (l : Proto.Metrics.resource_metrics list) =
+    let msg = Signal.Encode.metrics ~encoder l in
+    send_http_ st client msg ~backoff ~url:st.config.url_metrics
+
+  let send_traces_http st client ~encoder ~backoff
+      (l : Proto.Trace.resource_spans list) =
+    let msg = Signal.Encode.traces ~encoder l in
+    send_http_ st client msg ~backoff ~url:st.config.url_traces
+
+  let send_logs_http st client ~encoder ~backoff
+      (l : Proto.Logs.resource_logs list) =
+    let msg = Signal.Encode.logs ~encoder l in
+    send_http_ st client msg ~backoff ~url:st.config.url_logs
+
+  (** Trigger the notifier that idle workers wait on. *)
+  let tick (self : state) = Notifier.trigger self.notify
+
+  (** Spawn a task that pops resources from [q] and sends them until [stop] is
+      set. Each worker owns its HTTP client, encoder and backoff state; the
+      client is cleaned up when the loop terminates. *)
+  let start_worker (self : state) : unit =
+    let client = Httpc.create () in
+    let encoder = Pbrt.Encoder.create () in
+    let backoff = Util_backoff.create () in
+
+    (* loop on [q] *)
+    let rec loop () : unit IO.t =
+      if Atomic.get self.stop then
+        IO.return ()
+      else
+        let* () =
+          match Bounded_queue.try_pop self.q with
+          | `Closed ->
+            shutdown self;
+            IO.return ()
+          | `Empty -> Notifier.wait self.notify
+          | `Item (R_logs logs) ->
+            send_logs_http self client ~encoder ~backoff logs
+          | `Item (R_metrics ms) ->
+            send_metrics_http self client ~encoder ~backoff ms
+          | `Item (R_spans spans) ->
+            send_traces_http self client ~encoder ~backoff spans
+        in
+        loop ()
+    in
+
+    IO.spawn (fun () ->
+        IO.protect loop ~finally:(fun () ->
+            Httpc.cleanup client;
+            IO.return ()))
+
+  (** Spawn a task calling [tick] every [interval_s] seconds until [stop]. *)
+  let start_ticker (self : state) ~(interval_s : float) : unit =
+    let rec loop () : unit IO.t =
+      if Atomic.get self.stop then
+        IO.return ()
+      else
+        let* () = IO.sleep_s interval_s in
+        tick self;
+        loop ()
+    in
+    IO.spawn loop
+
+  (** Worker count used when neither the override nor the config sets one. *)
+  let default_n_workers = 50
+
+  (** Build the shared state, start the workers and the optional ticker. *)
+  let create_state ?override_n_workers ~ticker_task ~stop ~config ~q () : state
+      =
+    let other_config = { override_n_workers; ticker_task } in
+    let self =
+      {
+        stop;
+        config;
+        other_config;
+        q;
+        cleaned = Atomic.make false;
+        notify = Notifier.create ();
+      }
+    in
+
+    (* NOTE(review): [Notifier.register_bounded_queue] is never called on [q],
+       so pushes presumably do not wake idle workers by themselves; confirm
+       that relying on [tick] alone is intended. *)
+
+    (* start workers; the count is clamped to the range [2, 500] *)
+    let n_workers =
+      max 2
+        (min 500
+           (match
+              ( self.other_config.override_n_workers,
+                self.config.http_concurrency_level )
+            with
+           | Some n, _ -> n
+           | None, Some n -> n
+           | None, None -> default_n_workers))
+    in
+
+    for _i = 1 to n_workers do
+      start_worker self
+    done;
+
+    (* start ticker *)
+    (match self.other_config.ticker_task with
+    | None -> ()
+    | Some interval_s -> start_ticker self ~interval_s);
+
+    self
+
+  (** Wrap [self] as a consumer: [active], [tick] and [shutdown] callbacks. *)
+  let to_consumer (self : state) : Any_resource.t Consumer.t =
+    let active () = not (Atomic.get self.stop) in
+    let shutdown ~on_done =
+      shutdown self;
+      on_done ()
+    in
+    let tick () = tick self in
+    { active; tick; shutdown }
+
+  let consumer ?override_n_workers ~ticker_task ~stop ~config () :
+      Consumer.any_resource_builder =
+    {
+      start_consuming =
+        (fun q ->
+          let st =
+            create_state ?override_n_workers ~ticker_task ~stop ~config ~q ()
+          in
+          to_consumer st);
+    }
+end
diff --git a/src/client/generic_io.ml b/src/client/generic_io.ml
new file mode 100644
index 00000000..9e297026
--- /dev/null
+++ b/src/client/generic_io.ml
@@ -0,0 +1,32 @@
+(** Generic IO *)
+module type S = sig
+  type 'a t
+
+  val return : 'a -> 'a t
+
+  val ( let* ) : 'a t -> ('a -> 'b t) -> 'b t
+
+  val protect : finally:(unit -> unit t) -> (unit -> 'a t) -> 'a t
+  (** Run [f ()], then [finally ()] whether or not [f] failed. *)
+end
+
+module type S_WITH_CONCURRENCY = sig
+  include S
+
+  val sleep_s : float -> unit t
+  (** Sleep for the given number of seconds. *)
+
+  val spawn : (unit -> unit t) -> unit
+  (** Start a background task without waiting for its result. *)
+end
+
+(** Direct-style IO: ['a t] is ['a] itself, so [let*] is plain application. *)
+module Direct_style : S with type 'a t = 'a = struct
+  type 'a t = 'a
+
+  let[@inline] return x = x
+
+  let[@inline] ( let* ) x f = f x
+
+  let protect = Fun.protect
+end
diff --git a/src/client/generic_notifier.ml b/src/client/generic_notifier.ml
new file mode 100644
index 00000000..0d3ea1d3
--- /dev/null
+++ b/src/client/generic_notifier.ml
@@ -0,0 +1,18 @@
+module type IO = Generic_io.S
+
+(** Notification that can be used on the consumer side of a bounded queue *)
+module type S = sig
+  module IO : IO
+
+  type t
+
+  val create : unit -> t
+
+  val delete : t -> unit
+
+  val trigger : t -> unit
+
+  val wait : t -> unit IO.t
+
+  val register_bounded_queue : t -> _ Bounded_queue.t -> unit
+end
diff --git a/src/client/lwt/io_lwt.ml b/src/client/lwt/io_lwt.ml
new file mode 100644
index 00000000..d8dcece9
--- /dev/null
+++ b/src/client/lwt/io_lwt.ml
@@ -0,0 +1,11 @@
+type 'a t = 'a Lwt.t
+
+let return = Lwt.return
+
+let ( let* ) = Lwt.Syntax.( let* )
+
+let sleep_s = Lwt_unix.sleep
+
+let spawn = Lwt.async
+
+let[@inline] protect ~finally f = Lwt.finalize f finally
diff --git a/src/client/lwt/io_lwt.mli b/src/client/lwt/io_lwt.mli
new file mode 100644
index 00000000..ec083176
--- /dev/null
+++ b/src/client/lwt/io_lwt.mli
@@ -0,0 +1 @@
+include Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a Lwt.t
diff --git a/src/client/lwt/notifier.ml b/src/client/lwt/notifier_lwt.ml
similarity index 97%
rename from src/client/lwt/notifier.ml
rename to src/client/lwt/notifier_lwt.ml
index 4b924183..88fd366e 100644
--- a/src/client/lwt/notifier.ml
+++ b/src/client/lwt/notifier_lwt.ml
@@ -1,5 +1,7 @@
 (** Notification that can be used on the consumer side of a bounded queue *)
 
+module IO = Io_lwt
+
 type t = {
   notified: bool Atomic.t;
   cond: unit Lwt_condition.t;
diff --git a/src/client/lwt/notifier_lwt.mli b/src/client/lwt/notifier_lwt.mli
new file mode 100644
index 00000000..c16ae992
--- /dev/null
+++ b/src/client/lwt/notifier_lwt.mli
@@ -0,0 +1 @@
+include Generic_notifier.S with module IO = Io_lwt
diff --git a/src/client/notifier_sync.ml b/src/client/notifier_sync.ml
new file mode 100644
index 00000000..4ce44bb8
--- /dev/null
+++ b/src/client/notifier_sync.ml
@@ -0,0 +1,8 @@
+include Util_thread.MCond
+module IO = Generic_io.Direct_style
+
+let delete = ignore
+
+let trigger = signal
+
+let register_bounded_queue = wakeup_from_bq
diff --git a/src/client/notifier_sync.mli b/src/client/notifier_sync.mli
new file mode 100644
index 00000000..f896ccb2
--- /dev/null
+++ b/src/client/notifier_sync.mli
@@ -0,0 +1 @@
+include Generic_notifier.S with type 'a IO.t = 'a
diff --git a/src/client/signal.ml b/src/client/signal.ml
index e3337c60..cde963de 100644
--- a/src/client/signal.ml
+++ b/src/client/signal.ml
@@ -48,7 +48,9 @@ module Encode = struct
     let x = ctor resource in
     let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in
     enc x encoder;
-    Pbrt.Encoder.to_string encoder
+    let data = Pbrt.Encoder.to_string encoder in
+    Pbrt.Encoder.reset encoder;
+    data
 
   let logs ?encoder resource_logs =
     resource_to_string ~encoder resource_logs