This commit is contained in:
Simon Cruanes 2025-12-07 22:12:07 -05:00
parent 5daef6873b
commit 9f5506c1ee
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -3,10 +3,10 @@
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
*)
open Opentelemetry_client
module Config = Config
module OTELC = Opentelemetry_client
open Common_
module OTEL = Opentelemetry
module Config = Config
let get_headers = Config.Env.get_headers
@ -14,21 +14,22 @@ let set_headers = Config.Env.set_headers
let n_bytes_sent : int Atomic.t = Atomic.make 0
type error = Export_error.t
type error = OTELC.Export_error.t
open struct
module Notifier = Notifier_sync
module Notifier = OTELC.Notifier_sync
module IO : Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a = struct
include Generic_io.Direct_style
module IO : OTELC.Generic_io.S_WITH_CONCURRENCY with type 'a t = 'a = struct
include OTELC.Generic_io.Direct_style
let sleep_s = Thread.delay
let[@inline] spawn f = ignore (Util_thread.start_bg_thread f : Thread.t)
let[@inline] spawn f =
ignore (OTELC.Util_thread.start_bg_thread f : Thread.t)
end
end
module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
module IO = IO
type t = Curl.t
@ -72,11 +73,13 @@ module Httpc : Generic_http_consumer.HTTPC with module IO = IO = struct
(`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))))
| Ok { code; body; _ } ->
let err = Export_error.decode_invalid_http_response ~url ~code body in
let err =
OTELC.Export_error.decode_invalid_http_response ~url ~code body
in
Error err
end
module Consumer_impl = Generic_http_consumer.Make (IO) (Notifier) (Httpc)
module Consumer_impl = OTELC.Generic_http_consumer.Make (IO) (Notifier) (Httpc)
let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () :
Opentelemetry_client.Consumer.any_resource_builder =
@ -93,12 +96,12 @@ let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () :
let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t =
let consumer = consumer ?stop ~config () in
let bq =
Bounded_queue_sync.create
~high_watermark:Bounded_queue.Defaults.high_watermark ()
OTELC.Bounded_queue_sync.create
~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark ()
in
Exporter_queued.create ~q:bq ~consumer ()
|> Exporter_add_batching.add_batching ~config:config.common
OTELC.Exporter_queued.create ~q:bq ~consumer ()
|> OTELC.Exporter_add_batching.add_batching ~config:config.common
let create_backend = create_exporter
@ -107,13 +110,14 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
let exporter = create_exporter ~stop ~config () in
OTEL.Main_exporter.set exporter;
Self_trace.set_enabled config.common.self_trace;
OTELC.Self_trace.set_enabled config.common.self_trace;
if config.ticker_thread then (
(* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore
(Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t)
(OTELC.Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter ()
: Thread.t)
)
let remove_backend () : unit =