client: lwt helpers, error helpers, thread utils, lwt notifier

This commit is contained in:
Simon Cruanes 2025-12-05 15:50:09 -05:00
parent 703755e775
commit 05608340e5
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 196 additions and 1 deletions

View file

@ -62,3 +62,8 @@ let to_emitter (self : 'a t) : 'a Opentelemetry_emitter.Emitter.t =
fundamentally asynchronous because it's done by consumers *) fundamentally asynchronous because it's done by consumers *)
let flush_and_close () = close self in let flush_and_close () = close self in
{ closed; enabled; emit; tick; flush_and_close } { closed; enabled; emit; tick; flush_and_close }
module Defaults = struct
(** The default high watermark *)
let high_watermark : int = 2048
end

View file

@ -25,3 +25,6 @@ module Builder = struct
let start_consuming (self : _ t) bq = self.start_consuming bq let start_consuming (self : _ t) bq = self.start_consuming bq
end end
type any_resource_builder = Any_resource.t Builder.t
(** The type that's useful for OTEL backends *)

View file

@ -0,0 +1,50 @@
open Common_
type t =
[ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string
| `Sysbreak
]
let str_to_hex (s : string) : string =
Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s)
(** Report the error on stderr. *)
let report_err : t -> unit = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status
( code,
{
Opentelemetry.Proto.Status.code = scode;
message;
details;
_presence = _;
} ) ->
let pp_details out l =
List.iter
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
l
in
Format.eprintf
"@[<2>opentelemetry: export failed with@ http code=%d@ status \
{@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
code scode
(Bytes.unsafe_to_string message)
pp_details details
let decode_invalid_http_response ~code ~url (body : string) : t =
try
let dec = Pbrt.Decoder.of_string body in
let status = Opentelemetry.Proto.Status.decode_pb_status dec in
`Status (code, status)
with e ->
let bt = Printexc.get_backtrace () in
`Failure
(Printf.sprintf
"httpc: decoding of status (url=%S, code=%d) failed with:\n\
%s\n\
HTTP body: %s\n\
%s"
url code (Printexc.to_string e) (str_to_hex body) bt)

View file

@ -30,7 +30,7 @@ end
somewhere else, store them, etc. somewhere else, store them, etc.
@param resource_attributes attributes added to every "resource" batch *) @param resource_attributes attributes added to every "resource" batch *)
let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t) let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
~(consumer : Any_resource.t Consumer.t) () : OTEL.Exporter.t = ~(consumer : Consumer.any_resource_builder) () : OTEL.Exporter.t =
let emit_spans = let emit_spans =
BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q BQ_emitters.spans_emitter_of_bq ~attrs:resource_attributes q
in in
@ -45,6 +45,8 @@ let create ?(resource_attributes = []) ~(q : Any_resource.t Bounded_queue.t)
let closed = Atomic.make false in let closed = Atomic.make false in
let consumer = consumer.start_consuming q in
let cleanup ~on_done () = let cleanup ~on_done () =
if not (Atomic.exchange closed true) then ( if not (Atomic.exchange closed true) then (
Bounded_queue.close q; Bounded_queue.close q;

View file

@ -0,0 +1 @@
module OTEL = Opentelemetry

21
src/client/lwt/dune Normal file
View file

@ -0,0 +1,21 @@
(library
(name opentelemetry_client_lwt)
(public_name opentelemetry-client.lwt)
(flags
:standard
-open
Opentelemetry_util
-open
Opentelemetry_client
-open
Opentelemetry_atomic)
(optional) ; lwt
(libraries
opentelemetry.core
opentelemetry.util
opentelemetry.atomic
opentelemetry.emitter
opentelemetry-client
lwt
lwt.unix)
(synopsis "Lwt-specific helpers for opentelemetry-client"))

View file

@ -0,0 +1,31 @@
(** Notification that can be used on the consumer side of a bounded queue *)
type t = {
notified: bool Atomic.t;
cond: unit Lwt_condition.t;
notification: int;
deleted: bool Atomic.t;
}
let create () : t =
let notified = Atomic.make false in
let cond = Lwt_condition.create () in
let notification =
Lwt_unix.make_notification (fun () ->
Atomic.set notified false;
Lwt_condition.broadcast cond ())
in
{ notified; notification; cond; deleted = Atomic.make false }
let delete self : unit =
if not (Atomic.exchange self.deleted true) then
Lwt_unix.stop_notification self.notification
let trigger (self : t) : unit =
if not (Atomic.exchange self.notified true) then
Lwt_unix.send_notification self.notification
let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond
let register_bounded_queue (self : t) (q : _ Bounded_queue.t) : unit =
Bounded_queue.on_non_empty q (fun () -> trigger self)

View file

@ -0,0 +1,18 @@
open Common_
open Lwt.Syntax
(** Lwt task that calls [Exporter.tick] regularly, to help enforce timeouts.
@param frequency_s how often in seconds does the tick tock? *)
let start_ticker_thread ?(finally = ignore) ~(stop : bool Atomic.t)
~(frequency_s : float) (exp : OTEL.Exporter.t) : unit =
let frequency_s = max frequency_s 0.5 in
let rec tick_loop () =
if Atomic.get stop then (
finally ();
Lwt.return ()
) else
let* () = Lwt_unix.sleep frequency_s in
OTEL.Exporter.tick exp;
tick_loop ()
in
Lwt.async tick_loop

64
src/client/util_thread.ml Normal file
View file

@ -0,0 +1,64 @@
open Common_
(** start a thread in the background, running [f()], blocking signals *)
let start_bg_thread (f : unit -> unit) : Thread.t =
let unix_run () =
let signals =
[
Sys.sigusr1;
Sys.sigusr2;
Sys.sigterm;
Sys.sigpipe;
Sys.sigalrm;
Sys.sigstop;
]
in
ignore (Thread.sigmask Unix.SIG_BLOCK signals : _ list);
f ()
in
(* no signals on Windows *)
let run () =
if Sys.win32 then
f ()
else
unix_run ()
in
Thread.create run ()
(** thread that calls [tick()] regularly, to help enforce timeouts *)
let setup_ticker_thread ~stop ~sleep_ms (exp : OTEL.Exporter.t) () =
let sleep_s = float sleep_ms /. 1000. in
let tick_loop () =
try
while not @@ Atomic.get stop do
Thread.delay sleep_s;
OTEL.Exporter.tick exp
done
with
| Sync_queue.Closed -> ()
| exn ->
(* print and ignore *)
Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!"
(Printexc.to_string exn)
in
start_bg_thread tick_loop
module MCond = struct
type t = {
mutex: Mutex.t;
cond: Condition.t;
}
let create () : t = { mutex = Mutex.create (); cond = Condition.create () }
let signal self = Condition.signal self.cond
let[@inline] protect self f = Util_mutex.protect self.mutex f
(** NOTE: the mutex must be acquired *)
let wait self = Condition.wait self.cond self.mutex
(** Ensure we get signalled when the queue goes from empty to non-empty *)
let wakeup_from_bq (self : t) (bq : _ Bounded_queue.t) : unit =
Bounded_queue.on_non_empty bq (fun () -> signal self)
end