From 6f96d5271ac225f8a556b9255fa92eec2b8050b5 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 3 Dec 2025 15:09:10 -0500 Subject: [PATCH] feat client-ocurl: use common batch and queue; remove layer of queueing now we modify batches on the fly when we send signals; but there still is a thread pool to send signals via HTTP. --- src/client-ocurl/b_queue.ml | 59 --- src/client-ocurl/b_queue.mli | 23 - src/client-ocurl/batch.ml | 24 -- src/client-ocurl/batch.mli | 14 - src/client-ocurl/common_.ml | 4 +- src/client-ocurl/config.ml | 9 +- src/client-ocurl/config.mli | 6 +- .../opentelemetry_client_ocurl.ml | 401 ++++++------------ .../opentelemetry_client_ocurl.mli | 11 +- 9 files changed, 141 insertions(+), 410 deletions(-) delete mode 100644 src/client-ocurl/b_queue.ml delete mode 100644 src/client-ocurl/b_queue.mli delete mode 100644 src/client-ocurl/batch.ml delete mode 100644 src/client-ocurl/batch.mli diff --git a/src/client-ocurl/b_queue.ml b/src/client-ocurl/b_queue.ml deleted file mode 100644 index 98f43876..00000000 --- a/src/client-ocurl/b_queue.ml +++ /dev/null @@ -1,59 +0,0 @@ -open Opentelemetry.Util_mutex - -type 'a t = { - mutex: Mutex.t; - cond: Condition.t; - q: 'a Queue.t; - mutable closed: bool; -} - -exception Closed - -let create () : _ t = - { - mutex = Mutex.create (); - cond = Condition.create (); - q = Queue.create (); - closed = false; - } - -let close (self : _ t) = - protect self.mutex @@ fun () -> - if not self.closed then ( - self.closed <- true; - Condition.broadcast self.cond (* awake waiters so they fail *) - ) - -let push (self : _ t) x : unit = - protect self.mutex @@ fun () -> - if self.closed then - raise Closed - else ( - Queue.push x self.q; - Condition.signal self.cond - ) - -let pop (self : 'a t) : 'a = - let rec loop () = - if self.closed then - raise Closed - else if Queue.is_empty self.q then ( - Condition.wait self.cond self.mutex; - (loop [@tailcall]) () - ) else ( - let x = Queue.pop self.q in - x - ) - in - protect self.mutex loop - -let pop_all (self : 'a t) into : unit = - let rec loop () = - if Queue.is_empty self.q then ( - if self.closed then raise Closed; - Condition.wait self.cond self.mutex; - (loop [@tailcall]) () - ) else - Queue.transfer self.q into - in - protect self.mutex loop diff --git a/src/client-ocurl/b_queue.mli b/src/client-ocurl/b_queue.mli deleted file mode 100644 index d020dfb3..00000000 --- a/src/client-ocurl/b_queue.mli +++ /dev/null @@ -1,23 +0,0 @@ -(** Basic Blocking Queue *) - -type 'a t - -val create : unit -> _ t - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push q x] pushes [x] into [q], and returns [()]. - @raise Closed if [close q] was previously called.*) - -val pop : 'a t -> 'a -(** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) - -val pop_all : 'a t -> 'a Queue.t -> unit -(** [pop_all q into] pops all the elements of [q] and moves them into [into]. It - might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) - -val close : _ t -> unit -(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/client-ocurl/batch.ml b/src/client-ocurl/batch.ml deleted file mode 100644 index 0be8b1b0..00000000 --- a/src/client-ocurl/batch.ml +++ /dev/null @@ -1,24 +0,0 @@ -type 'a t = { - mutable len: int; - mutable l: 'a list list; - mutable started: Mtime.t; -} - -let create () = { len = 0; l = []; started = Mtime_clock.now () } - -let push self l = - if l != [] then ( - if self.l == [] then self.started <- Mtime_clock.now (); - self.l <- l :: self.l; - self.len <- self.len + List.length l - ) - -let[@inline] len self = self.len - -let[@inline] time_started self = self.started - -let pop_all self = - let l = self.l in - self.l <- []; - self.len <- 0; - l diff --git a/src/client-ocurl/batch.mli b/src/client-ocurl/batch.mli deleted file mode 100644 index 2b867b88..00000000 --- a/src/client-ocurl/batch.mli +++ /dev/null @@ -1,14 +0,0 @@ -(** List of lists with length *) - -type 'a t - -val create : unit -> 'a t - -val push : 'a t -> 'a list -> unit - -val len : _ t -> int - -val time_started : _ t -> Mtime.t -(** Time at which the batch most recently became non-empty *) - -val pop_all : 'a t -> 'a list list diff --git a/src/client-ocurl/common_.ml b/src/client-ocurl/common_.ml index 10df0c1d..1ec6de25 100644 --- a/src/client-ocurl/common_.ml +++ b/src/client-ocurl/common_.ml @@ -1,8 +1,8 @@ module Atomic = Opentelemetry_atomic.Atomic -include Opentelemetry.Lock +module Proto = Opentelemetry_proto let spf = Printf.sprintf let ( let@ ) = ( @@ ) -let tid () = Thread.id @@ Thread.self () +let[@inline] tid () = Thread.id @@ Thread.self () diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index 0954fbe6..e06ebf7e 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -1,3 +1,5 @@ +open Opentelemetry_client + type t = { bg_threads: int; (** Are there background threads, and how many? Default [4]. This will be @@ -10,7 +12,7 @@ type t = { [ticker_thread] is [true]. This will be clamped between [2 ms] and some longer interval (maximum [60s] currently). Default 500. @since 0.7 *) - common: Opentelemetry_client.Config.t; + common: Client_config.t; (** Common configuration options @since 0.12*) } @@ -20,10 +22,9 @@ let pp out self = Format.fprintf out "{@[ bg_threads=%d;@ ticker_thread=%B;@ ticker_interval_ms=%d;@ common=%a \ @]}" - bg_threads ticker_thread ticker_interval_ms Opentelemetry_client.Config.pp - common + bg_threads ticker_thread ticker_interval_ms Client_config.pp common -module Env = Opentelemetry_client.Config.Env () +module Env = Client_config.Env () let make = Env.make diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 514ecb3e..7726de12 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -12,7 +12,7 @@ type t = { [ticker_thread] is [true]. This will be clamped between [2 ms] and some longer interval (maximum [60s] currently). Default 500. @since 0.7 *) - common: Opentelemetry_client.Config.t; + common: Opentelemetry_client.Client_config.t; (** Common configuration options @since 0.12*) } @@ -29,7 +29,7 @@ val make : ?ticker_interval_ms:int -> unit -> t) - Opentelemetry_client.Config.make + Opentelemetry_client.Client_config.make (** Make a configuration {!t}. *) -module Env : Opentelemetry_client.Config.ENV +module Env : Opentelemetry_client.Client_config.ENV diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index a58f78b7..507bc845 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -3,70 +3,27 @@ https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md *) +open Opentelemetry_client +open Common_ module OT = Opentelemetry module Config = Config -module Self_trace = Opentelemetry_client.Self_trace -module Signal = Opentelemetry_client.Signal -open Opentelemetry -include Common_ let get_headers = Config.Env.get_headers let set_headers = Config.Env.set_headers -let needs_gc_metrics = Atomic.make false - -let last_gc_metrics = Atomic.make (Mtime_clock.now ()) - -let timeout_gc_metrics = Mtime.Span.(20 * s) - -(** side channel for GC, appended to metrics batch data *) -let gc_metrics = AList.make () - -(** capture current GC metrics if {!needs_gc_metrics} is true or it has been a - long time since the last GC metrics collection, and push them into - {!gc_metrics} for later collection *) -let sample_gc_metrics_if_needed () = - let now = Mtime_clock.now () in - let alarm = Atomic.exchange needs_gc_metrics false in - let timeout () = - let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in - Mtime.Span.compare elapsed timeout_gc_metrics > 0 - in - if alarm || timeout () then ( - Atomic.set last_gc_metrics now; - let l = - OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics () - in - AList.add gc_metrics l - ) - let n_errors = Atomic.make 0 let n_dropped = Atomic.make 0 -(** Something sent to the collector *) -module Event = struct - open Opentelemetry.Proto - - type t = - | E_metric of Metrics.resource_metrics list - | E_trace of Trace.resource_spans list - | E_logs of Logs.resource_logs list - | E_tick - | E_flush_all (** Flush all batches *) -end - (** Something to be sent via HTTP *) module To_send = struct open Opentelemetry.Proto type t = - | Send_metric of Metrics.resource_metrics list list - | Send_trace of Trace.resource_spans list list - | Send_logs of Logs.resource_logs list list + | Send_metric of Metrics.resource_metrics list + | Send_trace of Trace.resource_spans list + | Send_logs of Logs.resource_logs list end (** start a thread in the background, running [f()] *) @@ -110,32 +67,50 @@ let str_to_hex (s : string) : string = done; Bytes.unsafe_to_string res -module Backend_impl : sig - type t +module Exporter_impl : sig + val n_bytes_sent : int Atomic.t + + class type t = object + inherit OT.Exporter.t + + method shutdown : on_done:(unit -> unit) -> unit -> unit + end val create : stop:bool Atomic.t -> config:Config.t -> unit -> t - val send_event : t -> Event.t -> unit - - val n_bytes_sent : unit -> int - val shutdown : t -> on_done:(unit -> unit) -> unit end = struct open Opentelemetry.Proto - type t = { + let n_bytes_sent : int Atomic.t = Atomic.make 0 + + class type t = object + inherit OT.Exporter.t + + method shutdown : on_done:(unit -> unit) -> unit -> unit + end + + type state = { stop: bool Atomic.t; cleaned: bool Atomic.t; (** True when we cleaned up after closing *) config: Config.t; - q: Event.t B_queue.t; (** Queue to receive data from the user's code *) - mutable main_th: Thread.t option; (** Thread that listens on [q] *) - send_q: To_send.t B_queue.t; (** Queue for the send worker threads *) + send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *) + traces: Proto.Trace.span Batch.t; + logs: Proto.Logs.log_record Batch.t; + metrics: Proto.Metrics.metric Batch.t; mutable send_threads: Thread.t array; (** Threads that send data via http *) } + let send_batch_ (self : state) ~force ~mk_to_send (b : _ Batch.t) : unit = + match Batch.pop_if_ready ~force ~now:(Mtime_clock.now ()) b with + | None -> () + | Some l -> + let to_send = mk_to_send l in + Sync_queue.push self.send_q to_send + let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit = let@ _sc = - Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" + Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http" in if Config.Env.get_debug () then @@ -146,7 +121,7 @@ end = struct in match let@ _sc = - Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post" + Self_trace.with_ ~kind:Span_kind_internal "curl.post" ~attrs:[ "sz", `Int (String.length data); "url", `String url ] in Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () @@ -187,31 +162,24 @@ end = struct (* avoid crazy error loop *) Thread.delay 3. - let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev - - let n_bytes_sent_ = Atomic.make 0 - - let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent_ - (** Thread that, in a loop, reads from [q] to get the next message to send via http *) - let bg_thread_loop (self : t) : unit = + let bg_thread_loop (self : state) : unit = Ezcurl.with_client ?set_opts:None @@ fun client -> let config = self.config in let stop = self.stop in - let send ~name ~url ~conv signals = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] signals in + let send ~name ~url ~conv (signals : _ list) = let@ _sp = Self_trace.with_ ~kind:Span_kind_producer name - ~attrs:[ "n", `Int (List.length l) ] + ~attrs:[ "n", `Int (List.length signals) ] in - let msg = conv l in - ignore (Atomic.fetch_and_add n_bytes_sent_ (String.length msg) : int); + let msg = conv signals in + ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); send_http_ ~stop ~config ~url client msg in try while not (Atomic.get stop) do - let msg = B_queue.pop self.send_q in + let msg = Sync_queue.pop self.send_q in match msg with | To_send.Send_trace tr -> send ~name:"send-traces" ~conv:Signal.Encode.traces @@ -223,252 +191,135 @@ end = struct send ~name:"send-logs" ~conv:Signal.Encode.logs ~url:config.common.url_logs logs done - with B_queue.Closed -> () - - type batches = { - traces: Proto.Trace.resource_spans Batch.t; - logs: Proto.Logs.resource_logs Batch.t; - metrics: Proto.Metrics.resource_metrics Batch.t; - } + with Sync_queue.Closed -> () let batch_max_size_ = 200 - let should_send_batch_ ?(side = []) ~config ~now (b : _ Batch.t) : bool = - (Batch.len b > 0 || side != []) - && (Batch.len b >= batch_max_size_ - || - let timeout = Mtime.Span.(config.Config.common.batch_timeout_ms * ms) in - let elapsed = Mtime.span now (Batch.time_started b) in - Mtime.Span.compare elapsed timeout >= 0) + let batch_timeout_ = Mtime.Span.(20 * s) - let main_thread_loop (self : t) : unit = - let local_q = Queue.create () in - let config = self.config in - - (* keep track of batches *) - let batches = - { - traces = Batch.create (); - logs = Batch.create (); - metrics = Batch.create (); - } - in - - let send_metrics () = - let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in - B_queue.push self.send_q (To_send.Send_metric metrics) - in - - let send_logs () = - B_queue.push self.send_q (To_send.Send_logs (Batch.pop_all batches.logs)) - in - - let send_traces () = - B_queue.push self.send_q - (To_send.Send_trace (Batch.pop_all batches.traces)) - in - - try - while not (Atomic.get self.stop) do - (* read multiple events at once *) - B_queue.pop_all self.q local_q; - - (* are we asked to flush all events? *) - let must_flush_all = ref false in - - (* how to process a single event *) - let process_ev (ev : Event.t) : unit = - match ev with - | Event.E_metric m -> Batch.push batches.metrics m - | Event.E_trace tr -> Batch.push batches.traces tr - | Event.E_logs logs -> Batch.push batches.logs logs - | Event.E_tick -> - (* the only impact of "tick" is that it wakes us up regularly *) - () - | Event.E_flush_all -> must_flush_all := true - in - - Queue.iter process_ev local_q; - Queue.clear local_q; - - if !must_flush_all then ( - if Batch.len batches.metrics > 0 || not (AList.is_empty gc_metrics) - then - send_metrics (); - if Batch.len batches.logs > 0 then send_logs (); - if Batch.len batches.traces > 0 then send_traces () - ) else ( - let now = Mtime_clock.now () in - if - should_send_batch_ ~config ~now batches.metrics - ~side:(AList.get gc_metrics) - then - send_metrics (); - - if should_send_batch_ ~config ~now batches.traces then send_traces (); - if should_send_batch_ ~config ~now batches.logs then send_logs () - ) - done - with B_queue.Closed -> () - - let create ~stop ~config () : t = + let create_state ~stop ~config () : state = let n_send_threads = max 2 config.Config.bg_threads in let self = { stop; config; - q = B_queue.create (); send_threads = [||]; - send_q = B_queue.create (); + send_q = Sync_queue.create (); cleaned = Atomic.make false; - main_th = None; + traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); + logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); + metrics = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); } in - let main_th = start_bg_thread (fun () -> main_thread_loop self) in - self.main_th <- Some main_th; - self.send_threads <- Array.init n_send_threads (fun _i -> start_bg_thread (fun () -> bg_thread_loop self)); self - let shutdown self ~on_done : unit = - Atomic.set self.stop true; - if not (Atomic.exchange self.cleaned true) then ( - (* empty batches *) - send_event self Event.E_flush_all; - (* close the incoming queue, wait for the thread to finish - before we start cutting off the background threads, so that they - have time to receive the final batches *) - B_queue.close self.q; - Option.iter Thread.join self.main_th; - (* close send queues, then wait for all threads *) - B_queue.close self.send_q; - Array.iter Thread.join self.send_threads - ); - on_done () + let maybe_send_metrics ~force (self : state) = + send_batch_ self ~force self.metrics ~mk_to_send:(fun metrics -> + let metrics = + Opentelemetry_client.Util_resources.make_resource_metrics metrics + in + To_send.Send_metric [ metrics ]) + + let maybe_send_logs ~force (self : state) = + send_batch_ self ~force self.logs ~mk_to_send:(fun logs -> + let logs = + Opentelemetry_client.Util_resources.make_resource_logs logs + in + To_send.Send_logs [ logs ]) + + let maybe_send_traces ~force (self : state) = + send_batch_ self ~force self.traces ~mk_to_send:(fun spans -> + let traces = + Opentelemetry_client.Util_resources.make_resource_spans spans + in + To_send.Send_trace [ traces ]) + + let create ~stop ~config () : #t = + let open Opentelemetry_util in + let st = create_state ~stop ~config () in + let ticker = Cb_set.create () in + object (self : #t) + method send_trace spans = + Batch.push' st.traces spans; + maybe_send_traces st ~force:false + + method send_metrics m = + Batch.push' st.metrics m; + maybe_send_metrics st ~force:false + + method send_logs m = + Batch.push' st.logs m; + maybe_send_logs st ~force:false + + method add_on_tick_callback cb = Cb_set.register ticker cb + + method tick () = Cb_set.trigger ticker + + method cleanup ~on_done () : unit = + if not (Atomic.exchange st.cleaned true) then ( + (* flush all signals *) + maybe_send_logs ~force:true st; + maybe_send_metrics ~force:true st; + maybe_send_traces ~force:true st; + + (* close send queues, then wait for all threads *) + Sync_queue.close st.send_q; + Array.iter Thread.join st.send_threads + ); + on_done () + + method shutdown ~on_done () = + Atomic.set st.stop true; + self#cleanup ~on_done () + end + + let shutdown (self : #t) ~on_done : unit = self#shutdown ~on_done () end -let create_backend ?(stop = Atomic.make false) - ?(config : Config.t = Config.make ()) () : (module Collector.BACKEND) = - let module M = struct - open Opentelemetry.Proto - open Opentelemetry.Collector - - let backend = Backend_impl.create ~stop ~config () - - let send_trace : Trace.resource_spans list sender = - { - send = - (fun l ~ret -> - Backend_impl.send_event backend (Event.E_trace l); - ret ()); - } - - let last_sent_metrics = Atomic.make (Mtime_clock.now ()) - - (* send metrics from time to time *) - let timeout_sent_metrics = Mtime.Span.(5 * s) - - let signal_emit_gc_metrics () = - if config.common.debug then - Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true - - let additional_metrics () : Metrics.resource_metrics list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now () in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in - - (* there is a possible race condition here, as several threads might update - metrics at the same time. But that's harmless. *) - if add_own_metrics then ( - Atomic.set last_sent_metrics now; - let open OT.Metrics in - let now_unix = OT.Timestamp_ns.now_unix_ns () in - [ - make_resource_metrics - [ - sum ~name:"otel.export.dropped" ~is_monotonic:true - [ - int ~start_time_unix_nano:now_unix ~now:now_unix - (Atomic.get n_dropped); - ]; - sum ~name:"otel.export.errors" ~is_monotonic:true - [ - int ~start_time_unix_nano:now_unix ~now:now_unix - (Atomic.get n_errors); - ]; - ]; - ] - ) else - [] - - let send_metrics : Metrics.resource_metrics list sender = - { - send = - (fun m ~ret -> - let m = List.rev_append (additional_metrics ()) m in - Backend_impl.send_event backend (Event.E_metric m); - ret ()); - } - - let send_logs : Logs.resource_logs list sender = - { - send = - (fun m ~ret -> - Backend_impl.send_event backend (Event.E_logs m); - ret ()); - } - - let on_tick_cbs_ = Atomic.make (AList.make ()) - - let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - - let tick () = - sample_gc_metrics_if_needed (); - Backend_impl.send_event backend Event.E_tick; - List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_) - - let cleanup ~on_done () = Backend_impl.shutdown backend ~on_done - end in - (module M) +let create_exporter ?(stop = Atomic.make false) + ?(config : Config.t = Config.make ()) () : #OT.Exporter.t = + let backend = Exporter_impl.create ~stop ~config () in + (backend :> OT.Exporter.t) (** thread that calls [tick()] regularly, to help enforce timeouts *) -let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () = +let setup_ticker_thread ~stop ~sleep_ms (exp : #OT.Exporter.t) () = let sleep_s = float sleep_ms /. 1000. in let tick_loop () = try while not @@ Atomic.get stop do Thread.delay sleep_s; - B.tick () + exp#tick () done - with B_queue.Closed -> () + with + | Sync_queue.Closed -> () + | exn -> + (* print and ignore *) + Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!" + (Printexc.to_string exn) in start_bg_thread tick_loop let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () : unit = - let backend = create_backend ~stop ~config () in - Opentelemetry.Collector.set_backend backend; + let exporter = Exporter_impl.create ~stop ~config () in + OT.Exporter.Main_exporter.set exporter; Self_trace.set_enabled config.common.self_trace; if config.ticker_thread then ( (* at most a minute *) let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in - ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) + ignore (setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t) ) let remove_backend () : unit = (* we don't need the callback, this runs in the same thread *) - OT.Collector.remove_backend () ~on_done:ignore + OT.Exporter.Main_exporter.remove () ~on_done:ignore let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () @@ -480,4 +331,4 @@ let with_setup ?stop ?config ?(enable = true) () f = ) else f () -let n_bytes_sent = Backend_impl.n_bytes_sent +let[@inline] n_bytes_sent () = Atomic.get Exporter_impl.n_bytes_sent diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 6d3918dc..77b8ea34 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -3,22 +3,21 @@ https://opentelemetry.io/docs/reference/specification/protocol/exporter/ *) +open Opentelemetry_atomic +open Opentelemetry_util + val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit (** Set http headers that are sent on every http query to the collector. *) -module Atomic = Opentelemetry_atomic.Atomic module Config = Config val n_bytes_sent : unit -> int (** Global counter of bytes sent (or attempted to be sent) *) -val create_backend : - ?stop:bool Atomic.t -> - ?config:Config.t -> - unit -> - (module Opentelemetry.Collector.BACKEND) +val create_exporter : + ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit