From 03a93e7ad1b78d184c0139f754881b3809e47907 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 11 Dec 2025 15:05:07 -0500 Subject: [PATCH] wip --- src/perfetto/subscriber.ml | 175 ++++++++++++++++++++++++++++++++++++ src/perfetto/subscriber.mli | 28 ++++++ src/perfetto/writer.ml | 97 ++++++++++++++++++++ src/perfetto/writer.mli | 54 +++++++++++ 4 files changed, 354 insertions(+) create mode 100644 src/perfetto/subscriber.ml create mode 100644 src/perfetto/subscriber.mli create mode 100644 src/perfetto/writer.ml create mode 100644 src/perfetto/writer.mli diff --git a/src/perfetto/subscriber.ml b/src/perfetto/subscriber.ml new file mode 100644 index 0000000..bc7b8f6 --- /dev/null +++ b/src/perfetto/subscriber.ml @@ -0,0 +1,175 @@ +open Common_ +open Trace_core +open Trace_private_util +module Span_tbl = Sub.Span_tbl + +module Buf_pool = struct + type t = Buffer.t Rpool.t + + let create ?(max_size = 32) ?(buf_size = 256) () : t = + Rpool.create ~max_size ~clear:Buffer.reset + ~create:(fun () -> Buffer.create buf_size) + () +end + +open struct + let[@inline] time_us_of_time_ns (t : int64) : float = + Int64.div t 1_000L |> Int64.to_float + + let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = + if id == Trace_core.Collector.dummy_trace_id then + 0L + else + Bytes.get_int64_le (Bytes.unsafe_of_string id) 0 +end + +let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s) + +type span_info = { + tid: int; + name: string; + start_us: float; + mutable data: (string * Sub.user_data) list; + (* NOTE: thread safety: this is supposed to only be modified by the thread +that's running this (synchronous, stack-abiding) span. *) +} +(** Information we store about a span begin event, to emit a complete event when + we meet the corresponding span end event *) + +type t = { + active: bool A.t; + pid: int; + spans: span_info Span_tbl.t; + buf_pool: Buf_pool.t; + exporter: Exporter.t; +} +(** Subscriber state *) + +open struct + let print_non_closed_spans_warning spans = + let module Str_set = Set.Make (String) in + let spans = Span_tbl.to_list spans in + if spans <> [] then ( + !on_tracing_error + @@ Printf.sprintf "trace-tef: warning: %d spans were not closed" + (List.length spans); + let names = + List.fold_left + (fun set (_, span) -> Str_set.add span.name set) + Str_set.empty spans + in + Str_set.iter + (fun name -> + !on_tracing_error @@ Printf.sprintf " span %S was not closed" name) + names; + flush stderr + ) +end + +let close (self : t) : unit = + if A.exchange self.active false then ( + print_non_closed_spans_warning self.spans; + self.exporter.close () + ) + +let[@inline] active self = A.get self.active +let[@inline] flush (self : t) : unit = self.exporter.flush () + +let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = + { active = A.make true; exporter; buf_pool; pid; spans = Span_tbl.create () } + +module Callbacks = struct + type st = t + + let on_init _ ~time_ns:_ = () + let on_shutdown (self : st) ~time_ns:_ = close self + + let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit = + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_name_process ~pid:self.pid ~name buf; + self.exporter.on_json buf + + let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit = + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_name_thread buf ~pid:self.pid ~tid ~name; + self.exporter.on_json buf + + (* add function name, if provided, to the metadata *) + let add_fun_name_ fun_name data : _ list = + match fun_name with + | None -> data + | Some f -> ("function", Sub.U_string f) :: data + + let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ + ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = + let time_us = time_us_of_time_ns @@ time_ns in + let data = add_fun_name_ fun_name data in + let info = { tid; name; start_us = time_us; data } in + (* save the span so we find it at exit *) + Span_tbl.add self.spans span info + + let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = + let time_us = time_us_of_time_ns @@ time_ns in + + match Span_tbl.find_exn self.spans span with + | exception Not_found -> + !on_tracing_error + (Printf.sprintf "trace-tef: error: cannot find span %Ld" span) + | { tid; name; start_us; data } -> + Span_tbl.remove self.spans span; + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us + ~end_:time_us ~args:data; + + self.exporter.on_json buf + + let on_add_data (self : st) ~data span = + if data <> [] then ( + try + let info = Span_tbl.find_exn self.spans span in + info.data <- List.rev_append data info.data + with Not_found -> + !on_tracing_error + (Printf.sprintf "trace-tef: error: cannot find span %Ld" span) + ) + + let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = + let time_us = time_us_of_time_ns @@ time_ns in + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us + ~args:data; + self.exporter.on_json buf + + let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit = + let time_us = time_us_of_time_ns @@ time_ns in + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n; + self.exporter.on_json buf + + let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ + ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span : + unit = + let time_us = time_us_of_time_ns @@ time_ns in + + let data = add_fun_name_ fun_name data in + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name + ~id:(int64_of_trace_id_ trace_id) + ~ts:time_us ~args:data ~flavor; + self.exporter.on_json buf + + let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor + ~trace_id (_ : span) : unit = + let time_us = time_us_of_time_ns @@ time_ns in + + let@ buf = Rpool.with_ self.buf_pool in + Writer.emit_manual_end buf ~pid:self.pid ~tid ~name + ~id:(int64_of_trace_id_ trace_id) + ~ts:time_us ~flavor ~args:data; + self.exporter.on_json buf + + let on_extension_event _ ~time_ns:_ ~tid:_ _ev = () +end + +let subscriber (self : t) : Sub.t = + Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) } diff --git a/src/perfetto/subscriber.mli b/src/perfetto/subscriber.mli new file mode 100644 index 0000000..9f2f235 --- /dev/null +++ b/src/perfetto/subscriber.mli @@ -0,0 +1,28 @@ +open Common_ + +module Buf_pool : sig + type t + + val create : ?max_size:int -> ?buf_size:int -> unit -> t +end + +type t +(** Main subscriber state. *) + +val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t +(** Create a subscriber state. *) + +val flush : t -> unit +val close : t -> unit +val active : t -> bool + +module Callbacks : Sub.Callbacks.S with type st = t + +val subscriber : t -> Sub.t +(** Subscriber that writes json into this writer *) + +(**/**) + +val on_tracing_error : (string -> unit) ref + +(**/**) diff --git a/src/perfetto/writer.ml b/src/perfetto/writer.ml new file mode 100644 index 0000000..9865988 --- /dev/null +++ b/src/perfetto/writer.ml @@ -0,0 +1,97 @@ +open Common_ + +let char = Buffer.add_char +let raw_string = Buffer.add_string + +let str_val (buf : Buffer.t) (s : string) = + char buf '"'; + let encode_char c = + match c with + | '"' -> raw_string buf {|\"|} + | '\\' -> raw_string buf {|\\|} + | '\n' -> raw_string buf {|\n|} + | '\b' -> raw_string buf {|\b|} + | '\r' -> raw_string buf {|\r|} + | '\t' -> raw_string buf {|\t|} + | _ when Char.code c <= 0x1f -> + raw_string buf {|\u00|}; + Printf.bprintf buf "%02x" (Char.code c) + | c -> char buf c + in + String.iter encode_char s; + char buf '"' + +let pp_user_data_ (out : Buffer.t) : Sub.user_data -> unit = function + | U_none -> raw_string out "null" + | U_int i -> Printf.bprintf out "%d" i + | U_bool b -> Printf.bprintf out "%b" b + | U_string s -> str_val out s + | U_float f -> Printf.bprintf out "%g" f + +(* emit args, if not empty. [ppv] is used to print values. *) +let emit_args_o_ ppv (out : Buffer.t) args : unit = + if args <> [] then ( + Printf.bprintf out {json|,"args": {|json}; + List.iteri + (fun i (n, value) -> + if i > 0 then raw_string out ","; + Printf.bprintf out {json|"%s":%a|json} n ppv value) + args; + char out '}' + ) + +let emit_duration_event ~pid ~tid ~name ~start ~end_ ~args buf : unit = + let dur = end_ -. start in + let ts = start in + + Printf.bprintf buf + {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} + pid tid dur ts str_val name + (emit_args_o_ pp_user_data_) + args + +let emit_manual_begin ~pid ~tid ~name ~(id : int64) ~ts ~args + ~(flavor : Sub.flavor option) buf : unit = + Printf.bprintf buf + {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} + pid id tid ts str_val name + (match flavor with + | None | Some Async -> 'b' + | Some Sync -> 'B') + (emit_args_o_ pp_user_data_) + args + +let emit_manual_end ~pid ~tid ~name ~(id : int64) ~ts + ~(flavor : Sub.flavor option) ~args buf : unit = + Printf.bprintf buf + {json|{"pid":%d,"cat":"trace","id":%Ld,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} + pid id tid ts str_val name + (match flavor with + | None | Some Async -> 'e' + | Some Sync -> 'E') + (emit_args_o_ pp_user_data_) + args + +let emit_instant_event ~pid ~tid ~name ~ts ~args buf : unit = + Printf.bprintf buf + {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} + pid tid ts str_val name + (emit_args_o_ pp_user_data_) + args + +let emit_name_thread ~pid ~tid ~name buf : unit = + Printf.bprintf buf + {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} pid tid + (emit_args_o_ pp_user_data_) + [ "name", U_string name ] + +let emit_name_process ~pid ~name buf : unit = + Printf.bprintf buf {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} pid + (emit_args_o_ pp_user_data_) + [ "name", U_string name ] + +let emit_counter ~pid ~tid ~name ~ts buf f : unit = + Printf.bprintf buf + {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} pid tid ts + (emit_args_o_ pp_user_data_) + [ name, U_float f ] diff --git a/src/perfetto/writer.mli b/src/perfetto/writer.mli new file mode 100644 index 0000000..d1563a7 --- /dev/null +++ b/src/perfetto/writer.mli @@ -0,0 +1,54 @@ +(** Write JSON events to a buffer. + + This is the part of the code that knows how to emit TEF-compliant JSON from + raw event data. *) + +open Common_ +open Trace_core + +val emit_duration_event : + pid:int -> + tid:int -> + name:string -> + start:float -> + end_:float -> + args:(string * Sub.user_data) list -> + Buffer.t -> + unit + +val emit_manual_begin : + pid:int -> + tid:int -> + name:string -> + id:span -> + ts:float -> + args:(string * Sub.user_data) list -> + flavor:Sub.flavor option -> + Buffer.t -> + unit + +val emit_manual_end : + pid:int -> + tid:int -> + name:string -> + id:span -> + ts:float -> + flavor:Sub.flavor option -> + args:(string * Sub.user_data) list -> + Buffer.t -> + unit + +val emit_instant_event : + pid:int -> + tid:int -> + name:string -> + ts:float -> + args:(string * Sub.user_data) list -> + Buffer.t -> + unit + +val emit_name_thread : pid:int -> tid:int -> name:string -> Buffer.t -> unit +val emit_name_process : pid:int -> name:string -> Buffer.t -> unit + +val emit_counter : + pid:int -> tid:int -> name:string -> ts:float -> Buffer.t -> float -> unit