mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 20:07:55 -04:00
feat core: add Any_signal.t; make Exporter a record of emitters
This commit is contained in:
parent
57b790d2d2
commit
cded07d90a
5 changed files with 61 additions and 57 deletions
13
src/core/any_signal.ml
Normal file
13
src/core/any_signal.ml
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
(** Any kind of signal *)
|
||||||
|
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
type t =
|
||||||
|
| Span of Span.t
|
||||||
|
| Metric of Metrics.t
|
||||||
|
| Log of Log_record.t
|
||||||
|
|
||||||
|
let pp out = function
|
||||||
|
| Span s -> Proto.Trace.pp_span out s
|
||||||
|
| Metric m -> Proto.Metrics.pp_metric out m
|
||||||
|
| Log l -> Proto.Logs.pp_log_record out l
|
||||||
|
|
@ -6,9 +6,10 @@
|
||||||
(libraries
|
(libraries
|
||||||
opentelemetry.proto
|
opentelemetry.proto
|
||||||
opentelemetry.util
|
opentelemetry.util
|
||||||
|
opentelemetry.atomic
|
||||||
|
opentelemetry.emitter
|
||||||
ptime
|
ptime
|
||||||
ptime.clock.os
|
ptime.clock.os
|
||||||
pbrt
|
pbrt
|
||||||
threads
|
threads
|
||||||
opentelemetry.atomic
|
|
||||||
hmap))
|
hmap))
|
||||||
|
|
|
||||||
|
|
@ -7,69 +7,61 @@
|
||||||
in their own library. *)
|
in their own library. *)
|
||||||
|
|
||||||
open Common_
|
open Common_
|
||||||
|
open Opentelemetry_emitter
|
||||||
|
|
||||||
open struct
|
open struct
|
||||||
module Proto = Opentelemetry_proto
|
module Proto = Opentelemetry_proto
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Main exporter interface *)
|
type t = {
|
||||||
class type t = object
|
emit_spans: Proto.Trace.span Emitter.t;
|
||||||
method send_trace : Proto.Trace.span list -> unit
|
emit_metrics: Proto.Metrics.metric Emitter.t;
|
||||||
|
emit_logs: Proto.Logs.log_record Emitter.t;
|
||||||
method send_metrics : Proto.Metrics.metric list -> unit
|
on_tick: Cb_set.t;
|
||||||
|
(** Set of callbacks for "on tick". Should be triggered regularly for
|
||||||
method send_logs : Proto.Logs.log_record list -> unit
|
background processing, timeout checks, etc. *)
|
||||||
|
cleanup: on_done:(unit -> unit) -> unit -> unit;
|
||||||
method tick : unit -> unit
|
(** [cleanup ~on_done ()] is called when the exporter is shut down, and is
|
||||||
(** Should be called regularly for background processing, timeout checks, etc.
|
|
||||||
*)
|
|
||||||
|
|
||||||
method add_on_tick_callback : (unit -> unit) -> unit
|
|
||||||
(** Add the given of callback to the exporter when [tick()] is called. The
|
|
||||||
callback should be short and reentrant. Depending on the exporter's
|
|
||||||
implementation, it might be called from a thread that is not the one that
|
|
||||||
called [on_tick]. *)
|
|
||||||
|
|
||||||
method cleanup : on_done:(unit -> unit) -> unit -> unit
|
|
||||||
(** [cleanup ~on_done ()] is called when the exporter is shut down, and is
|
|
||||||
responsible for sending remaining batches, flushing sockets, etc.
|
responsible for sending remaining batches, flushing sockets, etc.
|
||||||
@param on_done
|
@param on_done
|
||||||
callback invoked after the cleanup is done. @since 0.12 *)
|
callback invoked after the cleanup is done. @since 0.12 *)
|
||||||
end
|
}
|
||||||
|
(** Main exporter interface. *)
|
||||||
|
|
||||||
(** Dummy exporter, does nothing *)
|
(** Dummy exporter, does nothing *)
|
||||||
let dummy : t =
|
let dummy () : t =
|
||||||
let tick_cbs = Cb_set.create () in
|
let on_tick = Cb_set.create () in
|
||||||
object
|
{
|
||||||
method send_trace = ignore
|
emit_spans = Emitter.dummy ();
|
||||||
|
emit_metrics = Emitter.dummy ();
|
||||||
|
emit_logs = Emitter.dummy ();
|
||||||
|
on_tick;
|
||||||
|
cleanup = (fun ~on_done () -> on_done ());
|
||||||
|
}
|
||||||
|
|
||||||
method send_metrics = ignore
|
let[@inline] send_trace (self : t) (l : Proto.Trace.span list) =
|
||||||
|
Emitter.emit self.emit_spans l
|
||||||
|
|
||||||
method send_logs = ignore
|
let[@inline] send_metrics (self : t) (l : Proto.Metrics.metric list) =
|
||||||
|
Emitter.emit self.emit_metrics l
|
||||||
|
|
||||||
method tick () = Cb_set.trigger tick_cbs
|
let[@inline] send_logs (self : t) (l : Proto.Logs.log_record list) =
|
||||||
|
Emitter.emit self.emit_logs l
|
||||||
|
|
||||||
method add_on_tick_callback cb = Cb_set.register tick_cbs cb
|
let on_tick (self : t) f = Cb_set.register self.on_tick f
|
||||||
|
|
||||||
method cleanup ~on_done () = on_done ()
|
|
||||||
end
|
|
||||||
|
|
||||||
let[@inline] send_trace (self : #t) (l : Proto.Trace.span list) =
|
|
||||||
self#send_trace l
|
|
||||||
|
|
||||||
let[@inline] send_metrics (self : #t) (l : Proto.Metrics.metric list) =
|
|
||||||
self#send_metrics l
|
|
||||||
|
|
||||||
let[@inline] send_logs (self : #t) (l : Proto.Logs.log_record list) =
|
|
||||||
self#send_logs l
|
|
||||||
|
|
||||||
let[@inline] on_tick (self : #t) f = self#add_on_tick_callback f
|
|
||||||
|
|
||||||
(** Do background work. Call this regularly if the collector doesn't already
|
(** Do background work. Call this regularly if the collector doesn't already
|
||||||
have a ticker thread or internal timer. *)
|
have a ticker thread or internal timer. *)
|
||||||
let[@inline] tick (self : #t) = self#tick ()
|
let tick (self : t) =
|
||||||
|
Cb_set.trigger self.on_tick;
|
||||||
|
(* also tick each emitter! *)
|
||||||
|
let now = Mtime_clock.now () in
|
||||||
|
Emitter.tick ~now self.emit_spans;
|
||||||
|
Emitter.tick ~now self.emit_metrics;
|
||||||
|
Emitter.tick ~now self.emit_logs;
|
||||||
|
()
|
||||||
|
|
||||||
let[@inline] cleanup (self : #t) ~on_done : unit = self#cleanup ~on_done ()
|
let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done ()
|
||||||
|
|
||||||
(** Main exporter, used by the main tracing functions.
|
(** Main exporter, used by the main tracing functions.
|
||||||
|
|
||||||
|
|
@ -84,9 +76,8 @@ module Main_exporter = struct
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Set the global exporter *)
|
(** Set the global exporter *)
|
||||||
let set (exp : #t) : unit =
|
let set (exp : t) : unit =
|
||||||
let exp = (exp :> t) in
|
List.iter (on_tick exp) (Alist.get on_tick_cbs_);
|
||||||
List.iter exp#add_on_tick_callback (Alist.get on_tick_cbs_);
|
|
||||||
Atomic.set exporter (Some exp)
|
Atomic.set exporter (Some exp)
|
||||||
|
|
||||||
(** Remove current exporter, if any.
|
(** Remove current exporter, if any.
|
||||||
|
|
@ -95,7 +86,7 @@ module Main_exporter = struct
|
||||||
match Atomic.exchange exporter None with
|
match Atomic.exchange exporter None with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some exp ->
|
| Some exp ->
|
||||||
exp#tick ();
|
tick exp;
|
||||||
cleanup exp ~on_done
|
cleanup exp ~on_done
|
||||||
|
|
||||||
(** Is there a configured exporter? *)
|
(** Is there a configured exporter? *)
|
||||||
|
|
@ -106,7 +97,7 @@ module Main_exporter = struct
|
||||||
|
|
||||||
let add_on_tick_callback f =
|
let add_on_tick_callback f =
|
||||||
Alist.add on_tick_cbs_ f;
|
Alist.add on_tick_cbs_ f;
|
||||||
Option.iter (fun exp -> exp#add_on_tick_callback f) (get ())
|
Option.iter (fun exp -> on_tick exp f) (get ())
|
||||||
end
|
end
|
||||||
|
|
||||||
let (set_backend [@deprecated "use `Main_exporter.set`"]) = Main_exporter.set
|
let (set_backend [@deprecated "use `Main_exporter.set`"]) = Main_exporter.set
|
||||||
|
|
@ -119,9 +110,8 @@ let (has_backend [@deprecated "use `Main_exporter.present`"]) =
|
||||||
|
|
||||||
let (get_backend [@deprecated "use `Main_exporter.ge"]) = Main_exporter.get
|
let (get_backend [@deprecated "use `Main_exporter.ge"]) = Main_exporter.get
|
||||||
|
|
||||||
let with_setup_debug_backend ?(on_done = ignore) (exp : #t) ?(enable = true) ()
|
let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f
|
||||||
f =
|
=
|
||||||
let exp = (exp :> t) in
|
|
||||||
if enable then (
|
if enable then (
|
||||||
Main_exporter.set exp;
|
Main_exporter.set exp;
|
||||||
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
|
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ let get_metrics () : Metrics.t list =
|
||||||
[ int ~now gc.Gc.compactions ];
|
[ int ~now gc.Gc.compactions ];
|
||||||
]
|
]
|
||||||
|
|
||||||
let setup ?(min_interval_s = default_interval_s) (exp : #Exporter.t) =
|
let setup ?(min_interval_s = default_interval_s) (exp : Exporter.t) =
|
||||||
(* limit rate *)
|
(* limit rate *)
|
||||||
let min_interval_s = max 5 min_interval_s in
|
let min_interval_s = max 5 min_interval_s in
|
||||||
let min_interval = Mtime.Span.(min_interval_s * s) in
|
let min_interval = Mtime.Span.(min_interval_s * s) in
|
||||||
|
|
@ -45,7 +45,7 @@ let setup ?(min_interval_s = default_interval_s) (exp : #Exporter.t) =
|
||||||
let on_tick () =
|
let on_tick () =
|
||||||
if Interval_limiter.make_attempt limiter then (
|
if Interval_limiter.make_attempt limiter then (
|
||||||
let m = get_metrics () in
|
let m = get_metrics () in
|
||||||
exp#send_metrics m
|
Exporter.send_metrics exp m
|
||||||
)
|
)
|
||||||
in
|
in
|
||||||
Exporter.on_tick exp on_tick
|
Exporter.on_tick exp on_tick
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
val get_metrics : unit -> Metrics.t list
|
val get_metrics : unit -> Metrics.t list
|
||||||
(** Get a few metrics from the current state of the GC. *)
|
(** Get a few metrics from the current state of the GC. *)
|
||||||
|
|
||||||
val setup : ?min_interval_s:int -> #Exporter.t -> unit
|
val setup : ?min_interval_s:int -> Exporter.t -> unit
|
||||||
(** Setup a hook that will emit GC statistics on every tick. It does assume that
|
(** Setup a hook that will emit GC statistics on every tick. It does assume that
|
||||||
[tick] is called regularly on the exporter. For example, if we ensure the
|
[tick] is called regularly on the exporter. For example, if we ensure the
|
||||||
exporter's [tick] function is called every 5s, we'll get GC metrics every
|
exporter's [tick] function is called every 5s, we'll get GC metrics every
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue