mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-07 18:37:56 -05:00
feat client-ocurl: use common batch and queue; remove layer of queueing
We now modify batches on the fly when signals are sent, but there is still a thread pool that sends the signals via HTTP.
This commit is contained in:
parent
ced8dd421f
commit
6f96d5271a
9 changed files with 141 additions and 410 deletions
|
|
@ -1,59 +0,0 @@
|
|||
open Opentelemetry.Util_mutex
|
||||
|
||||
(** A blocking queue: a plain [Queue.t] whose operations run under [mutex]. *)
type 'a t = {
  mutex: Mutex.t;  (** Taken (via [protect]) by every operation below. *)
  cond: Condition.t;
      (** Signalled by [push], broadcast by [close]; consumers wait on it. *)
  q: 'a Queue.t;  (** Elements pushed and not yet popped. *)
  mutable closed: bool;  (** Becomes [true] once [close] has been called. *)
}
|
||||
|
||||
exception Closed
|
||||
|
||||
(** [create ()] returns a new queue that is empty and not closed, with its own
    mutex and condition variable. *)
let create () : _ t =
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  let q = Queue.create () in
  { mutex; cond; q; closed = false }
|
||||
|
||||
(** [close q] marks [q] as closed and wakes up every thread waiting on its
    condition variable. Closing an already closed queue does nothing. *)
let close (self : _ t) =
  let mark_closed () =
    match self.closed with
    | true -> ()
    | false ->
      self.closed <- true;
      (* wake all waiters so they can observe [closed] *)
      Condition.broadcast self.cond
  in
  protect self.mutex mark_closed
|
||||
|
||||
(** [push q x] appends [x] to [q] and signals the condition variable so that a
    waiting consumer can pick it up.
    @raise Closed if [q] has already been closed. *)
let push (self : _ t) x : unit =
  let enqueue () =
    if not self.closed then (
      Queue.push x self.q;
      Condition.signal self.cond
    ) else
      raise Closed
  in
  protect self.mutex enqueue
|
||||
|
||||
(** [pop q] removes and returns the oldest element of [q], blocking while [q]
    is empty and still open.

    Elements pushed before [close] are still delivered: emptiness is checked
    before the [closed] flag, so [Closed] is only raised once the queue is both
    closed and empty. This is the same order of checks as [pop_all].

    @raise Closed if [q] is closed and holds no element. *)
let pop (self : 'a t) : 'a =
  let rec loop () =
    if not (Queue.is_empty self.q) then
      (* drain pending elements first, even if the queue was closed *)
      Queue.pop self.q
    else if self.closed then
      raise Closed
    else (
      (* called with [self.mutex] held, as [loop] runs under [protect] *)
      Condition.wait self.cond self.mutex;
      (loop [@tailcall]) ()
    )
  in
  protect self.mutex loop
|
||||
|
||||
(** [pop_all q into] transfers every element of [q] into [into], blocking while
    [q] is empty and still open.
    @raise Closed if [q] is empty and closed. *)
let pop_all (self : 'a t) into : unit =
  let rec wait_then_transfer () =
    match Queue.is_empty self.q, self.closed with
    | false, _ -> Queue.transfer self.q into
    | true, true -> raise Closed
    | true, false ->
      Condition.wait self.cond self.mutex;
      (wait_then_transfer [@tailcall]) ()
  in
  protect self.mutex wait_then_transfer
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
(** Basic Blocking Queue *)
|
||||
|
||||
type 'a t
|
||||
|
||||
val create : unit -> _ t
|
||||
|
||||
exception Closed
|
||||
|
||||
val push : 'a t -> 'a -> unit
|
||||
(** [push q x] pushes [x] into [q], and returns [()].
|
||||
@raise Closed if [close q] was previously called.*)
|
||||
|
||||
val pop : 'a t -> 'a
|
||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||
@raise Closed if the queue was closed before a new element was available. *)
|
||||
|
||||
val pop_all : 'a t -> 'a Queue.t -> unit
|
||||
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. It
|
||||
might block until an element comes.
|
||||
@raise Closed if the queue was closed before a new element was available. *)
|
||||
|
||||
val close : _ t -> unit
|
||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
(** A batch of items, stored as a list of the chunks that were pushed. *)
type 'a t = {
  mutable len: int;  (** Sum of the lengths of all chunks in [l]. *)
  mutable l: 'a list list;  (** Pushed chunks, most recently pushed first. *)
  mutable started: Mtime.t;
      (** Set at creation, then refreshed whenever a chunk is pushed while [l]
          is empty. *)
}
|
||||
|
||||
(** [create ()] returns an empty batch whose [started] time is the current
    [Mtime_clock.now ()]. *)
