feat client-ocurl: port to just being a consumer

the rest is reusable components from opentelemetry-client
This commit is contained in:
Simon Cruanes 2025-12-05 15:50:27 -05:00
parent 7d0cfb7500
commit a95037d7e2
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 100 additions and 228 deletions

View file

@ -14,89 +14,25 @@ let set_headers = Config.Env.set_headers
let n_errors = Atomic.make 0 let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0 let n_bytes_sent : int Atomic.t = Atomic.make 0
(** Something to be sent via HTTP *)
module To_send = struct
open Opentelemetry.Proto
type t =
| Send_metric of Metrics.resource_metrics list
| Send_trace of Trace.resource_spans list
| Send_logs of Logs.resource_logs list
end
(** start a thread in the background, running [f()] *)
let start_bg_thread (f : unit -> unit) : Thread.t =
let unix_run () =
let signals =
[
Sys.sigusr1;
Sys.sigusr2;
Sys.sigterm;
Sys.sigpipe;
Sys.sigalrm;
Sys.sigstop;
]
in
ignore (Thread.sigmask Unix.SIG_BLOCK signals : _ list);
f ()
in
(* no signals on Windows *)
let run () =
if Sys.win32 then
f ()
else
unix_run ()
in
Thread.create run ()
let str_to_hex (s : string) : string =
Opentelemetry_util.Util_bytes_.bytes_to_hex (Bytes.unsafe_of_string s)
module Exporter_impl : sig
val n_bytes_sent : int Atomic.t
class type t = object
inherit OTEL.Exporter.t
method shutdown : on_done:(unit -> unit) -> unit -> unit
end
val create : stop:bool Atomic.t -> config:Config.t -> unit -> t
val shutdown : t -> on_done:(unit -> unit) -> unit
end = struct
open Opentelemetry.Proto
let n_bytes_sent : int Atomic.t = Atomic.make 0
class type t = object
inherit OTEL.Exporter.t
method shutdown : on_done:(unit -> unit) -> unit -> unit
end
module Consumer_impl = struct
type state = { type state = {
bq: Any_resource.t Bounded_queue.t; (** Queue of incoming workload *)
stop: bool Atomic.t; stop: bool Atomic.t;
cleaned: bool Atomic.t; (** True when we cleaned up after closing *)
config: Config.t; config: Config.t;
encoder_pool: Pbrt.Encoder.t Rpool.t; mutable send_threads: Thread.t array;
send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *) (** Threads that send data via http *)
traces: Proto.Trace.span Batch.t; cleaned: bool Atomic.t; (** True when we cleaned up after closing *)
logs: Proto.Logs.log_record Batch.t; mcond: Util_thread.MCond.t; (** how to wait for the queue *)
metrics: Proto.Metrics.metric Batch.t;
mutable send_threads: Thread.t array; (** Threads that send data via http *)
} }
let send_batch_ (self : state) ~force ~mk_to_send (b : _ Batch.t) : unit = let shutdown self : unit =
match Batch.pop_if_ready ~force ~now:(Mtime_clock.now ()) b with Atomic.set self.stop true;
| None -> () (* wakeup sleepers *)
| Some l -> Util_thread.MCond.signal self.mcond
let to_send = mk_to_send l in
Sync_queue.push self.send_q to_send
let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit = let send_http_ (self : state) (client : Curl.t) ~url (data : string) : unit =
let@ _sc = let@ _sc =
Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http"
in in
@ -105,12 +41,12 @@ end = struct
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
(String.length data); (String.length data);
let headers = let headers =
("Content-Type", "application/x-protobuf") :: config.common.headers ("Content-Type", "application/x-protobuf") :: self.config.common.headers
in in
match match
let@ _sc = let@ _sc =
Self_trace.with_ ~kind:Span_kind_internal "curl.post" Self_trace.with_ ~kind:Span_kind_internal "curl.post"
~attrs:[ "sz", `Int (String.length data); "url", `String url ] ~attrs:[ "size", `Int (String.length data); "url", `String url ]
in in
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with with
@ -123,22 +59,13 @@ end = struct
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ]; @@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];
if Config.Env.get_debug () then ( if Config.Env.get_debug () then (
let dec = Pbrt.Decoder.of_string body in let err = Export_error.decode_invalid_http_response ~url ~code body in
let body = Export_error.report_err err;
try ()
let status = Status.decode_pb_status dec in )
Format.asprintf "%a" Status.pp_status status
with _ ->
spf "(could not decode status)\nraw bytes: %s" (str_to_hex body)
in
Printf.eprintf
"opentelemetry: error while sending data to %s:\n code=%d\n %s\n%!"
url code body
);
()
| exception Sys.Break -> | exception Sys.Break ->
Printf.eprintf "ctrl-c captured, stopping\n%!"; Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set stop true shutdown self
| Error (code, msg) -> | Error (code, msg) ->
(* TODO: log error _via_ otel? *) (* TODO: log error _via_ otel? *)
Atomic.incr n_errors; Atomic.incr n_errors;
@ -150,182 +77,117 @@ end = struct
(* avoid crazy error loop *) (* avoid crazy error loop *)
Thread.delay 3. Thread.delay 3.
(** Thread that, in a loop, reads from [q] to get the next message to send via (** The main loop of a thread that, reads from [bq] to get the next message to
http *) send via http *)
let bg_thread_loop (self : state) : unit = let bg_thread_loop (self : state) : unit =
Ezcurl.with_client ?set_opts:None @@ fun client -> Ezcurl.with_client ?set_opts:None @@ fun client ->
let config = self.config in (* we need exactly one encoder per thread *)
let stop = self.stop in let encoder = Pbrt.Encoder.create ~size:2048 () in
let send ~name ~url ~conv (signals : _ list) =
let send ~name ~url ~conv (signals : _ list) : unit =
let@ _sp = let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer name Self_trace.with_ ~kind:Span_kind_producer name
~attrs:[ "n", `Int (List.length signals) ] ~attrs:[ "n", `Int (List.length signals) ]
in in
let msg : string =
(* borrow encoder from buffer pool and turn [signals] into bytes *) let msg : string = conv ?encoder:(Some encoder) signals in
let@ encoder = Rpool.with_resource self.encoder_pool in Pbrt.Encoder.reset encoder;
conv ?encoder:(Some encoder) signals
in
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
send_http_ ~stop ~config ~url client msg send_http_ self client msg ~url;
()
in in
try while not (Atomic.get self.stop) do
while not (Atomic.get stop) do match Bounded_queue.try_pop self.bq with
let msg = Sync_queue.pop self.send_q in | `Closed -> shutdown self
match msg with | `Empty -> Util_thread.MCond.wait self.mcond
| To_send.Send_trace tr -> | `Item (Any_resource.R_spans tr) ->
send ~name:"send-traces" ~conv:Signal.Encode.traces send ~name:"send-traces" ~conv:Signal.Encode.traces
~url:config.common.url_traces tr ~url:self.config.common.url_traces tr
| To_send.Send_metric ms -> | `Item (Any_resource.R_metrics ms) ->
send ~name:"send-metrics" ~conv:Signal.Encode.metrics send ~name:"send-metrics" ~conv:Signal.Encode.metrics
~url:config.common.url_metrics ms ~url:self.config.common.url_metrics ms
| To_send.Send_logs logs -> | `Item (Any_resource.R_logs logs) ->
send ~name:"send-logs" ~conv:Signal.Encode.logs send ~name:"send-logs" ~conv:Signal.Encode.logs
~url:config.common.url_logs logs ~url:self.config.common.url_logs logs
done done
with Sync_queue.Closed -> ()
let batch_max_size_ = 200 let to_consumer (self : state) : _ Consumer.t =
let active () = not (Atomic.get self.stop) in
let batch_timeout_ = Mtime.Span.(20 * s) let tick () =
(* make sure to poll from time to time *)
let create_state ~stop ~config () : state = Util_thread.MCond.signal self.mcond
let n_send_threads = max 2 config.Config.bg_threads in
let encoder_pool =
Rpool.create
~mk_item:(fun () -> Pbrt.Encoder.create ~size:1024 ())
~max_size:32 ~clear:Pbrt.Encoder.reset ()
in in
let shutdown ~on_done =
shutdown self;
on_done ()
in
{ tick; active; shutdown }
let create_state ~stop ~(config : Config.t) ~q () : state =
let n_send_threads = min 100 @@ max 2 config.bg_threads in
let self = let self =
{ {
stop; stop;
config; config;
send_threads = [||]; send_threads = [||];
send_q = Sync_queue.create (); bq = q;
encoder_pool;
cleaned = Atomic.make false; cleaned = Atomic.make false;
traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); mcond = Util_thread.MCond.create ();
logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
metrics = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
} }
in in
Util_thread.MCond.wakeup_from_bq self.mcond q;
self.send_threads <- self.send_threads <-
Array.init n_send_threads (fun _i -> Array.init n_send_threads (fun _i ->
start_bg_thread (fun () -> bg_thread_loop self)); Util_thread.start_bg_thread (fun () -> bg_thread_loop self));
self self
let maybe_send_metrics ~force (self : state) = let create ~stop ~config () : Consumer.any_resource_builder =
send_batch_ self ~force self.metrics ~mk_to_send:(fun metrics -> {
let metrics = start_consuming =
Opentelemetry_client.Util_resources.make_resource_metrics metrics (fun q ->
in let st = create_state ~stop ~config ~q () in
To_send.Send_metric [ metrics ]) to_consumer st);
}
let maybe_send_logs ~force (self : state) =
send_batch_ self ~force self.logs ~mk_to_send:(fun logs ->
let logs =
Opentelemetry_client.Util_resources.make_resource_logs logs
in
To_send.Send_logs [ logs ])
let maybe_send_traces ~force (self : state) =
send_batch_ self ~force self.traces ~mk_to_send:(fun spans ->
let traces =
Opentelemetry_client.Util_resources.make_resource_spans spans
in
To_send.Send_trace [ traces ])
let[@inline] push_to_batch b e =
if e <> [] then (
match Batch.push b e with
| `Ok -> ()
| `Dropped -> Atomic.incr n_dropped
)
let create ~stop ~config () : #t =
let open Opentelemetry_util in
let st = create_state ~stop ~config () in
let ticker = Cb_set.create () in
object (self : #t)
method send_trace spans =
push_to_batch st.traces spans;
maybe_send_traces st ~force:false
method send_metrics m =
push_to_batch st.metrics m;
maybe_send_metrics st ~force:false
method send_logs m =
push_to_batch st.logs m;
maybe_send_logs st ~force:false
method add_on_tick_callback cb = Cb_set.register ticker cb
method tick () = Cb_set.trigger ticker
method cleanup ~on_done () : unit =
if not (Atomic.exchange st.cleaned true) then (
(* flush all signals *)
maybe_send_logs ~force:true st;
maybe_send_metrics ~force:true st;
maybe_send_traces ~force:true st;
(* close send queues, then wait for all threads *)
Sync_queue.close st.send_q;
Array.iter Thread.join st.send_threads
);
on_done ()
method shutdown ~on_done () =
Atomic.set st.stop true;
self#cleanup ~on_done ()
end
let shutdown (self : #t) ~on_done : unit = self#shutdown ~on_done ()
end end
let create_exporter ?(stop = Atomic.make false) let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () :
?(config : Config.t = Config.make ()) () : #OTEL.Exporter.t = Opentelemetry_client.Consumer.any_resource_builder =
let backend = Exporter_impl.create ~stop ~config () in Consumer_impl.create ~stop ~config ()
(backend :> OTEL.Exporter.t)
(** thread that calls [tick()] regularly, to help enforce timeouts *) let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t =
let setup_ticker_thread ~stop ~sleep_ms (exp : #OTEL.Exporter.t) () = let consumer = consumer ?stop ~config () in
let sleep_s = float sleep_ms /. 1000. in let bq =
let tick_loop () = Bounded_queue_sync.create
try ~high_watermark:Bounded_queue.Defaults.high_watermark ()
while not @@ Atomic.get stop do
Thread.delay sleep_s;
exp#tick ()
done
with
| Sync_queue.Closed -> ()
| exn ->
(* print and ignore *)
Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!"
(Printexc.to_string exn)
in in
start_bg_thread tick_loop
Exporter_queued.create ~q:bq ~consumer ()
|> Exporter_add_batching.add_batching ~config:config.common
let create_backend = create_exporter
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
: unit = : unit =
let exporter = Exporter_impl.create ~stop ~config () in let exporter = create_exporter ~stop ~config () in
OTEL.Exporter.Main_exporter.set exporter; OTEL.Main_exporter.set exporter;
Self_trace.set_enabled config.common.self_trace; Self_trace.set_enabled config.common.self_trace;
if config.ticker_thread then ( if config.ticker_thread then (
(* at most a minute *) (* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t) ignore
(Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t)
) )
let remove_backend () : unit = let remove_backend () : unit =
(* we don't need the callback, this runs in the same thread *) (* we don't need the callback, this runs in the same thread *)
OTEL.Exporter.Main_exporter.remove () ~on_done:ignore OTEL.Main_exporter.remove () ~on_done:ignore
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then setup_ ?stop ?config () if enable then setup_ ?stop ?config ()
@ -337,4 +199,4 @@ let with_setup ?stop ?config ?(enable = true) () f =
) else ) else
f () f ()
let[@inline] n_bytes_sent () = Atomic.get Exporter_impl.n_bytes_sent let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent

View file

@ -4,7 +4,6 @@
*) *)
open Opentelemetry_atomic open Opentelemetry_atomic
open Opentelemetry_util
val get_headers : unit -> (string * string) list val get_headers : unit -> (string * string) list
@ -16,9 +15,20 @@ module Config = Config
val n_bytes_sent : unit -> int val n_bytes_sent : unit -> int
(** Global counter of bytes sent (or attempted to be sent) *) (** Global counter of bytes sent (or attempted to be sent) *)
val consumer :
?stop:bool Atomic.t ->
?config:Config.t ->
unit ->
Opentelemetry_client.Consumer.any_resource_builder
(** Consumer that pulls from a queue *)
val create_exporter : val create_exporter :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
val create_backend :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
[@@deprecated "use create_exporter"]
val setup : val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.