diff --git a/src/core/any_signal.ml b/src/core/any_signal.ml new file mode 100644 index 00000000..4b3ec102 --- /dev/null +++ b/src/core/any_signal.ml @@ -0,0 +1,13 @@ +(** Any kind of signal *) + +open Common_ + +type t = + | Span of Span.t + | Metric of Metrics.t + | Log of Log_record.t + +let pp out = function + | Span s -> Proto.Trace.pp_span out s + | Metric m -> Proto.Metrics.pp_metric out m + | Log l -> Proto.Logs.pp_log_record out l diff --git a/src/core/dune b/src/core/dune index dce03edd..c33b4edc 100644 --- a/src/core/dune +++ b/src/core/dune @@ -6,9 +6,10 @@ (libraries opentelemetry.proto opentelemetry.util + opentelemetry.atomic + opentelemetry.emitter ptime ptime.clock.os pbrt threads - opentelemetry.atomic hmap)) diff --git a/src/core/exporter.ml b/src/core/exporter.ml index 7c08f588..c077cfe7 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -7,69 +7,61 @@ in their own library. *) open Common_ +open Opentelemetry_emitter open struct module Proto = Opentelemetry_proto end -(** Main exporter interface *) -class type t = object - method send_trace : Proto.Trace.span list -> unit - - method send_metrics : Proto.Metrics.metric list -> unit - - method send_logs : Proto.Logs.log_record list -> unit - - method tick : unit -> unit - (** Should be called regularly for background processing, timeout checks, etc. - *) - - method add_on_tick_callback : (unit -> unit) -> unit - (** Add the given of callback to the exporter when [tick()] is called. The - callback should be short and reentrant. Depending on the exporter's - implementation, it might be called from a thread that is not the one that - called [on_tick]. *) - - method cleanup : on_done:(unit -> unit) -> unit -> unit - (** [cleanup ~on_done ()] is called when the exporter is shut down, and is +type t = { + emit_spans: Proto.Trace.span Emitter.t; + emit_metrics: Proto.Metrics.metric Emitter.t; + emit_logs: Proto.Logs.log_record Emitter.t; + on_tick: Cb_set.t; + (** Set of callbacks for "on tick". Should be triggered regularly for + background processing, timeout checks, etc. *) + cleanup: on_done:(unit -> unit) -> unit -> unit; + (** [cleanup ~on_done ()] is called when the exporter is shut down, and is responsible for sending remaining batches, flushing sockets, etc. @param on_done callback invoked after the cleanup is done. @since 0.12 *) -end +} +(** Main exporter interface. *) (** Dummy exporter, does nothing *) -let dummy : t = - let tick_cbs = Cb_set.create () in - object - method send_trace = ignore +let dummy () : t = + let on_tick = Cb_set.create () in + { + emit_spans = Emitter.dummy (); + emit_metrics = Emitter.dummy (); + emit_logs = Emitter.dummy (); + on_tick; + cleanup = (fun ~on_done () -> on_done ()); + } - method send_metrics = ignore +let[@inline] send_trace (self : t) (l : Proto.Trace.span list) = + Emitter.emit self.emit_spans l - method send_logs = ignore +let[@inline] send_metrics (self : t) (l : Proto.Metrics.metric list) = + Emitter.emit self.emit_metrics l - method tick () = Cb_set.trigger tick_cbs +let[@inline] send_logs (self : t) (l : Proto.Logs.log_record list) = + Emitter.emit self.emit_logs l - method add_on_tick_callback cb = Cb_set.register tick_cbs cb - - method cleanup ~on_done () = on_done () - end - -let[@inline] send_trace (self : #t) (l : Proto.Trace.span list) = - self#send_trace l - -let[@inline] send_metrics (self : #t) (l : Proto.Metrics.metric list) = - self#send_metrics l - -let[@inline] send_logs (self : #t) (l : Proto.Logs.log_record list) = - self#send_logs l - -let[@inline] on_tick (self : #t) f = self#add_on_tick_callback f +let on_tick (self : t) f = Cb_set.register self.on_tick f (** Do background work. Call this regularly if the collector doesn't already have a ticker thread or internal timer. *) -let[@inline] tick (self : #t) = self#tick () +let tick (self : t) = + Cb_set.trigger self.on_tick; + (* also tick each emitter! *) + let now = Mtime_clock.now () in + Emitter.tick ~now self.emit_spans; + Emitter.tick ~now self.emit_metrics; + Emitter.tick ~now self.emit_logs; + () -let[@inline] cleanup (self : #t) ~on_done : unit = self#cleanup ~on_done () +let[@inline] cleanup (self : t) ~on_done : unit = self.cleanup ~on_done () (** Main exporter, used by the main tracing functions. @@ -84,9 +76,8 @@ module Main_exporter = struct end (** Set the global exporter *) - let set (exp : #t) : unit = - let exp = (exp :> t) in - List.iter exp#add_on_tick_callback (Alist.get on_tick_cbs_); + let set (exp : t) : unit = + List.iter (on_tick exp) (Alist.get on_tick_cbs_); Atomic.set exporter (Some exp) (** Remove current exporter, if any. @@ -95,7 +86,7 @@ module Main_exporter = struct match Atomic.exchange exporter None with | None -> () | Some exp -> - exp#tick (); + tick exp; cleanup exp ~on_done (** Is there a configured exporter? *) @@ -106,7 +97,7 @@ module Main_exporter = struct let add_on_tick_callback f = Alist.add on_tick_cbs_ f; - Option.iter (fun exp -> exp#add_on_tick_callback f) (get ()) + Option.iter (fun exp -> on_tick exp f) (get ()) end let (set_backend [@deprecated "use `Main_exporter.set`"]) = Main_exporter.set @@ -119,9 +110,8 @@ let (has_backend [@deprecated "use `Main_exporter.present`"]) = let (get_backend [@deprecated "use `Main_exporter.ge"]) = Main_exporter.get -let with_setup_debug_backend ?(on_done = ignore) (exp : #t) ?(enable = true) () - f = - let exp = (exp :> t) in +let with_setup_debug_backend ?(on_done = ignore) (exp : t) ?(enable = true) () f + = if enable then ( Main_exporter.set exp; Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f diff --git a/src/core/gc_metrics.ml b/src/core/gc_metrics.ml index 9e3c17cf..e2ade0dd 100644 --- a/src/core/gc_metrics.ml +++ b/src/core/gc_metrics.ml @@ -36,7 +36,7 @@ let get_metrics () : Metrics.t list = [ int ~now gc.Gc.compactions ]; ] -let setup ?(min_interval_s = default_interval_s) (exp : #Exporter.t) = +let setup ?(min_interval_s = default_interval_s) (exp : Exporter.t) = (* limit rate *) let min_interval_s = max 5 min_interval_s in let min_interval = Mtime.Span.(min_interval_s * s) in @@ -45,7 +45,7 @@ let setup ?(min_interval_s = default_interval_s) (exp : #Exporter.t) = let on_tick () = if Interval_limiter.make_attempt limiter then ( let m = get_metrics () in - exp#send_metrics m + Exporter.send_metrics exp m ) in Exporter.on_tick exp on_tick diff --git a/src/core/gc_metrics.mli b/src/core/gc_metrics.mli index e0c01883..d17496aa 100644 --- a/src/core/gc_metrics.mli +++ b/src/core/gc_metrics.mli @@ -5,7 +5,7 @@ val get_metrics : unit -> Metrics.t list (** Get a few metrics from the current state of the GC. *) -val setup : ?min_interval_s:int -> #Exporter.t -> unit +val setup : ?min_interval_s:int -> Exporter.t -> unit (** Setup a hook that will emit GC statistics on every tick. It does assume that [tick] is called regularly on the exporter. For example, if we ensure the exporter's [tick] function is called every 5s, we'll get GC metrics every