refactor(subscriber): timestamps are int64ns now

This commit is contained in:
Simon Cruanes 2025-05-02 08:56:25 -04:00
parent 384dca93e2
commit ef50b578f1
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
9 changed files with 53 additions and 44 deletions

View file

@ -29,16 +29,16 @@ module type S = sig
type st type st
(** Type of the state passed to every callback. *) (** Type of the state passed to every callback. *)
val on_init : st -> time_ns:float -> unit val on_init : st -> time_ns:int64 -> unit
(** Called when the subscriber is initialized in a collector *) (** Called when the subscriber is initialized in a collector *)
val on_shutdown : st -> time_ns:float -> unit val on_shutdown : st -> time_ns:int64 -> unit
(** Called when the collector is shutdown *) (** Called when the collector is shutdown *)
val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit val on_name_thread : st -> time_ns:int64 -> tid:int -> name:string -> unit
(** Current thread is being named *) (** Current thread is being named *)
val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit val on_name_process : st -> time_ns:int64 -> tid:int -> name:string -> unit
(** Current process is being named *) (** Current process is being named *)
val on_enter_span : val on_enter_span :
@ -46,7 +46,7 @@ module type S = sig
__FUNCTION__:string option -> __FUNCTION__:string option ->
__FILE__:string -> __FILE__:string ->
__LINE__:int -> __LINE__:int ->
time_ns:float -> time_ns:int64 ->
tid:int -> tid:int ->
data:(string * user_data) list -> data:(string * user_data) list ->
name:string -> name:string ->
@ -54,7 +54,7 @@ module type S = sig
unit unit
(** Enter a regular (sync) span *) (** Enter a regular (sync) span *)
val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit val on_exit_span : st -> time_ns:int64 -> tid:int -> span -> unit
(** Exit a span. This and [on_enter_span] must follow strict stack discipline (** Exit a span. This and [on_enter_span] must follow strict stack discipline
*) *)
@ -63,7 +63,7 @@ module type S = sig
val on_message : val on_message :
st -> st ->
time_ns:float -> time_ns:int64 ->
tid:int -> tid:int ->
span:span option -> span:span option ->
data:(string * user_data) list -> data:(string * user_data) list ->
@ -73,7 +73,7 @@ module type S = sig
val on_counter : val on_counter :
st -> st ->
time_ns:float -> time_ns:int64 ->
tid:int -> tid:int ->
data:(string * user_data) list -> data:(string * user_data) list ->
name:string -> name:string ->
@ -86,7 +86,7 @@ module type S = sig
__FUNCTION__:string option -> __FUNCTION__:string option ->
__FILE__:string -> __FILE__:string ->
__LINE__:int -> __LINE__:int ->
time_ns:float -> time_ns:int64 ->
tid:int -> tid:int ->
parent:span option -> parent:span option ->
data:(string * user_data) list -> data:(string * user_data) list ->
@ -99,7 +99,7 @@ module type S = sig
val on_exit_manual_span : val on_exit_manual_span :
st -> st ->
time_ns:float -> time_ns:int64 ->
tid:int -> tid:int ->
name:string -> name:string ->
data:(string * user_data) list -> data:(string * user_data) list ->
@ -110,7 +110,7 @@ module type S = sig
(** Exit a manual span *) (** Exit a manual span *)
val on_extension_event : val on_extension_event :
st -> time_ns:float -> tid:int -> extension_event -> unit st -> time_ns:int64 -> tid:int -> extension_event -> unit
(** Extension event (** Extension event
@since 0.8 *) @since 0.8 *)
end end

View file

@ -1 +1 @@
let[@inline] get_time_ns () : float = 0. let[@inline] get_time_ns () : int64 = 0L

View file

@ -1 +1 @@
val get_time_ns : unit -> float val get_time_ns : unit -> int64

View file

@ -1,3 +1,3 @@
let[@inline] get_time_ns () : float = let[@inline] get_time_ns () : int64 =
let t = Mtime_clock.now () in let t = Mtime_clock.now () in
Int64.to_float (Mtime.to_uint64_ns t) Mtime.to_uint64_ns t

View file

@ -1,3 +1,3 @@
let[@inline] get_time_ns () : float = let[@inline] get_time_ns () : int64 =
let t = Unix.gettimeofday () in let t = Unix.gettimeofday () in
t *. 1e9 Int64.of_float (t *. 1e9)

View file

@ -6,19 +6,22 @@ include Types
type t = Subscriber.t type t = Subscriber.t
module Private_ = struct module Private_ = struct
let get_now_ns_ = ref None let mock = ref false
let get_tid_ = ref None let get_now_ns_ = ref Time_.get_time_ns
let get_tid_ = ref Thread_.get_tid
(** Now, in nanoseconds *) (** Now, in nanoseconds *)
let[@inline] now_ns () : float = let[@inline] now_ns () : int64 =
match !get_now_ns_ with if !mock then
| Some f -> f () !get_now_ns_ ()
| None -> Time_.get_time_ns () else
Time_.get_time_ns ()
let[@inline] tid_ () : int = let[@inline] tid_ () : int =
match !get_tid_ with if !mock then
| Some f -> f () !get_tid_ ()
| None -> Thread_.get_tid () else
Thread_.get_tid ()
end end
open struct open struct

