From 05608340e5be702de95629b10ac8dc3b41cb4e7e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 5 Dec 2025 15:50:09 -0500 Subject: [PATCH] client: lwt helpers, error helpers, thread utils, lwt notifier --- src/client/bounded_queue.ml | 5 +++ src/client/consumer.ml | 3 ++ src/client/export_error.ml | 50 +++++++++++++++++++++++++++ src/client/exporter_queued.ml | 4 ++- src/client/lwt/common_.ml | 1 + src/client/lwt/dune | 21 ++++++++++++ src/client/lwt/notifier.ml | 31 +++++++++++++++++ src/client/lwt/util_ticker.ml | 18 ++++++++++ src/client/util_thread.ml | 64 +++++++++++++++++++++++++++++++++++ 9 files changed, 196 insertions(+), 1 deletion(-) create mode 100644 src/client/export_error.ml create mode 100644 src/client/lwt/common_.ml create mode 100644 src/client/lwt/dune create mode 100644 src/client/lwt/notifier.ml create mode 100644 src/client/lwt/util_ticker.ml create mode 100644 src/client/util_thread.ml diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index 174c624e..63d79802 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -62,3 +62,8 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t = fundamentally asynchronous because it's done by consumers *) let flush_and_close () = close self in { closed; enabled; emit; tick; flush_and_close } + +module Defaults = struct + (** The default high watermark *) + let high_watermark : int = 2048 +end diff --git a/src/client/consumer.ml b/src/client/consumer.ml index 89288080..42bcf35d 100644 --- a/src/client/consumer.ml +++ b/src/client/consumer.ml @@ -25,3 +25,6 @@ module Builder = struct let start_consuming (self : _ t) bq = self.start_consuming bq end + +type any_resource_builder = Any_resource.t Builder.t +(** The type that's useful for OTEL backends *) diff --git a/src/client/export_error.ml b/src/client/export_error.ml new file mode 100644 index 00000000..b78447cb --- /dev/null +++ b/src/client/export_error.ml @@ -0,0 +1,50 @@ +open Common_ + +type t = + [ `Status of int * Opentelemetry.Proto.Status.status + | `Failure of string + | `Sysbreak + ] + +let str_to_hex (s : string) : string = + Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s) + +(** Report the error on stderr. *) +let report_err : t -> unit = function + | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" + | `Failure msg -> + Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg + | `Status + ( code, + { + Opentelemetry.Proto.Status.code = scode; + message; + details; + _presence = _; + } ) -> + let pp_details out l = + List.iter + (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) + l + in + Format.eprintf + "@[<2>opentelemetry: export failed with@ http code=%d@ status \ + {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." + code scode + (Bytes.unsafe_to_string message) + pp_details details + +let decode_invalid_http_response ~code ~url (body : string) : t = + try + let dec = Pbrt.Decoder.of_string body in + let status = Opentelemetry.Proto.Status.decode_pb_status dec in + `Status (code, status) + with e -> + let bt = Printexc.get_backtrace () in + `Failure + (Printf.sprintf + "httpc: decoding of status (url=%S, code=%d) failed with:\n\ + %s\n\ + HTTP body: %s\n\ + %s" + url code (Printexc.to_string e) (str_to_hex body) bt) diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 6ee9483e..55304959 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -30,7 +30,7 @@ end somewhere else, store them, etc. @param resource_attributes attributes added to every "resource" batch *) let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) - ~(consumer : Any_resource.t Consumer.t) () : OTEL.Exporter.t = + ~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t = let emit_spans = BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q in @@ -45,6 +45,8 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) let closed = Atomic.make false in + let consumer = consumer.start_consuming q in + let cleanup ~on_done () = if not (Atomic.exchange closed true) then ( Bounded_queue.close q; diff --git a/src/client/lwt/common_.ml b/src/client/lwt/common_.ml new file mode 100644 index 00000000..6a337b5c --- /dev/null +++ b/src/client/lwt/common_.ml @@ -0,0 +1 @@ +module OTEL = Opentelemetry diff --git a/src/client/lwt/dune b/src/client/lwt/dune new file mode 100644 index 00000000..2b7d082b --- /dev/null +++ b/src/client/lwt/dune @@ -0,0 +1,21 @@ +(library + (name opentelemetry_client_lwt) + (public_name opentelemetry-client.lwt) + (flags + :standard + -open + Opentelemetry_util + -open + Opentelemetry_client + -open + Opentelemetry_atomic) + (optional) ; lwt + (libraries + opentelemetry.core + opentelemetry.util + opentelemetry.atomic + opentelemetry.emitter + opentelemetry-client + lwt + lwt.unix) + (synopsis "Lwt-specific helpers for opentelemetry-client")) diff --git a/src/client/lwt/notifier.ml b/src/client/lwt/notifier.ml new file mode 100644 index 00000000..4b924183 --- /dev/null +++ b/src/client/lwt/notifier.ml @@ -0,0 +1,31 @@ +(** Notification that can be used on the consumer side of a bounded queue *) + +type t = { + notified: bool Atomic.t; + cond: unit Lwt_condition.t; + notification: int; + deleted: bool Atomic.t; +} + +let create () : t = + let notified = Atomic.make false in + let cond = Lwt_condition.create () in + let notification = + Lwt_unix.make_notification (fun () -> + Atomic.set notified false; + Lwt_condition.broadcast cond ()) + in + { notified; notification; cond; deleted = Atomic.make false } + +let delete self : unit = + if not (Atomic.exchange self.deleted true) then + Lwt_unix.stop_notification self.notification + +let trigger (self : t) : unit = + if not (Atomic.exchange self.notified true) then + Lwt_unix.send_notification self.notification + +let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond + +let register_bounded_queue (self : t) (q : _ Bounded_queue.t) : unit = + Bounded_queue.on_non_empty q (fun () -> trigger self) diff --git a/src/client/lwt/util_ticker.ml b/src/client/lwt/util_ticker.ml new file mode 100644 index 00000000..64c26da8 --- /dev/null +++ b/src/client/lwt/util_ticker.ml @@ -0,0 +1,18 @@ +open Common_ +open Lwt.Syntax + +(** Lwt task that calls [Exporter.tick] regularly, to help enforce timeouts. + @param frequency_s how often in seconds does the tick tock? *) +let start_ticker_thread ?(finally = ignore) ~(stop : bool Atomic.t) + ~(frequency_s : float) (exp : OTEL.Exporter.t) : unit = + let frequency_s = max frequency_s 0.5 in + let rec tick_loop () = + if Atomic.get stop then ( + finally (); + Lwt.return () + ) else + let* () = Lwt_unix.sleep frequency_s in + OTEL.Exporter.tick exp; + tick_loop () + in + Lwt.async tick_loop diff --git a/src/client/util_thread.ml b/src/client/util_thread.ml new file mode 100644 index 00000000..37764ac3 --- /dev/null +++ b/src/client/util_thread.ml @@ -0,0 +1,64 @@ +open Common_ + +(** start a thread in the background, running [f()], blocking signals *) +let start_bg_thread (f : unit -> unit) : Thread.t = + let unix_run () = + let signals = + [ + Sys.sigusr1; + Sys.sigusr2; + Sys.sigterm; + Sys.sigpipe; + Sys.sigalrm; + Sys.sigstop; + ] + in + ignore (Thread.sigmask Unix.SIG_BLOCK signals : _ list); + f () + in + (* no signals on Windows *) + let run () = + if Sys.win32 then + f () + else + unix_run () + in + Thread.create run () + +(** thread that calls [tick()] regularly, to help enforce timeouts *) +let setup_ticker_thread ~stop ~sleep_ms (exp : OTEL.Exporter.t) () = + let sleep_s = float sleep_ms /. 1000. in + let tick_loop () = + try + while not @@ Atomic.get stop do + Thread.delay sleep_s; + OTEL.Exporter.tick exp + done + with + | Sync_queue.Closed -> () + | exn -> + (* print and ignore *) + Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!" + (Printexc.to_string exn) + in + start_bg_thread tick_loop + +module MCond = struct + type t = { + mutex: Mutex.t; + cond: Condition.t; + } + + let create () : t = { mutex = Mutex.create (); cond = Condition.create () } + + let signal self = Condition.signal self.cond + + let[@inline] protect self f = Util_mutex.protect self.mutex f + + (** NOTE: the mutex must be acquired *) + let wait self = Condition.wait self.cond self.mutex + + (** Ensure we get signalled when the queue goes from empty to non-empty *) + let wakeup_from_bq (self : t) (bq : _ Bounded_queue.t) : unit = + Bounded_queue.on_non_empty bq (fun () -> signal self) +end