mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
feat exporter: split tick/on_tick again
This commit is contained in:
parent
fe0aa297a6
commit
35f8bbc67d
2 changed files with 17 additions and 68 deletions
|
|
@ -9,17 +9,14 @@
|
||||||
open Common_
|
open Common_
|
||||||
open Opentelemetry_emitter
|
open Opentelemetry_emitter
|
||||||
|
|
||||||
open struct
|
|
||||||
module Proto = Opentelemetry_proto
|
|
||||||
end
|
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
emit_spans: Proto.Trace.span Emitter.t;
|
emit_spans: Proto.Trace.span Emitter.t;
|
||||||
emit_metrics: Proto.Metrics.metric Emitter.t;
|
emit_metrics: Proto.Metrics.metric Emitter.t;
|
||||||
emit_logs: Proto.Logs.log_record Emitter.t;
|
emit_logs: Proto.Logs.log_record Emitter.t;
|
||||||
on_tick: Cb_set.t;
|
on_tick: (unit -> unit) -> unit;
|
||||||
(** Set of callbacks for "on tick". Should be triggered regularly for
|
tick: unit -> unit;
|
||||||
background processing, timeout checks, etc. *)
|
(** Call all the callbacks registered with [on_tick]. Should be triggered
|
||||||
|
regularly for background processing, timeout checks, etc. *)
|
||||||
cleanup: on_done:(unit -> unit) -> unit -> unit;
|
cleanup: on_done:(unit -> unit) -> unit -> unit;
|
||||||
(** [cleanup ~on_done ()] is called when the exporter is shut down, and is
|
(** [cleanup ~on_done ()] is called when the exporter is shut down, and is
|
||||||
responsible for sending remaining batches, flushing sockets, etc.
|
responsible for sending remaining batches, flushing sockets, etc.
|
||||||
|
|
@ -30,12 +27,13 @@ type t = {
|
||||||
|
|
||||||
(** Dummy exporter, does nothing *)
|
(** Dummy exporter, does nothing *)
|
||||||
let dummy () : t =
|
let dummy () : t =
|
||||||
let on_tick = Cb_set.create () in
|
let ticker = Cb_set.create () in
|
||||||
{
|
{
|
||||||
emit_spans = Emitter.dummy ();
|
emit_spans = Emitter.dummy ();
|
||||||
emit_metrics = Emitter.dummy ();
|
emit_metrics = Emitter.dummy ();
|
||||||
emit_logs = Emitter.dummy ();
|
emit_logs = Emitter.dummy ();
|
||||||
on_tick;
|
on_tick = Cb_set.register ticker;
|
||||||
|
tick = (fun () -> Cb_set.trigger ticker);
|
||||||
cleanup = (fun ~on_done () -> on_done ());
|
cleanup = (fun ~on_done () -> on_done ());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -48,72 +46,19 @@ let[@inline] send_metrics (self : t) (l : Proto.Metrics.metric list) =
|
||||||
let[@inline] send_logs (self : t) (l : Proto.Logs.log_record list) =
|
let[@inline] send_logs (self : t) (l : Proto.Logs.log_record list) =
|
||||||
Emitter.emit self.emit_logs l
|
Emitter.emit self.emit_logs l
|
||||||
|
|
||||||
let on_tick (self : t) f = Cb_set.register self.on_tick f
|
let[@inline] on_tick (self : t) f = self.on_tick f
|
||||||
|
|
||||||
(** Do background work. Call this regularly if the collector doesn't already
|
(** Do background work. Call this regularly if the collector doesn't already
|
||||||
have a ticker thread or internal timer. *)
|
have a ticker thread or internal timer. *)
|
||||||
let tick (self : t) =
|
let tick (self : t) =
|
||||||
Cb_set.trigger self.on_tick;
|
(* make sure emitters get the chance to check timeouts, flush, etc. *)
|
||||||
(* also tick each emitter! *)
|
|
||||||
let now = Mtime_clock.now () in
|
let now = Mtime_clock.now () in
|
||||||
Emitter.tick ~now self.emit_spans;
|
Emitter.tick ~now self.emit_spans;
|
||||||
Emitter.tick ~now self.emit_metrics;
|
Emitter.tick ~now self.emit_metrics;
|
||||||
Emitter.tick ~now self.emit_logs;
|
Emitter.tick ~now self.emit_logs;
|
||||||
|
|
||||||
|
(* call the callbacks *)
|
||||||
|
self.tick ();
|
||||||
()
|
()
|
||||||
|
|
||||||
let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done ()
|
let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done ()
|
||||||
|
|
||||||
(** Main exporter, used by the main tracing functions.
|
|
||||||
|
|
||||||
It is better to pass an explicit exporter when possible. *)
|
|
||||||
module Main_exporter = struct
|
|
||||||
(* hidden *)
|
|
||||||
open struct
|
|
||||||
(* a list of callbacks automatically added to the main exporter *)
|
|
||||||
let on_tick_cbs_ = Alist.make ()
|
|
||||||
|
|
||||||
let exporter : t option Atomic.t = Atomic.make None
|
|
||||||
end
|
|
||||||
|
|
||||||
(** Set the global exporter *)
|
|
||||||
let set (exp : t) : unit =
|
|
||||||
List.iter (on_tick exp) (Alist.get on_tick_cbs_);
|
|
||||||
Atomic.set exporter (Some exp)
|
|
||||||
|
|
||||||
(** Remove current exporter, if any.
|
|
||||||
@param on_done see {!t#cleanup}, @since 0.12 *)
|
|
||||||
let remove ~on_done () : unit =
|
|
||||||
match Atomic.exchange exporter None with
|
|
||||||
| None -> ()
|
|
||||||
| Some exp ->
|
|
||||||
tick exp;
|
|
||||||
cleanup exp ~on_done
|
|
||||||
|
|
||||||
(** Is there a configured exporter? *)
|
|
||||||
let present () : bool = Option.is_some (Atomic.get exporter)
|
|
||||||
|
|
||||||
(** Current exporter, if any *)
|
|
||||||
let[@inline] get () : t option = Atomic.get exporter
|
|
||||||
|
|
||||||
let add_on_tick_callback f =
|
|
||||||
Alist.add on_tick_cbs_ f;
|
|
||||||
Option.iter (fun exp -> on_tick exp f) (get ())
|
|
||||||
end
|
|
||||||
|
|
||||||
let (set_backend [@deprecated "use `Main_exporter.set`"]) = Main_exporter.set
|
|
||||||
|
|
||||||
let (remove_backend [@deprecated "use `Main_exporter.remove`"]) =
|
|
||||||
Main_exporter.remove
|
|
||||||
|
|
||||||
let (has_backend [@deprecated "use `Main_exporter.present`"]) =
|
|
||||||
Main_exporter.present
|
|
||||||
|
|
||||||
let (get_backend [@deprecated "use `Main_exporter.ge"]) = Main_exporter.get
|
|
||||||
|
|
||||||
let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f
|
|
||||||
=
|
|
||||||
if enable then (
|
|
||||||
Main_exporter.set exp;
|
|
||||||
Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f
|
|
||||||
) else
|
|
||||||
f ()
|
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,8 @@ val id : t -> Span_id.t
|
||||||
|
|
||||||
val trace_id : t -> Trace_id.t
|
val trace_id : t -> Trace_id.t
|
||||||
|
|
||||||
|
val is_not_dummy : t -> bool
|
||||||
|
|
||||||
val create_new :
|
val create_new :
|
||||||
?kind:kind ->
|
?kind:kind ->
|
||||||
?id:Span_id.t ->
|
?id:Span_id.t ->
|
||||||
|
|
@ -96,7 +98,9 @@ val add_links' : t -> (unit -> Span_link.t list) -> unit
|
||||||
Note that this takes a function that produces links, and will only call it
|
Note that this takes a function that produces links, and will only call it
|
||||||
if there is an instrumentation backend. *)
|
if there is an instrumentation backend. *)
|
||||||
|
|
||||||
val add_attrs : t -> (unit -> Key_value.t list) -> unit
|
val add_attrs : t -> Key_value.t list -> unit
|
||||||
|
|
||||||
|
val add_attrs' : t -> (unit -> Key_value.t list) -> unit
|
||||||
|
|
||||||
val set_status : t -> Span_status.t -> unit
|
val set_status : t -> Span_status.t -> unit
|
||||||
(** set the span status.
|
(** set the span status.
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue