From ef50b578f1fd5d659cd6940cfb4c282d74fda2de Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 2 May 2025 08:56:25 -0400 Subject: [PATCH] refactor(subscriber): timestamps are int64ns now --- src/subscriber/callbacks.ml | 22 ++++++++++----------- src/subscriber/time_.dummy.ml | 2 +- src/subscriber/time_.mli | 2 +- src/subscriber/time_.mtime.ml | 4 ++-- src/subscriber/time_.unix.ml | 4 ++-- src/subscriber/trace_subscriber.ml | 21 +++++++++++--------- src/subscriber/trace_subscriber.mli | 10 +++++++--- src/tef-tldrs/trace_tef_tldrs.ml | 2 +- src/tef/trace_tef.ml | 30 +++++++++++++++-------------- 9 files changed, 53 insertions(+), 44 deletions(-) diff --git a/src/subscriber/callbacks.ml b/src/subscriber/callbacks.ml index 5d2c759..95926ea 100644 --- a/src/subscriber/callbacks.ml +++ b/src/subscriber/callbacks.ml @@ -29,16 +29,16 @@ module type S = sig type st (** Type of the state passed to every callback. *) - val on_init : st -> time_ns:float -> unit + val on_init : st -> time_ns:int64 -> unit (** Called when the subscriber is initialized in a collector *) - val on_shutdown : st -> time_ns:float -> unit + val on_shutdown : st -> time_ns:int64 -> unit (** Called when the collector is shutdown *) - val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit + val on_name_thread : st -> time_ns:int64 -> tid:int -> name:string -> unit (** Current thread is being named *) - val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit + val on_name_process : st -> time_ns:int64 -> tid:int -> name:string -> unit (** Current process is being named *) val on_enter_span : @@ -46,7 +46,7 @@ module type S = sig __FUNCTION__:string option -> __FILE__:string -> __LINE__:int -> - time_ns:float -> + time_ns:int64 -> tid:int -> data:(string * user_data) list -> name:string -> @@ -54,7 +54,7 @@ module type S = sig unit (** Enter a regular (sync) span *) - val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit + val on_exit_span : st -> time_ns:int64 -> tid:int -> span -> unit (** Exit a span. This and [on_enter_span] must follow strict stack discipline *) @@ -63,7 +63,7 @@ module type S = sig val on_message : st -> - time_ns:float -> + time_ns:int64 -> tid:int -> span:span option -> data:(string * user_data) list -> @@ -73,7 +73,7 @@ module type S = sig val on_counter : st -> - time_ns:float -> + time_ns:int64 -> tid:int -> data:(string * user_data) list -> name:string -> @@ -86,7 +86,7 @@ module type S = sig __FUNCTION__:string option -> __FILE__:string -> __LINE__:int -> - time_ns:float -> + time_ns:int64 -> tid:int -> parent:span option -> data:(string * user_data) list -> @@ -99,7 +99,7 @@ module type S = sig val on_exit_manual_span : st -> - time_ns:float -> + time_ns:int64 -> tid:int -> name:string -> data:(string * user_data) list -> @@ -110,7 +110,7 @@ module type S = sig (** Exit a manual span *) val on_extension_event : - st -> time_ns:float -> tid:int -> extension_event -> unit + st -> time_ns:int64 -> tid:int -> extension_event -> unit (** Extension event @since 0.8 *) end diff --git a/src/subscriber/time_.dummy.ml b/src/subscriber/time_.dummy.ml index c727752..29ce8e8 100644 --- a/src/subscriber/time_.dummy.ml +++ b/src/subscriber/time_.dummy.ml @@ -1 +1 @@ -let[@inline] get_time_ns () : float = 0. +let[@inline] get_time_ns () : int64 = 0L diff --git a/src/subscriber/time_.mli b/src/subscriber/time_.mli index ee1a9f4..5b29ca0 100644 --- a/src/subscriber/time_.mli +++ b/src/subscriber/time_.mli @@ -1 +1 @@ -val get_time_ns : unit -> float +val get_time_ns : unit -> int64 diff --git a/src/subscriber/time_.mtime.ml b/src/subscriber/time_.mtime.ml index 6b512c8..baa3f86 100644 --- a/src/subscriber/time_.mtime.ml +++ b/src/subscriber/time_.mtime.ml @@ -1,3 +1,3 @@ -let[@inline] get_time_ns () : float = +let[@inline] get_time_ns () : int64 = let t = Mtime_clock.now () in - Int64.to_float (Mtime.to_uint64_ns t) + Mtime.to_uint64_ns t diff --git a/src/subscriber/time_.unix.ml b/src/subscriber/time_.unix.ml index b2683ff..f7411c1 100644 --- a/src/subscriber/time_.unix.ml +++ b/src/subscriber/time_.unix.ml @@ -1,3 +1,3 @@ -let[@inline] get_time_ns () : float = +let[@inline] get_time_ns () : int64 = let t = Unix.gettimeofday () in - t *. 1e9 + Int64.of_float (t *. 1e9) diff --git a/src/subscriber/trace_subscriber.ml b/src/subscriber/trace_subscriber.ml index b287d69..d419587 100644 --- a/src/subscriber/trace_subscriber.ml +++ b/src/subscriber/trace_subscriber.ml @@ -6,19 +6,22 @@ include Types type t = Subscriber.t module Private_ = struct - let get_now_ns_ = ref None - let get_tid_ = ref None + let mock = ref false + let get_now_ns_ = ref Time_.get_time_ns + let get_tid_ = ref Thread_.get_tid (** Now, in nanoseconds *) - let[@inline] now_ns () : float = - match !get_now_ns_ with - | Some f -> f () - | None -> Time_.get_time_ns () + let[@inline] now_ns () : int64 = + if !mock then + !get_now_ns_ () + else + Time_.get_time_ns () let[@inline] tid_ () : int = - match !get_tid_ with - | Some f -> f () - | None -> Thread_.get_tid () + if !mock then + !get_tid_ () + else + Thread_.get_tid () end open struct diff --git a/src/subscriber/trace_subscriber.mli b/src/subscriber/trace_subscriber.mli index 387b152..a3f2325 100644 --- a/src/subscriber/trace_subscriber.mli +++ b/src/subscriber/trace_subscriber.mli @@ -31,13 +31,17 @@ val collector : t -> Trace_core.collector (**/**) module Private_ : sig - val get_now_ns_ : (unit -> float) option ref + val mock : bool ref + (** Global mock flag. If enable, all timestamps, tid, etc should be faked. *) + + val get_now_ns_ : (unit -> int64) ref (** The callback used to get the current timestamp *) - val get_tid_ : (unit -> int) option ref + val get_tid_ : (unit -> int) ref (** The callback used to get the current thread's id *) - val now_ns : unit -> float + val now_ns : unit -> int64 + (** Get the current timestamp, or a mock version *) end (**/**) diff --git a/src/tef-tldrs/trace_tef_tldrs.ml b/src/tef-tldrs/trace_tef_tldrs.ml index 7d3eb2f..47584a4 100644 --- a/src/tef-tldrs/trace_tef_tldrs.ml +++ b/src/tef-tldrs/trace_tef_tldrs.ml @@ -22,7 +22,7 @@ let get_unix_socket () = type as_client = { trace_id: string; - socket: string; + socket: string; (** Unix socket address *) emit_tef_at_exit: string option; (** For parent, ask daemon to emit traces here *) } diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index a24f5a1..d9dd4f8 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -6,6 +6,9 @@ module A = Trace_core.Internal_.Atomic_ let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) +let[@inline] time_us_of_time_ns (t : int64) : float = + Int64.div t 1_000L |> Int64.to_float + let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = if id == Trace_core.Collector.dummy_trace_id then 0L @@ -13,14 +16,13 @@ let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 module Mock_ = struct - let enabled = ref false let now = ref 0 (* used to mock timing *) - let get_now_ns () : float = + let get_now_ns () : int64 = let x = !now in incr now; - float_of_int x *. 1000. + Int64.(mul (of_int x) 1000L) let get_tid_ () : int = 3 end @@ -63,7 +65,7 @@ module Writer = struct | `Output oc -> oc, false in let pid = - if !Mock_.enabled then + if !Sub.Private_.mock then 2 else Unix.getpid () @@ -300,7 +302,7 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit = (* write a message about us closing *) Writer.emit_instant_event ~name:"tef-worker.exit" ~tid:(Thread.id @@ Thread.self ()) - ~ts:(Sub.Private_.now_ns () *. 1e-3) + ~ts:(time_us_of_time_ns @@ Sub.Private_.now_ns ()) ~args:[] writer; (* warn if app didn't close all spans *) @@ -354,12 +356,12 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t = let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_define_span { tid; name; time_us; id = span; fun_name; data } let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_exit_span { id = span; time_us } let on_add_data (self : st) ~data span = @@ -367,24 +369,24 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t = B_queue.push self.events @@ E_add_data { id = span; data } let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_message { tid; time_us; msg; data } let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_counter { name; n = f; time_us; tid } let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_enter_manual_span { id = trace_id; time_us; tid; data; name; fun_name; flavor } let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor ~trace_id (_ : span) : unit = - let time_us = time_ns *. 1e-3 in + let time_us = time_us_of_time_ns @@ time_ns in B_queue.push self.events @@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor } @@ -438,9 +440,9 @@ let with_setup ?out () f = module Private_ = struct let mock_all_ () = - Mock_.enabled := true; - Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns; - Sub.Private_.get_tid_ := Some Mock_.get_tid_; + Sub.Private_.mock := true; + Sub.Private_.get_now_ns_ := Mock_.get_now_ns; + Sub.Private_.get_tid_ := Mock_.get_tid_; () let on_tracing_error = on_tracing_error