View file

@ -31,13 +31,17 @@ val collector : t -> Trace_core.collector
(**/**) (**/**)
module Private_ : sig module Private_ : sig
val get_now_ns_ : (unit -> float) option ref val mock : bool ref
(** Global mock flag. If enable, all timestamps, tid, etc should be faked. *)
val get_now_ns_ : (unit -> int64) ref
(** The callback used to get the current timestamp *) (** The callback used to get the current timestamp *)
val get_tid_ : (unit -> int) option ref val get_tid_ : (unit -> int) ref
(** The callback used to get the current thread's id *) (** The callback used to get the current thread's id *)
val now_ns : unit -> float val now_ns : unit -> int64
(** Get the current timestamp, or a mock version *)
end end
(**/**) (**/**)

View file

@ -22,7 +22,7 @@ let get_unix_socket () =
type as_client = { type as_client = {
trace_id: string; trace_id: string;
socket: string; socket: string; (** Unix socket address *)
emit_tef_at_exit: string option; emit_tef_at_exit: string option;
(** For parent, ask daemon to emit traces here *) (** For parent, ask daemon to emit traces here *)
} }

View file

@ -6,6 +6,9 @@ module A = Trace_core.Internal_.Atomic_
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
let[@inline] time_us_of_time_ns (t : int64) : float =
Int64.div t 1_000L |> Int64.to_float
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
if id == Trace_core.Collector.dummy_trace_id then if id == Trace_core.Collector.dummy_trace_id then
0L 0L
@ -13,14 +16,13 @@ let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
module Mock_ = struct module Mock_ = struct
let enabled = ref false
let now = ref 0 let now = ref 0
(* used to mock timing *) (* used to mock timing *)
let get_now_ns () : float = let get_now_ns () : int64 =
let x = !now in let x = !now in
incr now; incr now;
float_of_int x *. 1000. Int64.(mul (of_int x) 1000L)
let get_tid_ () : int = 3 let get_tid_ () : int = 3
end end
@ -63,7 +65,7 @@ module Writer = struct
| `Output oc -> oc, false | `Output oc -> oc, false
in in
let pid = let pid =
if !Mock_.enabled then if !Sub.Private_.mock then
2 2
else else
Unix.getpid () Unix.getpid ()
@ -300,7 +302,7 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit =
(* write a message about us closing *) (* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit" Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ()) ~tid:(Thread.id @@ Thread.self ())
~ts:(Sub.Private_.now_ns () *. 1e-3) ~ts:(time_us_of_time_ns @@ Sub.Private_.now_ns ())
~args:[] writer; ~args:[] writer;
(* warn if app didn't close all spans *) (* warn if app didn't close all spans *)
@ -354,12 +356,12 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events B_queue.push self.events
@@ E_define_span { tid; name; time_us; id = span; fun_name; data } @@ E_define_span { tid; name; time_us; id = span; fun_name; data }
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events @@ E_exit_span { id = span; time_us } B_queue.push self.events @@ E_exit_span { id = span; time_us }
let on_add_data (self : st) ~data span = let on_add_data (self : st) ~data span =
@ -367,24 +369,24 @@ let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t =
B_queue.push self.events @@ E_add_data { id = span; data } B_queue.push self.events @@ E_add_data { id = span; data }
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events @@ E_message { tid; time_us; msg; data } B_queue.push self.events @@ E_message { tid; time_us; msg; data }
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit = let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events @@ E_counter { name; n = f; time_us; tid } B_queue.push self.events @@ E_counter { name; n = f; time_us; tid }
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span
: unit = : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events B_queue.push self.events
@@ E_enter_manual_span @@ E_enter_manual_span
{ id = trace_id; time_us; tid; data; name; fun_name; flavor } { id = trace_id; time_us; tid; data; name; fun_name; flavor }
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
~trace_id (_ : span) : unit = ~trace_id (_ : span) : unit =
let time_us = time_ns *. 1e-3 in let time_us = time_us_of_time_ns @@ time_ns in
B_queue.push self.events B_queue.push self.events
@@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor } @@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor }
@ -438,9 +440,9 @@ let with_setup ?out () f =
module Private_ = struct module Private_ = struct
let mock_all_ () = let mock_all_ () =
Mock_.enabled := true; Sub.Private_.mock := true;
Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns; Sub.Private_.get_now_ns_ := Mock_.get_now_ns;
Sub.Private_.get_tid_ := Some Mock_.get_tid_; Sub.Private_.get_tid_ := Mock_.get_tid_;
() ()
let on_tracing_error = on_tracing_error let on_tracing_error = on_tracing_error