rewrite trace-fuchsia to work with new collector

This commit is contained in:
Simon Cruanes 2026-01-15 20:43:05 -05:00
parent e2a942fedc
commit 22d91d4f40
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 129 additions and 85 deletions

View file

@ -1,5 +1,5 @@
open Common_
open Trace_private_util
open Trace_util
type t = Buf.t Rpool.t

View file

@ -1,4 +1,5 @@
open Common_
open Types
open Trace_core
let on_tracing_error = on_tracing_error
@ -8,8 +9,7 @@ type t = {
pid: int;
buf_chain: Buf_chain.t;
exporter: Exporter.t;
span_id_gen: Sub.Span_id_generator.t;
trace_id_gen: Sub.Trace_id_generator.t;
trace_id_gen: Types.Trace_id.Gen.t;
}
(** Subscriber state *)
@ -61,15 +61,13 @@ let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
buf_chain;
exporter;
pid;
span_id_gen = Sub.Span_id_generator.create ();
trace_id_gen = Sub.Trace_id_generator.create ();
trace_id_gen = Types.Trace_id.Gen.create ();
}
open struct
let new_span_id (self : t) = Sub.Span_id_generator.gen self.span_id_gen
let new_trace_id self = Sub.Trace_id_generator.gen self.trace_id_gen
let new_trace_id self = Types.Trace_id.Gen.gen self.trace_id_gen
let on_init (self : t) ~time_ns:_ =
let init (self : t) =
Writer.Metadata.Magic_record.encode self.buf_chain;
Writer.Metadata.Initialization_record.(
encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
@ -80,19 +78,7 @@ open struct
write_ready_ self
let on_shutdown (self : t) ~time_ns:_ = close self
let on_name_process_ (self : t) name : unit =
Writer.Kernel_object.(
encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
write_ready_ self
let on_name_thread_ (self : t) ~tid name : unit =
Writer.Kernel_object.(
encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
~args:[ "process", A_kid (Int64.of_int self.pid) ]
());
write_ready_ self
let shutdown (self : t) = close self
(* add function name, if provided, to the metadata *)
let add_fun_name_ fun_name data : _ list =
@ -100,49 +86,84 @@ open struct
| None -> data
| Some f -> ("function", `String f) :: data
let on_enter_span (self : t) (span : Sub.Span_sub.t) : unit =
let { Sub.Span_sub.data; name; tid; time_ns; flavor; _ } = span in
let rec flavor_of_params = function
| [] -> `Sync
| Core_ext.Extension_span_flavor f :: _ -> f
| _ :: tl -> flavor_of_params tl
let enter_span (self : t) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~params ~data
~parent name : span =
let flavor = flavor_of_params params in
let time_ns = Trace_util.Mock_.now_ns () in
let tid = Trace_util.Mock_.get_tid () in
match flavor with
| `Sync -> ()
| `Sync ->
Span_fuchsia_sync
{
__FUNCTION__;
name;
pid = self.pid;
tid;
args = data;
start_ns = time_ns;
}
| `Async ->
let data = add_fun_name_ span.__FUNCTION__ data in
let data = add_fun_name_ __FUNCTION__ data in
let trace_id =
match parent with
| P_some (Span_fuchsia_async sp) -> sp.trace_id
| _ -> new_trace_id self
in
Writer.(
Event.Async_begin.encode self.buf_chain ~name
~args:(args_of_user_data data)
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
~time_ns ~async_id:span.trace_id ());
write_ready_ self
~time_ns ~async_id:trace_id ());
write_ready_ self;
let on_exit_span (self : t) ~time_ns:end_time_ns ~tid:_
(span : Sub.Span_sub.t) : unit =
let { Sub.Span_sub.tid; name; flavor; time_ns = start_ns; data; _ } =
span
in
let data = add_fun_name_ span.__FUNCTION__ data in
Span_fuchsia_async { pid = self.pid; tid; trace_id; name; args = data }
match flavor with
| `Sync ->
let exit_span (self : t) sp =
let end_time_ns = Trace_util.Mock_.now_ns () in
match sp with
| Span_fuchsia_sync { __FUNCTION__; name; tid; pid; args = data; start_ns }
->
let data = add_fun_name_ __FUNCTION__ data in
Writer.(
Event.Duration_complete.encode self.buf_chain ~name
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
~t_ref:(Thread_ref.inline ~pid ~tid)
~time_ns:start_ns ~end_time_ns ~args:(args_of_user_data data) ());
write_ready_ self
| `Async ->
| Span_fuchsia_async { name; tid; pid; trace_id; args = data } ->
Writer.(
Event.Async_end.encode self.buf_chain ~name
~args:(args_of_user_data data)
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
~time_ns:end_time_ns ~async_id:span.trace_id ());
~t_ref:(Thread_ref.inline ~pid ~tid)
~time_ns:end_time_ns ~async_id:trace_id ());
write_ready_ self
| _ -> ()
let on_message (self : t) ~time_ns ~tid ~span:_ ~params:_ ~data msg : unit =
let add_data_to_span _st sp data =
match sp with
| Span_fuchsia_sync sp -> sp.args <- List.rev_append data sp.args
| Span_fuchsia_async sp -> sp.args <- List.rev_append data sp.args
| _ -> ()
let message (self : t) ~params:_ ~data ~span:_ msg : unit =
let time_ns = Trace_util.Mock_.now_ns () in
let tid = Trace_util.Mock_.get_tid () in
Writer.(
Event.Instant.encode self.buf_chain
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
~name:msg ~time_ns ~args:(args_of_user_data data) ());
write_ready_ self
let on_counter (self : t) ~time_ns ~tid ~params:_ ~data ~name n : unit =
let counter_float (self : t) ~params:_ ~data name n : unit =
let tid = Trace_util.Mock_.get_tid () in
let time_ns = Trace_util.Mock_.now_ns () in
Writer.(
Event.Counter.encode self.buf_chain
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
@ -151,16 +172,40 @@ open struct
());
write_ready_ self
let on_extension_event (self : t) ~time_ns:_ ~tid ev =
let counter_int self ~params:_ ~data name n =
let tid = Trace_util.Mock_.get_tid () in
let time_ns = Trace_util.Mock_.now_ns () in
Writer.(
Event.Counter.encode self.buf_chain
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
~name ~time_ns
~args:((name, A_int n) :: args_of_user_data data)
());
write_ready_ self
let name_process_ (self : t) name : unit =
Writer.Kernel_object.(
encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
write_ready_ self
let name_thread_ (self : t) ~tid name : unit =
Writer.Kernel_object.(
encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
~args:[ "process", A_kid (Int64.of_int self.pid) ]
());
write_ready_ self
let extension (self : t) ev =
match ev with
| Core_ext.Extension_set_thread_name name -> on_name_thread_ self ~tid name
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
| Core_ext.Extension_set_thread_name name ->
let tid = Trace_util.Mock_.get_tid () in
name_thread_ self ~tid name
| Core_ext.Extension_set_process_name name -> name_process_ self name
| _ -> ()
end
let sub_callbacks : _ Sub.Callbacks.t =
Sub.Callbacks.make ~new_span_id ~new_trace_id ~on_init ~on_shutdown
~on_enter_span ~on_exit_span ~on_message ~on_counter ~on_extension_event ()
let callbacks : t Collector.Callbacks.t =
Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span
~add_data_to_span ~message ~counter_int ~counter_float ~extension ()
let subscriber (self : t) : Sub.t =
Sub.Subscriber.Sub { st = self; callbacks = sub_callbacks }
let collector (self : t) : Collector.t = Collector.C_some (self, callbacks)

View file

@ -1,3 +1,5 @@
open Trace_core
type t
(** Main subscriber state. *)
@ -7,9 +9,9 @@ val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
val flush : t -> unit
val close : t -> unit
val active : t -> bool
val sub_callbacks : t Trace_subscriber.Callbacks.t
val callbacks : t Collector.Callbacks.t
val subscriber : t -> Trace_subscriber.t
val collector : t -> Collector.t
(** Subscriber that writes json into this writer *)
(**/**)

View file

@ -5,8 +5,7 @@
"A high-performance backend for trace, emitting a Fuchsia trace into a file")
(libraries
trace.core
trace.private.util
trace.subscriber
trace.util
thread-local-storage
bigarray
mtime

View file

@ -3,7 +3,7 @@ module Buf = Buf
module Buf_chain = Buf_chain
module Buf_pool = Buf_pool
module Exporter = Exporter
module Subscriber = Subscriber
module Collector_fuchsia = Collector_fuchsia
module Writer = Writer
type output =
@ -18,25 +18,18 @@ let get_out_ (out : [< output ]) : Exporter.t =
Exporter.of_out_channel ~close_channel:true oc
| `Exporter e -> e
let subscriber ~out () : Sub.t =
let collector ~out () : Trace_core.Collector.t =
let exporter = get_out_ out in
let pid =
if !Trace_subscriber.Private_.mock then
2
else
Unix.getpid ()
in
let sub = Subscriber.create ~pid ~exporter () in
Subscriber.subscriber sub
let collector ~out () = Sub.collector @@ subscriber ~out ()
let pid = Trace_util.Mock_.get_pid () in
let coll_st = Collector_fuchsia.create ~pid ~exporter () in
Collector_fuchsia.collector coll_st
let setup ?(out = `Env) () =
match out with
| `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) ()
| `Exporter _ as out ->
let sub = subscriber ~out () in
Trace_core.setup_collector @@ Sub.collector sub
let c = collector ~out () in
Trace_core.setup_collector c
| `Env ->
(match Sys.getenv_opt "TRACE" with
| Some ("1" | "true") ->
@ -52,23 +45,9 @@ let with_setup ?out () f =
setup ?out ();
Fun.protect ~finally:Trace_core.shutdown f
module Mock_ = struct
let now = ref 0
(* used to mock timing *)
let get_now_ns () : int64 =
let x = !now in
incr now;
Int64.(mul (of_int x) 1000L)
let get_tid_ () : int = 3
end
module Internal_ = struct
let mock_all_ () =
Sub.Private_.mock := true;
Sub.Private_.get_now_ns_ := Mock_.get_now_ns;
Sub.Private_.get_tid_ := Mock_.get_tid_;
Trace_util.Mock_.mock_all ();
()
let on_tracing_error = on_tracing_error

View file

@ -10,7 +10,7 @@ module Buf = Buf
module Buf_chain = Buf_chain
module Buf_pool = Buf_pool
module Exporter = Exporter
module Subscriber = Subscriber
module Collector_fuchsia = Collector_fuchsia
module Writer = Writer
type output =
@ -18,8 +18,6 @@ type output =
| `Exporter of Exporter.t
]
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
val collector : out:[< output ] -> unit -> Trace_core.collector
(** Make a collector that writes into the given output. See {!setup} for more
details. *)

21
src/fuchsia/types.ml Normal file
View file

@ -0,0 +1,21 @@
open Trace_core
module Trace_id = Trace_util.Trace_id64
type trace_id = Trace_id.t
type Trace_core.span +=
| Span_fuchsia_sync of {
__FUNCTION__: string option;
pid: int;
tid: int;
name: string;
start_ns: int64;
mutable args: (string * Trace_core.user_data) list;
}
| Span_fuchsia_async of {
pid: int;
tid: int;
name: string;
trace_id: trace_id;
mutable args: (string * Trace_core.user_data) list;
}