From 092b9a5d2e9304a482f779a198d3905212c96530 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 17 Dec 2025 11:18:55 -0500 Subject: [PATCH] have a clock in exporter, pass a mtime in `tick` --- .../opentelemetry_client_cohttp_eio.ml | 2 +- .../opentelemetry_client_cohttp_lwt.ml | 2 +- .../opentelemetry_client_ocurl_lwt.ml | 2 +- .../opentelemetry_client_ocurl.ml | 3 ++- src/client/bounded_queue.ml | 2 +- src/client/consumer.ml | 5 +++-- src/client/emitter_combine.ml | 2 +- src/client/exporter_add_batching.ml | 3 +++ src/client/exporter_combine.ml | 3 ++- src/client/exporter_debug.ml | 4 +++- src/client/exporter_queued.ml | 13 +++++++------ src/client/exporter_stdout.ml | 5 +++-- src/client/generic_consumer.ml | 6 +++--- src/client/generic_consumer_exporter.ml | 11 ++++------- src/client/lwt/util_ticker.ml | 3 ++- src/client/sampler.ml | 2 +- src/client/util_thread.ml | 5 ++++- src/core/event.ml | 4 ++-- src/core/exporter.ml | 11 ++++++----- src/core/log_record.ml | 19 +++++++++---------- src/core/metrics.ml | 19 +++++++------------ src/emitter/emitter.ml | 10 +++++----- src/emitter/to_list.ml | 2 +- src/lib/gc_metrics.ml | 2 +- src/lib/main_exporter.ml | 8 +++++--- src/lib/opentelemetry.ml | 3 ++- 26 files changed, 80 insertions(+), 71 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 9788a057..d726e82e 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -174,7 +174,7 @@ let create_exporter ?(config = Config.make ()) ~sw ~env () = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in - Exporter_queued.create ~q:bq ~consumer () + Exporter_queued.create ~clock:Opentelemetry_ptime.clock ~q:bq ~consumer () |> Exporter_add_batching.add_batching ~config let create_backend = create_exporter diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index cab5fc3d..149c272a 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -114,7 +114,7 @@ let create_exporter ?(config = Config.make ()) () = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in - Exporter_queued.create ~q:bq ~consumer () + Exporter_queued.create ~clock:Opentelemetry_ptime.clock ~q:bq ~consumer () |> Exporter_add_batching.add_batching ~config let create_backend = create_exporter diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index fd8ff28e..f062d8dd 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -85,7 +85,7 @@ let create_exporter ?(config = Config.make ()) () = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () in - Exporter_queued.create ~q:bq ~consumer () + Exporter_queued.create ~clock:Opentelemetry_ptime.clock ~q:bq ~consumer () |> Exporter_add_batching.add_batching ~config let create_backend = create_exporter diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 0d5c0d6d..9980e22a 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -92,7 +92,8 @@ let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = ~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark () in - OTELC.Exporter_queued.create ~q:bq ~consumer () + OTELC.Exporter_queued.create ~clock:Opentelemetry_ptime.clock ~q:bq ~consumer + () |> OTELC.Exporter_add_batching.add_batching ~config:config.common let create_backend = create_exporter diff --git a/src/client/bounded_queue.ml b/src/client/bounded_queue.ml index 4c60d967..89a0ff30 100644 --- a/src/client/bounded_queue.ml +++ b/src/client/bounded_queue.ml @@ -111,7 +111,7 @@ module Send = struct let closed () = closed self in let enabled () = not (closed ()) in let emit x = if x <> [] then push self x in - let tick ~now:_ = () in + let tick ~mtime:_ = () in (* NOTE: we cannot actually flush, only close. Emptying the queue is fundamentally asynchronous because it's done by consumers *) diff --git a/src/client/consumer.ml b/src/client/consumer.ml index 48b5008f..1f416bba 100644 --- a/src/client/consumer.ml +++ b/src/client/consumer.ml @@ -10,7 +10,8 @@ type t = { tick: unit -> unit; (** Regularly called, eg to emit metrics, check timeouts, etc. Must be thread safe. *) - self_metrics: unit -> OTEL.Metrics.t list; (** Self observing metrics *) + self_metrics: clock:OTEL.Clock.t -> unit -> OTEL.Metrics.t list; + (** Self observing metrics *) } (** A consumer for signals of type ['a] *) @@ -20,7 +21,7 @@ let[@inline] active (self : t) : Aswitch.t = self.active () let[@inline] shutdown (self : t) : unit = self.shutdown () -let[@inline] self_metrics self : _ list = self.self_metrics () +let[@inline] self_metrics ~clock self : _ list = self.self_metrics ~clock () (** [on_stop e f] calls [f()] when [e] stops, or now if it's already stopped *) let on_stop self f = Aswitch.on_turn_off (self.active ()) f diff --git a/src/client/emitter_combine.ml b/src/client/emitter_combine.ml index 398b98be..c35fe4dc 100644 --- a/src/client/emitter_combine.ml +++ b/src/client/emitter_combine.ml @@ -26,7 +26,7 @@ let combine_l ?(closing : closing_behavior = `Close_when_all_closed) in let enabled () = not (closed ()) in let emit x = if x <> [] then List.iter (fun e -> emit e x) es in - let tick ~now = List.iter (tick ~now) es in + let tick ~mtime = List.iter (tick ~mtime) es in let flush_and_close () = List.iter flush_and_close es in { closed; enabled; emit; tick; flush_and_close } diff --git a/src/client/exporter_add_batching.ml b/src/client/exporter_add_batching.ml index 0f2f68ca..41ff7086 100644 --- a/src/client/exporter_add_batching.ml +++ b/src/client/exporter_add_batching.ml @@ -26,6 +26,8 @@ let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) : let active = exp.active in let tick = exp.tick in let on_tick = exp.on_tick in + let clock = exp.clock in + let self_metrics () = exp.self_metrics () in let shutdown () = let open Opentelemetry_emitter in @@ -38,6 +40,7 @@ let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) : { OTEL.Exporter.active; + clock; emit_spans; emit_metrics; emit_logs; diff --git a/src/client/exporter_combine.ml b/src/client/exporter_combine.ml index 0c2dbbc9..78bf84dc 100644 --- a/src/client/exporter_combine.ml +++ b/src/client/exporter_combine.ml @@ -22,13 +22,14 @@ let combine_l (es : OTEL.Exporter.t list) : OTEL.Exporter.t = let active, trigger = Aswitch.create () in { active = (fun () -> active); + clock = (List.hd es).clock; emit_spans = Emitter_combine.combine_l (List.map (fun e -> e.emit_spans) es); emit_logs = Emitter_combine.combine_l (List.map (fun e -> e.emit_logs) es); emit_metrics = Emitter_combine.combine_l (List.map (fun e -> e.emit_metrics) es); on_tick = (fun f -> List.iter (fun e -> e.on_tick f) es); - tick = (fun () -> List.iter tick es); + tick = (fun () -> List.iter (fun e -> e.tick ()) es); shutdown = (fun () -> shutdown_l es ~trigger); self_metrics = (fun () -> List.fold_left (fun acc e -> e.self_metrics () @ acc) [] es); diff --git a/src/client/exporter_debug.ml b/src/client/exporter_debug.ml index 946e1af4..dedc8659 100644 --- a/src/client/exporter_debug.ml +++ b/src/client/exporter_debug.ml @@ -3,12 +3,14 @@ open Opentelemetry_emitter (** [debug ?out ()] is an exporter that pretty-prints signals on [out]. @param out the formatter into which to print, default [stderr]. *) -let debug ?(out = Format.err_formatter) () : OTEL.Exporter.t = +let debug ?(clock = OTEL.Clock.Main.dynamic_main) ?(out = Format.err_formatter) + () : OTEL.Exporter.t = let open Proto in let active, trigger = Aswitch.create () in let ticker = Cb_set.create () in { active = (fun () -> active); + clock; emit_spans = Emitter.make_simple () ~emit:(fun sp -> List.iter (Format.fprintf out "SPAN: %a@." Trace.pp_span) sp); diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index 883bef48..c8ab2043 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -9,18 +9,18 @@ module BQ_emitters = struct The bounded queue is a shared resource. *) let logs_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - OTEL.Logger.t = + _ OTEL.Emitter.t = Bounded_queue.Send.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_logs_or_empty let spans_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - OTEL.Tracer.t = + _ OTEL.Emitter.t = Bounded_queue.Send.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_spans_or_empty let metrics_emitter_of_bq (q : OTEL.Any_signal_l.t Bounded_queue.Send.t) : - OTEL.Metrics_emitter.t = + _ OTEL.Emitter.t = Bounded_queue.Send.to_emitter q ~close_queue_on_close:false |> Opentelemetry_emitter.Emitter.flat_map OTEL.Any_signal_l.of_metrics_or_empty @@ -32,7 +32,7 @@ end bounded queue; while the consumer takes them from the queue to forward them somewhere else, store them, etc. @param resource_attributes attributes added to every "resource" batch *) -let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) +let create ~clock ~(q : OTEL.Any_signal_l.t Bounded_queue.t) ~(consumer : Consumer.any_signal_l_builder) () : OTEL.Exporter.t = let open Opentelemetry_emitter in let shutdown_started = Atomic.make false in @@ -48,7 +48,7 @@ let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) let on_tick f = Cb_set.register tick_set f in let self_metrics () : _ list = - let now = OTEL.Timestamp_ns.now_unix_ns () in + let now = OTEL.Clock.now clock in let m_size = OTEL.Metrics.gauge ~name:"otel.sdk.exporter.queue.size" [ OTEL.Metrics.int ~now (Bounded_queue.Recv.size q.recv) ] @@ -60,7 +60,7 @@ let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) ~name:"otel_ocaml.exporter_queue.discarded" [ OTEL.Metrics.int ~now (Bounded_queue.Recv.num_discarded q.recv) ] in - m_size :: m_cap :: m_discarded :: Consumer.self_metrics consumer + m_size :: m_cap :: m_discarded :: Consumer.self_metrics consumer ~clock in let shutdown () = @@ -86,6 +86,7 @@ let create ~(q : OTEL.Any_signal_l.t Bounded_queue.t) let active () = active in { active; + clock; emit_logs; emit_metrics; emit_spans; diff --git a/src/client/exporter_stdout.ml b/src/client/exporter_stdout.ml index 0613c07b..551a5f4f 100644 --- a/src/client/exporter_stdout.ml +++ b/src/client/exporter_stdout.ml @@ -34,7 +34,7 @@ open struct ) end -let stdout : OTEL.Exporter.t = +let stdout ?(clock = OTEL.Clock.Main.dynamic_main) () : OTEL.Exporter.t = let open Opentelemetry_util in let out = Format.std_formatter in let mutex = Mutex.create () in @@ -49,7 +49,7 @@ let stdout : OTEL.Exporter.t = pp_vlist mutex pp_signal out l in let enabled () = Aswitch.is_on active in - let tick ~now:_ = () in + let tick ~mtime:_ = () in let flush_and_close () = if Aswitch.is_on active then let@ () = Util_mutex.protect mutex in @@ -73,6 +73,7 @@ let stdout : OTEL.Exporter.t = { active = (fun () -> active); + clock; emit_spans; emit_logs; emit_metrics; diff --git a/src/client/generic_consumer.ml b/src/client/generic_consumer.ml index afa5ad99..f3951909 100644 --- a/src/client/generic_consumer.ml +++ b/src/client/generic_consumer.ml @@ -204,9 +204,9 @@ end = struct self - let self_metrics (self : state) : OTEL.Metrics.t list = + let self_metrics ~clock (self : state) : OTEL.Metrics.t list = let open OTEL.Metrics in - let now = OTEL.Timestamp_ns.now_unix_ns () in + let now = OTEL.Clock.now clock in let attrs = [ "otel.component.name", `String "otel_ocaml" ] in [ sum ~name:"otel.sdk.exporter.errors" ~is_monotonic:true @@ -220,7 +220,7 @@ end = struct let to_consumer (self : state) : Consumer.t = let shutdown () = shutdown self in let tick () = tick self in - let self_metrics () = self_metrics self in + let self_metrics ~clock () = self_metrics self ~clock in { active = (fun () -> self.active); tick; shutdown; self_metrics } let consumer ~sender_config ~n_workers ~ticker_task () : diff --git a/src/client/generic_consumer_exporter.ml b/src/client/generic_consumer_exporter.ml index 178864dd..cbbae6eb 100644 --- a/src/client/generic_consumer_exporter.ml +++ b/src/client/generic_consumer_exporter.ml @@ -113,22 +113,19 @@ end = struct start_worker self; self - let self_metrics (self : state) : OTEL.Metrics.t list = + let self_metrics (self : state) ~clock : OTEL.Metrics.t list = let open OTEL.Metrics in - let now = Mtime_clock.now () in + let now = OTEL.Clock.now clock in [ sum ~name:"otel_ocaml.export.batches_discarded_by_bounded_queue" ~is_monotonic:true - [ - int ~now:(Mtime.to_uint64_ns now) - (Bounded_queue.Recv.num_discarded self.q); - ]; + [ int ~now (Bounded_queue.Recv.num_discarded self.q) ]; ] let to_consumer (self : state) : Consumer.t = let shutdown () = shutdown self in let tick () = tick self in - let self_metrics () = self_metrics self in + let self_metrics ~clock () = self_metrics self ~clock in { active = (fun () -> self.active); tick; shutdown; self_metrics } let consumer exporter : _ Consumer.Builder.t = diff --git a/src/client/lwt/util_ticker.ml b/src/client/lwt/util_ticker.ml index 64c26da8..95aed6ad 100644 --- a/src/client/lwt/util_ticker.ml +++ b/src/client/lwt/util_ticker.ml @@ -12,7 +12,8 @@ let start_ticker_thread ?(finally = ignore) ~(stop : bool Atomic.t) Lwt.return () ) else let* () = Lwt_unix.sleep frequency_s in - OTEL.Exporter.tick exp; + let mtime = Mtime_clock.now () in + OTEL.Exporter.tick exp ~mtime; tick_loop () in Lwt.async tick_loop diff --git a/src/client/sampler.ml b/src/client/sampler.ml index 9ae63e0d..671eaf81 100644 --- a/src/client/sampler.ml +++ b/src/client/sampler.ml @@ -37,7 +37,7 @@ let wrap_emitter (self : t) (e : _ Emitter.t) : _ Emitter.t = let enabled () = e.enabled () in let closed () = Emitter.closed e in let flush_and_close () = Emitter.flush_and_close e in - let tick ~now = Emitter.tick e ~now in + let tick ~mtime = Emitter.tick e ~mtime in let emit l = if l <> [] && e.enabled () then ( diff --git a/src/client/util_thread.ml b/src/client/util_thread.ml index dfbe61ac..9fad94e5 100644 --- a/src/client/util_thread.ml +++ b/src/client/util_thread.ml @@ -34,7 +34,10 @@ let setup_ticker_thread ~(active : Aswitch.t) ~sleep_ms (exp : OTEL.Exporter.t) while Aswitch.is_on active do Thread.delay sleep_s; - if Aswitch.is_on active then OTEL.Exporter.tick exp + if Aswitch.is_on active then ( + let mtime = Mtime_clock.now () in + OTEL.Exporter.tick exp ~mtime + ) done with | Sync_queue.Closed -> () diff --git a/src/core/event.ml b/src/core/event.ml index 3d632a4d..34915e2e 100644 --- a/src/core/event.ml +++ b/src/core/event.ml @@ -3,7 +3,7 @@ open Proto.Trace type t = span_event -let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = []) - (name : string) : t = +let make ?(time_unix_nano = Clock.now_main ()) ?(attrs = []) (name : string) : t + = let attrs = List.map Key_value.conv attrs in make_span_event ~time_unix_nano ~name ~attributes:attrs () diff --git a/src/core/exporter.ml b/src/core/exporter.ml index d4630173..bc7514a5 100644 --- a/src/core/exporter.ml +++ b/src/core/exporter.ml @@ -12,6 +12,7 @@ open Opentelemetry_emitter type t = { active: unit -> Aswitch.t; (** Is the exporer currently active? After shutdown this is turned off. *) + clock: Clock.t; emit_spans: Proto.Trace.span Emitter.t; emit_metrics: Proto.Metrics.metric Emitter.t; emit_logs: Proto.Logs.log_record Emitter.t; @@ -34,6 +35,7 @@ let dummy () : t = let active, trigger = Aswitch.create () in { active = (fun () -> active); + clock = Clock.unix; emit_spans = Emitter.dummy; emit_metrics = Emitter.dummy; emit_logs = Emitter.dummy; @@ -56,12 +58,11 @@ let[@inline] on_tick (self : t) f = self.on_tick f (** Do background work. Call this regularly if the collector doesn't already have a ticker thread or internal timer. *) -let tick (self : t) = +let tick ~mtime (self : t) = (* make sure emitters get the chance to check timeouts, flush, etc. *) - let now = Mtime_clock.now () in - Emitter.tick ~now self.emit_spans; - Emitter.tick ~now self.emit_metrics; - Emitter.tick ~now self.emit_logs; + Emitter.tick ~mtime self.emit_spans; + Emitter.tick ~mtime self.emit_metrics; + Emitter.tick ~mtime self.emit_logs; (* call the callbacks *) self.tick (); diff --git a/src/core/log_record.ml b/src/core/log_record.ml index 8d9e7a0f..57e6bfed 100644 --- a/src/core/log_record.ml +++ b/src/core/log_record.ml @@ -47,10 +47,9 @@ let pp_flags = Proto.Logs.pp_log_record_flags let pp = Proto.Logs.pp_log_record -(** Make a single log entry *) -let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) - ?severity ?log_level ?flags ?trace_id ?span_id ?(attrs = []) - (body : Value.t) : t = +(** Make a single log entry. *) +let make ?time ?severity ?log_level ?flags ?trace_id ?span_id ?(attrs = []) + ~(observed_time_unix_nano : Timestamp_ns.t) (body : Value.t) : t = let time_unix_nano = match time with | None -> observed_time_unix_nano @@ -65,16 +64,16 @@ let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) ~attributes ?body () (** Make a log entry whose body is a string *) -let make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags - ?trace_id ?span_id ?attrs (body : string) : t = - make ?time ?observed_time_unix_nano ?severity ?log_level ?flags ?trace_id +let make_str ?time ?severity ?log_level ?flags ?trace_id ?span_id ?attrs + ~observed_time_unix_nano (body : string) : t = + make ?time ~observed_time_unix_nano ?severity ?log_level ?flags ?trace_id ?span_id ?attrs (`String body) (** Make a log entry with format *) -let make_strf ?time ?observed_time_unix_nano ?severity ?log_level ?flags - ?trace_id ?span_id ?attrs fmt = +let make_strf ?time ?severity ?log_level ?flags ?trace_id ?span_id ?attrs + ~observed_time_unix_nano fmt = Format.kasprintf (fun bod -> - make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags + make_str ?time ~observed_time_unix_nano ?severity ?log_level ?flags ?trace_id ?span_id ?attrs bod) fmt diff --git a/src/core/metrics.ml b/src/core/metrics.ml index 988b7bd6..7298aefb 100644 --- a/src/core/metrics.ml +++ b/src/core/metrics.ml @@ -15,20 +15,16 @@ type t = Metrics.metric let pp = Proto.Metrics.pp_metric -open struct - let _program_start = Timestamp_ns.now_unix_ns () -end - (** Number data point, as a float *) -let float ?start_time_unix_nano ?(now = Timestamp_ns.now_unix_ns ()) - ?(attrs = []) (d : float) : number_data_point = +let float ?start_time_unix_nano ?(now = Clock.now_main ()) ?(attrs = []) + (d : float) : number_data_point = let attributes = attrs |> List.map Key_value.conv in make_number_data_point ?start_time_unix_nano ~time_unix_nano:now ~attributes ~value:(As_double d) () (** Number data point, as an int *) -let int ?start_time_unix_nano ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) - (i : int) : number_data_point = +let int ?start_time_unix_nano ?(now = Clock.now_main ()) ?(attrs = []) (i : int) + : number_data_point = let attributes = attrs |> List.map Key_value.conv in make_number_data_point ?start_time_unix_nano ~time_unix_nano:now ~attributes ~value:(As_int (Int64.of_int i)) @@ -60,10 +56,9 @@ let sum ~name ?description ?unit_ count value of histogram for each bucket. Sum of the counts must be equal to [count]. length must be [1+length explicit_bounds] @param explicit_bounds strictly increasing list of bounds for the buckets *) -let histogram_data_point ?start_time_unix_nano - ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = []) - ?(explicit_bounds = []) ?sum ~bucket_counts ~count () : histogram_data_point - = +let histogram_data_point ?start_time_unix_nano ?(now = Clock.now_main ()) + ?(attrs = []) ?(exemplars = []) ?(explicit_bounds = []) ?sum ~bucket_counts + ~count () : histogram_data_point = let attributes = attrs |> List.map Key_value.conv in make_histogram_data_point ?start_time_unix_nano ~time_unix_nano:now ~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum () diff --git a/src/emitter/emitter.ml b/src/emitter/emitter.ml index f0a23248..a3fc0f67 100644 --- a/src/emitter/emitter.ml +++ b/src/emitter/emitter.ml @@ -13,9 +13,9 @@ type -'a t = { signals it's given. *) emit: 'a list -> unit; (** Emit signals. @raise Closed if the emitter is closed. *) - tick: now:Mtime.t -> unit; + tick: mtime:Mtime.t -> unit; (** Call regularly to ensure background work is done. The current - timestamp is passed to improve testability. *) + monotonic timestamp is passed to improve testability. *) closed: unit -> bool; (** True if the emitter is already closed. Beware TOCTOU bugs. *) flush_and_close: unit -> unit; @@ -27,7 +27,7 @@ let[@inline] enabled self : bool = self.enabled () let[@inline] emit (self : _ t) l : unit = if l <> [] then self.emit l -let[@inline] tick (self : _ t) ~now : unit = self.tick ~now +let[@inline] tick (self : _ t) ~mtime : unit = self.tick ~mtime let[@inline] closed self : bool = self.closed () @@ -61,7 +61,7 @@ let make_simple ?tick ?closed ?enabled ?(flush_and_close = ignore) ~emit () : _ t = let tick = match tick with - | None -> fun ~now:_ -> () + | None -> fun ~mtime:_ -> () | Some f -> f in let closed, enabled = @@ -78,7 +78,7 @@ let dummy : _ t = { enabled = (fun () -> false); emit = ignore; - tick = (fun ~now:_ -> ()); + tick = (fun ~mtime:_ -> ()); closed = (fun () -> true); flush_and_close = ignore; } diff --git a/src/emitter/to_list.ml b/src/emitter/to_list.ml index 04e228ca..e1601132 100644 --- a/src/emitter/to_list.ml +++ b/src/emitter/to_list.ml @@ -8,7 +8,7 @@ let to_list (l : 'a list ref) : 'a Emitter.t = (fun sigs -> if Atomic.get closed then raise Emitter.Closed; l := List.rev_append sigs !l); - tick = (fun ~now:_ -> ()); + tick = (fun ~mtime:_ -> ()); closed = (fun () -> Atomic.get closed); flush_and_close = (fun () -> Atomic.set closed true); } diff --git a/src/lib/gc_metrics.ml b/src/lib/gc_metrics.ml index 42dacac6..2c909de5 100644 --- a/src/lib/gc_metrics.ml +++ b/src/lib/gc_metrics.ml @@ -10,7 +10,7 @@ end let get_metrics () : Metrics.t list = let gc = Gc.quick_stat () in - let now = Timestamp_ns.now_unix_ns () in + let now = Clock.now_main () in let open Metrics in let open Conventions.Metrics in [ diff --git a/src/lib/main_exporter.ml b/src/lib/main_exporter.ml index e35035f3..24c71974 100644 --- a/src/lib/main_exporter.ml +++ b/src/lib/main_exporter.ml @@ -21,7 +21,8 @@ let remove ~on_done () : unit = | None -> on_done () | Some exp -> Aswitch.on_turn_off (Exporter.active exp) on_done; - tick exp; + let mtime = Mtime_clock.now () in + tick exp ~mtime; shutdown exp (** Is there a configured exporter? *) @@ -42,10 +43,10 @@ module Util = struct let enabled () = present () in let closed () = not (enabled ()) in let flush_and_close () = () in - let tick ~now:_ = + let tick ~mtime = match get () with | None -> () - | Some exp -> Exporter.tick exp + | Some exp -> Exporter.tick exp ~mtime in let emit signals = if signals <> [] then ( @@ -100,6 +101,7 @@ let dynamic_forward_to_main_exporter : Exporter.t = let shutdown () = () in { Exporter.active; + clock = Clock.Main.dynamic_main; emit_metrics; emit_spans; emit_logs; diff --git a/src/lib/opentelemetry.ml b/src/lib/opentelemetry.ml index b1af8234..615a3a6b 100644 --- a/src/lib/opentelemetry.ml +++ b/src/lib/opentelemetry.ml @@ -14,8 +14,9 @@ module Proto = Opentelemetry_proto This is mostly useful internally. Users should not need to touch it. *) -(** {2 Timestamps} *) +(** {2 Time} *) +module Clock = Clock module Timestamp_ns = Timestamp_ns (** {2 Export signals to some external collector.} *)