let create () =
  let started = Mtime_clock.now () in
  { len = 0; l = []; started }
|
||||
|
||||
(** [push self l] adds the chunk [l] to the batch and increases the length by
    [List.length l]. An empty [l] is ignored. If the batch held no chunk before
    the call, its [started] time is reset to the current time.

    Emptiness is tested by pattern matching rather than by physical equality
    with [[]]. *)
let push self l =
  match l with
  | [] -> ()
  | _ :: _ ->
    (match self.l with
    | [] -> self.started <- Mtime_clock.now ()
    | _ :: _ -> ());
    self.l <- l :: self.l;
    self.len <- self.len + List.length l
|
||||
|
||||
(** [len self] is the number of items currently held across all chunks. *)
let[@inline] len (self : _ t) : int = self.len
|
||||
|
||||
(** [time_started self] is the batch's [started] timestamp: set at creation and
    refreshed each time a chunk is pushed onto an empty batch. *)
let[@inline] time_started (self : _ t) : Mtime.t = self.started
|
||||
|
||||
(** [pop_all self] returns every chunk pushed so far (most recently pushed
    first) and resets the batch to empty with length [0]. The [started]
    timestamp is left unchanged. *)
let pop_all self =
  let chunks = self.l in
  self.len <- 0;
  self.l <- [];
  chunks
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
(** List of lists with length *)
|
||||
|
||||
type 'a t
|
||||
|
||||
val create : unit -> 'a t
|
||||
|
||||
val push : 'a t -> 'a list -> unit
|
||||
|
||||
val len : _ t -> int
|
||||
|
||||
val time_started : _ t -> Mtime.t
|
||||
(** Time at which the batch most recently became non-empty *)
|
||||
|
||||
val pop_all : 'a t -> 'a list list
|
||||
|
|
@ -1,8 +1,8 @@
|
|||
module Atomic = Opentelemetry_atomic.Atomic
|
||||
include Opentelemetry.Lock
|
||||
module Proto = Opentelemetry_proto
|
||||
|
||||
let spf = Printf.sprintf
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
let tid () = Thread.id @@ Thread.self ()
|
||||
let[@inline] tid () = Thread.id @@ Thread.self ()
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
open Opentelemetry_client
|
||||
|
||||
type t = {
|
||||
bg_threads: int;
|
||||
(** Are there background threads, and how many? Default [4]. This will be
|
||||
|
|
@ -10,7 +12,7 @@ type t = {
|
|||
[ticker_thread] is [true]. This will be clamped between [2 ms] and
|
||||
some longer interval (maximum [60s] currently). Default 500.
|
||||
@since 0.7 *)
|
||||
common: Opentelemetry_client.Config.t;
|
||||
common: Client_config.t;
|
||||
(** Common configuration options
|
||||
@since 0.12*)
|
||||
}
|
||||
|
|
@ -20,10 +22,9 @@ let pp out self =
|
|||
Format.fprintf out
|
||||
"{@[ bg_threads=%d;@ ticker_thread=%B;@ ticker_interval_ms=%d;@ common=%a \
|
||||
@]}"
|
||||
bg_threads ticker_thread ticker_interval_ms Opentelemetry_client.Config.pp
|
||||
common
|
||||
bg_threads ticker_thread ticker_interval_ms Client_config.pp common
|
||||
|
||||
module Env = Opentelemetry_client.Config.Env ()
|
||||
module Env = Client_config.Env ()
|
||||
|
||||
let make =
|
||||
Env.make
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ type t = {
|
|||
[ticker_thread] is [true]. This will be clamped between [2 ms] and
|
||||
some longer interval (maximum [60s] currently). Default 500.
|
||||
@since 0.7 *)
|
||||
common: Opentelemetry_client.Config.t;
|
||||
common: Opentelemetry_client.Client_config.t;
|
||||
(** Common configuration options
|
||||
@since 0.12*)
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@ val make :
|
|||
?ticker_interval_ms:int ->
|
||||
unit ->
|
||||
t)
|
||||
Opentelemetry_client.Config.make
|
||||
Opentelemetry_client.Client_config.make
|
||||
(** Make a configuration {!t}. *)
|
||||
|
||||
module Env : Opentelemetry_client.Config.ENV
|
||||
module Env : Opentelemetry_client.Client_config.ENV
|
||||
|
|
|
|||
|
|
@ -3,70 +3,27 @@
|
|||
https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md
|
||||
*)
|
||||
|
||||
open Opentelemetry_client
|
||||
open Common_
|
||||
module OT = Opentelemetry
|
||||
module Config = Config
|
||||
module Self_trace = Opentelemetry_client.Self_trace
|
||||
module Signal = Opentelemetry_client.Signal
|
||||
open Opentelemetry
|
||||
include Common_
|
||||
|
||||
let get_headers = Config.Env.get_headers
|
||||
|
||||
let set_headers = Config.Env.set_headers
|
||||
|
||||
let needs_gc_metrics = Atomic.make false
|
||||
|
||||
let last_gc_metrics = Atomic.make (Mtime_clock.now ())
|
||||
|
||||
let timeout_gc_metrics = Mtime.Span.(20 * s)
|
||||
|
||||
(** side channel for GC, appended to metrics batch data *)
|
||||
let gc_metrics = AList.make ()
|
||||
|
||||
(** capture current GC metrics if {!needs_gc_metrics} is true or it has been a
|
||||
long time since the last GC metrics collection, and push them into
|
||||
{!gc_metrics} for later collection *)
|
||||
let sample_gc_metrics_if_needed () =
|
||||
let now = Mtime_clock.now () in
|
||||
let alarm = Atomic.exchange needs_gc_metrics false in
|
||||
let timeout () =
|
||||
let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
|
||||
Mtime.Span.compare elapsed timeout_gc_metrics > 0
|
||||
in
|
||||
if alarm || timeout () then (
|
||||
Atomic.set last_gc_metrics now;
|
||||
let l =
|
||||
OT.Metrics.make_resource_metrics
|
||||
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
|
||||
@@ Opentelemetry.GC_metrics.get_metrics ()
|
||||
in
|
||||
AList.add gc_metrics l
|
||||
)
|
||||
|
||||
let n_errors = Atomic.make 0
|
||||
|
||||
let n_dropped = Atomic.make 0
|
||||
|
||||
(** Something sent to the collector *)
|
||||
module Event = struct
|
||||
open Opentelemetry.Proto
|
||||
|
||||
type t =
|
||||
| E_metric of Metrics.resource_metrics list
|
||||
| E_trace of Trace.resource_spans list
|
||||
| E_logs of Logs.resource_logs list
|
||||
| E_tick
|
||||
| E_flush_all (** Flush all batches *)
|
||||
end
|
||||
|
||||
(** Something to be sent via HTTP *)
|
||||
module To_send = struct
|
||||
open Opentelemetry.Proto
|
||||
|
||||
type t =
|
||||
| Send_metric of Metrics.resource_metrics list list
|
||||
| Send_trace of Trace.resource_spans list list
|
||||
| Send_logs of Logs.resource_logs list list
|
||||
| Send_metric of Metrics.resource_metrics list
|
||||
| Send_trace of Trace.resource_spans list
|
||||
| Send_logs of Logs.resource_logs list
|
||||
end
|
||||
|
||||
(** start a thread in the background, running [f()] *)
|
||||
|
|
@ -110,32 +67,50 @@ let str_to_hex (s : string) : string =
|
|||
done;
|
||||
Bytes.unsafe_to_string res
|
||||
|
||||
module Backend_impl : sig
|
||||
type t
|
||||
module Exporter_impl : sig
|
||||
val n_bytes_sent : int Atomic.t
|
||||
|
||||
class type t = object
|
||||
inherit OT.Exporter.t
|
||||
|
||||
method shutdown : on_done:(unit -> unit) -> unit -> unit
|
||||
end
|
||||
|
||||
val create : stop:bool Atomic.t -> config:Config.t -> unit -> t
|
||||
|
||||
val send_event : t -> Event.t -> unit
|
||||
|
||||
val n_bytes_sent : unit -> int
|
||||
|
||||
val shutdown : t -> on_done:(unit -> unit) -> unit
|
||||
end = struct
|
||||
open Opentelemetry.Proto
|
||||
|
||||
type t = {
|
||||
let n_bytes_sent : int Atomic.t = Atomic.make 0
|
||||
|
||||
class type t = object
|
||||
inherit OT.Exporter.t
|
||||
|
||||
method shutdown : on_done:(unit -> unit) -> unit -> unit
|
||||
end
|
||||
|
||||
type state = {
|
||||
stop: bool Atomic.t;
|
||||
cleaned: bool Atomic.t; (** True when we cleaned up after closing *)
|
||||
config: Config.t;
|
||||
q: Event.t B_queue.t; (** Queue to receive data from the user's code *)
|
||||
mutable main_th: Thread.t option; (** Thread that listens on [q] *)
|
||||
send_q: To_send.t B_queue.t; (** Queue for the send worker threads *)
|
||||
send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *)
|
||||
traces: Proto.Trace.span Batch.t;
|
||||
logs: Proto.Logs.log_record Batch.t;
|
||||
metrics: Proto.Metrics.metric Batch.t;
|
||||
mutable send_threads: Thread.t array; (** Threads that send data via http *)
|
||||
}
|
||||
|
||||
let send_batch_ (self : state) ~force ~mk_to_send (b : _ Batch.t) : unit =
|
||||
match Batch.pop_if_ready ~force ~now:(Mtime_clock.now ()) b with
|
||||
| None -> ()
|
||||
| Some l ->
|
||||
let to_send = mk_to_send l in
|
||||
Sync_queue.push self.send_q to_send
|
||||
|
||||
let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit =
|
||||
let@ _sc =
|
||||
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
|
||||
Self_trace.with_ ~kind:Span_kind_producer "otel-ocurl.send-http"
|
||||
in
|
||||
|
||||
if Config.Env.get_debug () then
|
||||
|
|
@ -146,7 +121,7 @@ end = struct
|
|||
in
|
||||
match
|
||||
let@ _sc =
|
||||
Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post"
|
||||
Self_trace.with_ ~kind:Span_kind_internal "curl.post"
|
||||
~attrs:[ "sz", `Int (String.length data); "url", `String url ]
|
||||
in
|
||||
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
|
||||
|
|
@ -187,31 +162,24 @@ end = struct
|
|||
(* avoid crazy error loop *)
|
||||
Thread.delay 3.
|
||||
|
||||
let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev
|
||||
|
||||
let n_bytes_sent_ = Atomic.make 0
|
||||
|
||||
let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent_
|
||||
|
||||
(** Thread that, in a loop, reads from [q] to get the next message to send via
|
||||
http *)
|
||||
let bg_thread_loop (self : t) : unit =
|
||||
let bg_thread_loop (self : state) : unit =
|
||||
Ezcurl.with_client ?set_opts:None @@ fun client ->
|
||||
let config = self.config in
|
||||
let stop = self.stop in
|
||||
let send ~name ~url ~conv signals =
|
||||
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] signals in
|
||||
let send ~name ~url ~conv (signals : _ list) =
|
||||
let@ _sp =
|
||||
Self_trace.with_ ~kind:Span_kind_producer name
|
||||
~attrs:[ "n", `Int (List.length l) ]
|
||||
~attrs:[ "n", `Int (List.length signals) ]
|
||||
in
|
||||
let msg = conv l in
|
||||
ignore (Atomic.fetch_and_add n_bytes_sent_ (String.length msg) : int);
|
||||
let msg = conv signals in
|
||||
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
|
||||
send_http_ ~stop ~config ~url client msg
|
||||
in
|
||||
try
|
||||
while not (Atomic.get stop) do
|
||||
let msg = B_queue.pop self.send_q in
|
||||
let msg = Sync_queue.pop self.send_q in
|
||||
match msg with
|
||||
| To_send.Send_trace tr ->
|
||||
send ~name:"send-traces" ~conv:Signal.Encode.traces
|
||||
|
|
@ -223,252 +191,135 @@ end = struct
|
|||
send ~name:"send-logs" ~conv:Signal.Encode.logs
|
||||
~url:config.common.url_logs logs
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
|
||||
type batches = {
|
||||
traces: Proto.Trace.resource_spans Batch.t;
|
||||
logs: Proto.Logs.resource_logs Batch.t;
|
||||
metrics: Proto.Metrics.resource_metrics Batch.t;
|
||||
}
|
||||
with Sync_queue.Closed -> ()
|
||||
|
||||
let batch_max_size_ = 200
|
||||
|
||||
let should_send_batch_ ?(side = []) ~config ~now (b : _ Batch.t) : bool =
|
||||
(Batch.len b > 0 || side != [])
|
||||
&& (Batch.len b >= batch_max_size_
|
||||
||
|
||||
let timeout = Mtime.Span.(config.Config.common.batch_timeout_ms * ms) in
|
||||
let elapsed = Mtime.span now (Batch.time_started b) in
|
||||
Mtime.Span.compare elapsed timeout >= 0)
|
||||
let batch_timeout_ = Mtime.Span.(20 * s)
|
||||
|
||||
let main_thread_loop (self : t) : unit =
|
||||
let local_q = Queue.create () in
|
||||
let config = self.config in
|
||||
|
||||
(* keep track of batches *)
|
||||
let batches =
|
||||
{
|
||||
traces = Batch.create ();
|
||||
logs = Batch.create ();
|
||||
metrics = Batch.create ();
|
||||
}
|
||||
in
|
||||
|
||||
let send_metrics () =
|
||||
let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in
|
||||
B_queue.push self.send_q (To_send.Send_metric metrics)
|
||||
in
|
||||
|
||||
let send_logs () =
|
||||
B_queue.push self.send_q (To_send.Send_logs (Batch.pop_all batches.logs))
|
||||
in
|
||||
|
||||
let send_traces () =
|
||||
B_queue.push self.send_q
|
||||
(To_send.Send_trace (Batch.pop_all batches.traces))
|
||||
in
|
||||
|
||||
try
|
||||
while not (Atomic.get self.stop) do
|
||||
(* read multiple events at once *)
|
||||
B_queue.pop_all self.q local_q;
|
||||
|
||||
(* are we asked to flush all events? *)
|
||||
let must_flush_all = ref false in
|
||||
|
||||
(* how to process a single event *)
|
||||
let process_ev (ev : Event.t) : unit =
|
||||
match ev with
|
||||
| Event.E_metric m -> Batch.push batches.metrics m
|
||||
| Event.E_trace tr -> Batch.push batches.traces tr
|
||||
| Event.E_logs logs -> Batch.push batches.logs logs
|
||||
| Event.E_tick ->
|
||||
(* the only impact of "tick" is that it wakes us up regularly *)
|
||||
()
|
||||
| Event.E_flush_all -> must_flush_all := true
|
||||
in
|
||||
|
||||
Queue.iter process_ev local_q;
|
||||
Queue.clear local_q;
|
||||
|
||||
if !must_flush_all then (
|
||||
if Batch.len batches.metrics > 0 || not (AList.is_empty gc_metrics)
|
||||
then
|
||||
send_metrics ();
|
||||
if Batch.len batches.logs > 0 then send_logs ();
|
||||
if Batch.len batches.traces > 0 then send_traces ()
|
||||
) else (
|
||||
let now = Mtime_clock.now () in
|
||||
if
|
||||
should_send_batch_ ~config ~now batches.metrics
|
||||
~side:(AList.get gc_metrics)
|
||||
then
|
||||
send_metrics ();
|
||||
|
||||
if should_send_batch_ ~config ~now batches.traces then send_traces ();
|
||||
if should_send_batch_ ~config ~now batches.logs then send_logs ()
|
||||
)
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
|
||||
let create ~stop ~config () : t =
|
||||
let create_state ~stop ~config () : state =
|
||||
let n_send_threads = max 2 config.Config.bg_threads in
|
||||
let self =
|
||||
{
|
||||
stop;
|
||||
config;
|
||||
q = B_queue.create ();
|
||||
send_threads = [||];
|
||||
send_q = B_queue.create ();
|
||||
send_q = Sync_queue.create ();
|
||||
cleaned = Atomic.make false;
|
||||
main_th = None;
|
||||
traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
|
||||
logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
|
||||
metrics = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
|
||||
}
|
||||
in
|
||||
|
||||
let main_th = start_bg_thread (fun () -> main_thread_loop self) in
|
||||
self.main_th <- Some main_th;
|
||||
|
||||
self.send_threads <-
|
||||
Array.init n_send_threads (fun _i ->
|
||||
start_bg_thread (fun () -> bg_thread_loop self));
|
||||
|
||||
self
|
||||
|
||||
let shutdown self ~on_done : unit =
|
||||
Atomic.set self.stop true;
|
||||
if not (Atomic.exchange self.cleaned true) then (
|
||||
(* empty batches *)
|
||||
send_event self Event.E_flush_all;
|
||||
(* close the incoming queue, wait for the thread to finish
|
||||
before we start cutting off the background threads, so that they
|
||||
have time to receive the final batches *)
|
||||
B_queue.close self.q;
|
||||
Option.iter Thread.join self.main_th;
|
||||
(* close send queues, then wait for all threads *)
|
||||
B_queue.close self.send_q;
|
||||
Array.iter Thread.join self.send_threads
|
||||
);
|
||||
on_done ()
|
||||
let maybe_send_metrics ~force (self : state) =
|
||||
send_batch_ self ~force self.metrics ~mk_to_send:(fun metrics ->
|
||||
let metrics =
|
||||
Opentelemetry_client.Util_resources.make_resource_metrics metrics
|
||||
in
|
||||
To_send.Send_metric [ metrics ])
|
||||
|
||||
let maybe_send_logs ~force (self : state) =
|
||||
send_batch_ self ~force self.logs ~mk_to_send:(fun logs ->
|
||||
let logs =
|
||||
Opentelemetry_client.Util_resources.make_resource_logs logs
|
||||
in
|
||||
To_send.Send_logs [ logs ])
|
||||
|
||||
let maybe_send_traces ~force (self : state) =
|
||||
send_batch_ self ~force self.traces ~mk_to_send:(fun spans ->
|
||||
let traces =
|
||||
Opentelemetry_client.Util_resources.make_resource_spans spans
|
||||
in
|
||||
To_send.Send_trace [ traces ])
|
||||
|
||||
let create ~stop ~config () : #t =
|
||||
let open Opentelemetry_util in
|
||||
let st = create_state ~stop ~config () in
|
||||
let ticker = Cb_set.create () in
|
||||
object (self : #t)
|
||||
method send_trace spans =
|
||||
Batch.push' st.traces spans;
|
||||
maybe_send_traces st ~force:false
|
||||
|
||||
method send_metrics m =
|
||||
Batch.push' st.metrics m;
|
||||
maybe_send_metrics st ~force:false
|
||||
|
||||
method send_logs m =
|
||||
Batch.push' st.logs m;
|
||||
maybe_send_logs st ~force:false
|
||||
|
||||
method add_on_tick_callback cb = Cb_set.register ticker cb
|
||||
|
||||
method tick () = Cb_set.trigger ticker
|
||||
|
||||
method cleanup ~on_done () : unit =
|
||||
if not (Atomic.exchange st.cleaned true) then (
|
||||
(* flush all signals *)
|
||||
maybe_send_logs ~force:true st;
|
||||
maybe_send_metrics ~force:true st;
|
||||
maybe_send_traces ~force:true st;
|
||||
|
||||
(* close send queues, then wait for all threads *)
|
||||
Sync_queue.close st.send_q;
|
||||
Array.iter Thread.join st.send_threads
|
||||
);
|
||||
on_done ()
|
||||
|
||||
method shutdown ~on_done () =
|
||||
Atomic.set st.stop true;
|
||||
self#cleanup ~on_done ()
|
||||
end
|
||||
|
||||
let shutdown (self : #t) ~on_done : unit = self#shutdown ~on_done ()
|
||||
end
|
||||
|
||||
let create_backend ?(stop = Atomic.make false)
|
||||
?(config : Config.t = Config.make ()) () : (module Collector.BACKEND) =
|
||||
let module M = struct
|
||||
open Opentelemetry.Proto
|
||||
open Opentelemetry.Collector
|
||||
|
||||
let backend = Backend_impl.create ~stop ~config ()
|
||||
|
||||
let send_trace : Trace.resource_spans list sender =
|
||||
{
|
||||
send =
|
||||
(fun l ~ret ->
|
||||
Backend_impl.send_event backend (Event.E_trace l);
|
||||
ret ());
|
||||
}
|
||||
|
||||
let last_sent_metrics = Atomic.make (Mtime_clock.now ())
|
||||
|
||||
(* send metrics from time to time *)
|
||||
let timeout_sent_metrics = Mtime.Span.(5 * s)
|
||||
|
||||
let signal_emit_gc_metrics () =
|
||||
if config.common.debug then
|
||||
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
|
||||
Atomic.set needs_gc_metrics true
|
||||
|
||||
let additional_metrics () : Metrics.resource_metrics list =
|
||||
(* add exporter metrics to the lot? *)
|
||||
let last_emit = Atomic.get last_sent_metrics in
|
||||
let now = Mtime_clock.now () in
|
||||
let add_own_metrics =
|
||||
let elapsed = Mtime.span last_emit now in
|
||||
Mtime.Span.compare elapsed timeout_sent_metrics > 0
|
||||
in
|
||||
|
||||
(* there is a possible race condition here, as several threads might update
|
||||
metrics at the same time. But that's harmless. *)
|
||||
if add_own_metrics then (
|
||||
Atomic.set last_sent_metrics now;
|
||||
let open OT.Metrics in
|
||||
let now_unix = OT.Timestamp_ns.now_unix_ns () in
|
||||
[
|
||||
make_resource_metrics
|
||||
[
|
||||
sum ~name:"otel.export.dropped" ~is_monotonic:true
|
||||
[
|
||||
int ~start_time_unix_nano:now_unix ~now:now_unix
|
||||
(Atomic.get n_dropped);
|
||||
];
|
||||
sum ~name:"otel.export.errors" ~is_monotonic:true
|
||||
[
|
||||
int ~start_time_unix_nano:now_unix ~now:now_unix
|
||||
(Atomic.get n_errors);
|
||||
];
|
||||
];
|
||||
]
|
||||
) else
|
||||
[]
|
||||
|
||||
let send_metrics : Metrics.resource_metrics list sender =
|
||||
{
|
||||
send =
|
||||
(fun m ~ret ->
|
||||
let m = List.rev_append (additional_metrics ()) m in
|
||||
Backend_impl.send_event backend (Event.E_metric m);
|
||||
ret ());
|
||||
}
|
||||
|
||||
let send_logs : Logs.resource_logs list sender =
|
||||
{
|
||||
send =
|
||||
(fun m ~ret ->
|
||||
Backend_impl.send_event backend (Event.E_logs m);
|
||||
ret ());
|
||||
}
|
||||
|
||||
let on_tick_cbs_ = Atomic.make (AList.make ())
|
||||
|
||||
let set_on_tick_callbacks = Atomic.set on_tick_cbs_
|
||||
|
||||
let tick () =
|
||||
sample_gc_metrics_if_needed ();
|
||||
Backend_impl.send_event backend Event.E_tick;
|
||||
List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)
|
||||
|
||||
let cleanup ~on_done () = Backend_impl.shutdown backend ~on_done
|
||||
end in
|
||||
(module M)
|
||||
let create_exporter ?(stop = Atomic.make false)
|
||||
?(config : Config.t = Config.make ()) () : #OT.Exporter.t =
|
||||
let backend = Exporter_impl.create ~stop ~config () in
|
||||
(backend :> OT.Exporter.t)
|
||||
|
||||
(** thread that calls [tick()] regularly, to help enforce timeouts *)
|
||||
let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () =
|
||||
let setup_ticker_thread ~stop ~sleep_ms (exp : #OT.Exporter.t) () =
|
||||
let sleep_s = float sleep_ms /. 1000. in
|
||||
let tick_loop () =
|
||||
try
|
||||
while not @@ Atomic.get stop do
|
||||
Thread.delay sleep_s;
|
||||
B.tick ()
|
||||
exp#tick ()
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
with
|
||||
| Sync_queue.Closed -> ()
|
||||
| exn ->
|
||||
(* print and ignore *)
|
||||
Printf.eprintf "otel-ocurl: ticker thread: uncaught exn:\n%s\n%!"
|
||||
(Printexc.to_string exn)
|
||||
in
|
||||
start_bg_thread tick_loop
|
||||
|
||||
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
|
||||
: unit =
|
||||
let backend = create_backend ~stop ~config () in
|
||||
Opentelemetry.Collector.set_backend backend;
|
||||
let exporter = Exporter_impl.create ~stop ~config () in
|
||||
OT.Exporter.Main_exporter.set exporter;
|
||||
|
||||
Self_trace.set_enabled config.common.self_trace;
|
||||
|
||||
if config.ticker_thread then (
|
||||
(* at most a minute *)
|
||||
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
|
||||
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
|
||||
ignore (setup_ticker_thread ~stop ~sleep_ms exporter () : Thread.t)
|
||||
)
|
||||
|
||||
let remove_backend () : unit =
|
||||
(* we don't need the callback, this runs in the same thread *)
|
||||
OT.Collector.remove_backend () ~on_done:ignore
|
||||
OT.Exporter.Main_exporter.remove () ~on_done:ignore
|
||||
|
||||
let setup ?stop ?config ?(enable = true) () =
|
||||
if enable then setup_ ?stop ?config ()
|
||||
|
|
@ -480,4 +331,4 @@ let with_setup ?stop ?config ?(enable = true) () f =
|
|||
) else
|
||||
f ()
|
||||
|
||||
let n_bytes_sent = Backend_impl.n_bytes_sent
|
||||
let[@inline] n_bytes_sent () = Atomic.get Exporter_impl.n_bytes_sent
|
||||
|
|
|
|||
|
|
@ -3,22 +3,21 @@
|
|||
https://opentelemetry.io/docs/reference/specification/protocol/exporter/
|
||||
*)
|
||||
|
||||
open Opentelemetry_atomic
|
||||
open Opentelemetry_util
|
||||
|
||||
val get_headers : unit -> (string * string) list
|
||||
|
||||
val set_headers : (string * string) list -> unit
|
||||
(** Set http headers that are sent on every http query to the collector. *)
|
||||
|
||||
module Atomic = Opentelemetry_atomic.Atomic
|
||||
module Config = Config
|
||||
|
||||
val n_bytes_sent : unit -> int
|
||||
(** Global counter of bytes sent (or attempted to be sent) *)
|
||||
|
||||
val create_backend :
|
||||
?stop:bool Atomic.t ->
|
||||
?config:Config.t ->
|
||||
unit ->
|
||||
(module Opentelemetry.Collector.BACKEND)
|
||||
val create_exporter :
|
||||
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
|
||||
|
||||
val setup :
|
||||
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue