good progress on fuchsia collector

This commit is contained in:
Simon Cruanes 2023-12-25 22:52:50 -05:00
parent 9567c1b4a7
commit 68d3969cde
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
15 changed files with 666 additions and 534 deletions

View file

@ -1,74 +1,67 @@
open Common_
(** Background thread, takes events from the queue, puts them
in context using local state, and writes fully resolved
TEF events to [out]. *)
let bg_thread ~out (events : event B_queue.t) : unit =
(* open a writer to [out] *)
Writer.with_ ~out @@ fun writer ->
(* local state, to keep track of span information and implicit stack context *)
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
type out =
[ `Stdout
| `Stderr
| `File of string
]
(* add function name, if provided, to the metadata *)
let add_fun_name_ fun_name data : _ list =
match fun_name with
| None -> data
| Some f -> ("function", `String f) :: data
type event =
| E_write_buf of Buf.t
| E_tick
type state = {
buf_pool: Buf_pool.t;
oc: out_channel;
events: event B_queue.t;
}
let with_out_ (out : out) f =
let oc, must_close =
match out with
| `Stdout -> stdout, false
| `Stderr -> stderr, false
| `File path -> open_out path, true
in
(* how to deal with an event *)
let handle_ev (ev : event) : unit =
match ev with
| E_tick -> Writer.flush writer
| E_message { tid; msg; time_us; data } ->
Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer
| E_define_span { tid; name; id; time_us; fun_name; data } ->
let data = add_fun_name_ fun_name data in
let info = { tid; name; start_us = time_us; data } in
(* save the span so we find it at exit *)
Span_tbl.add spans id info
| E_exit_span { id; time_us = stop_us } ->
(match Span_tbl.find_opt spans id with
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
| Some { tid; name; start_us; data } ->
Span_tbl.remove spans id;
Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us
~args:data writer)
| E_add_data { id; data } ->
(match Span_tbl.find_opt spans id with
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
| Some info -> info.data <- List.rev_append data info.data)
| E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } ->
let data = add_fun_name_ fun_name data in
Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor
writer
| E_exit_manual_span { tid; time_us; name; id; flavor; data } ->
Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data
writer
| E_counter { tid; name; time_us; n } ->
Writer.emit_counter ~name ~tid ~ts:time_us writer n
| E_name_process { name } -> Writer.emit_name_process ~name writer
| E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer
in
if must_close then (
let finally () = close_out_noerr oc in
Fun.protect ~finally (fun () -> f oc)
) else
f oc
try
while true do
(* get all the events in the incoming blocking queue, in
one single critical section. *)
let local = B_queue.pop_all events in
List.iter handle_ev local
done
with B_queue.Closed ->
(* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ())
~ts:(now_us ()) ~args:[] writer;
let handle_ev (self : state) (ev : event) : unit =
match ev with
| E_tick -> flush self.oc
| E_write_buf buf ->
output self.oc buf.buf 0 buf.offset;
Buf_pool.recycle self.buf_pool buf
(* warn if app didn't close all spans *)
if Span_tbl.length spans > 0 then
Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!"
(Span_tbl.length spans);
()
let bg_loop (self : state) : unit =
let continue = ref true in
while !continue do
match B_queue.pop_all self.events with
| exception B_queue.Closed -> continue := false
| evs -> List.iter (handle_ev self) evs
done
let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
let@ oc = with_out_ out in
let st = { oc; buf_pool; events } in
bg_loop st
(* TODO:
(* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ())
~ts:(now_us ()) ~args:[] writer;
(* warn if app didn't close all spans *)
if Span_tbl.length spans > 0 then
Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!"
(Span_tbl.length spans);
*)
(** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file *)

View file

@ -1,4 +1,9 @@
module A = Trace_core.Internal_.Atomic_
module FWrite = Trace_fuchsia_write
module B_queue = Trace_private_util.B_queue
module Buf = FWrite.Buf
module Buf_pool = FWrite.Buf_pool
module Output = FWrite.Output
module Span_tbl = Hashtbl.Make (struct
include Int64
@ -8,3 +13,6 @@ end)
let on_tracing_error =
ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s)
let ( let@ ) = ( @@ )
let spf = Printf.sprintf

View file

@ -4,5 +4,7 @@
(name trace_fuchsia)
(public_name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(flags :standard -w -27) ; TODO: remove
(libraries trace.core trace.private.util thread-local-storage
(re_export trace-fuchsia.write)
mtime mtime.clock.os atomic unix threads))

265
src/fuchsia/fcollector.ml Normal file
View file

@ -0,0 +1,265 @@
open Trace_core
open Common_
module TLS = Thread_local_storage
let pid = Unix.getpid ()
type state = {
active: bool A.t;
events: Bg_thread.event B_queue.t;
span_id_gen: int A.t; (** Used for async spans *)
bg_thread: Thread.t;
buf_pool: Buf_pool.t;
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
}
type span_info = {
start_time_ns: int64;
name: string;
mutable data: (string * user_data) list;
}
(* TODO:
(** key used to carry a unique "id" for all spans in an async context *)
let key_async_id : int Meta_map.Key.t = Meta_map.Key.create ()
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t =
Meta_map.Key.create ()
let key_data : (string * user_data) list ref Meta_map.Key.t =
Meta_map.Key.create ()
*)
open struct
let state_id_ = A.make 0
(* re-raise exception with its backtrace *)
external reraise : exn -> 'a = "%reraise"
end
type per_thread_state = {
tid: int;
state_id: int; (** ID of the current collector state *)
local_span_id_gen: int A.t; (** Used for thread-local spans *)
mutable thread_ref: FWrite.Thread_ref.t;
mutable out: Output.t option;
spans: span_info Span_tbl.t; (** In-flight spans *)
}
let key_thread_local_st : per_thread_state TLS.key =
TLS.new_key (fun () ->
let tid = Thread.id @@ Thread.self () in
{
tid;
state_id = A.get state_id_;
thread_ref = FWrite.Thread_ref.inline ~pid ~tid;
local_span_id_gen = A.make 0;
out = None;
spans = Span_tbl.create 32;
})
let out_of_st (st : state) : Output.t =
FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf ->
if A.get st.active then (
try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()
))
module C (St : sig
val st : state
end)
() =
struct
open St
let state_id = 1 + A.fetch_and_add state_id_ 1
(** prepare the thread's state *)
let[@inline never] update_local_state (self : per_thread_state) : unit =
(* get an output *)
let out = out_of_st st in
self.out <- Some out;
(* try to allocate a thread ref for current thread *)
let th_ref = A.fetch_and_add st.next_thread_ref 1 in
if th_ref <= 0xff then (
self.thread_ref <- FWrite.Thread_ref.ref th_ref;
FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid ()
);
()
(** Obtain the output for the current thread *)
let[@inline] get_thread_output () : Output.t * per_thread_state =
let st = TLS.get key_thread_local_st in
if st.state_id != state_id || st.out == None then update_local_state st;
Option.get st.out, st
let shutdown () =
if A.exchange st.active false then (
B_queue.close st.events;
(* wait for writer thread to be done. The writer thread will exit
after processing remaining events because the queue is now closed *)
Thread.join st.bg_thread
)
let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span =
let tls = TLS.get key_thread_local_st in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let time_ns = Time.now_ns () in
Span_tbl.add tls.spans span { name; data; start_time_ns = time_ns };
span
let exit_span span : unit =
let out, tls = get_thread_output () in
let end_time_ns = Time.now_ns () in
match Span_tbl.find_opt tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some info ->
Span_tbl.remove tls.spans span;
FWrite.Event.Duration_complete.encode out ~name:info.name
~t_ref:tls.thread_ref ~time_ns:info.start_time_ns ~end_time_ns
~args:info.data ()
let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let info = { start_time_ns = time_ns; data; name } in
Span_tbl.add tls.spans span info;
let[@inline] exit () : unit =
let end_time_ns = Time.now_ns () in
Span_tbl.remove tls.spans span;
FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns
~t_ref:tls.thread_ref ~args:info.data ()
in
try
let x = f span in
exit ();
x
with exn ->
exit ();
reraise exn
let add_data_to_span span data =
let tls = TLS.get key_thread_local_st in
match Span_tbl.find_opt tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some info -> info.data <- List.rev_append data info.data
let enter_manual_span ~(parent : explicit_span option) ~flavor
~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span
=
assert false
(* TODO:
(* get the id, or make a new one *)
let id =
match parent with
| Some m -> Meta_map.find_exn key_async_id m.meta
| None -> A.fetch_and_add span_id_gen_ 1
in
let time_us = now_us () in
B_queue.push events
(E_enter_manual_span
{ id; time_us; tid = get_tid_ (); data; name; fun_name; flavor });
{
span = 0L;
meta =
Meta_map.(
empty |> add key_async_id id |> add key_async_data (name, flavor));
}
*)
let exit_manual_span (es : explicit_span) : unit = assert false
(* TODO:
let id = Meta_map.find_exn key_async_id es.meta in
let name, flavor = Meta_map.find_exn key_async_data es.meta in
let data =
try !(Meta_map.find_exn key_data es.meta) with Not_found -> []
in
let time_us = now_us () in
let tid = get_tid_ () in
B_queue.push events
(E_exit_manual_span { tid; id; name; time_us; data; flavor })
*)
let add_data_to_manual_span (es : explicit_span) data = assert false
(* TODO:
if data <> [] then (
let data_ref, add =
try Meta_map.find_exn key_data es.meta, false
with Not_found -> ref [], true
in
let new_data = List.rev_append data !data_ref in
data_ref := new_data;
if add then es.meta <- Meta_map.add key_data data_ref es.meta
)
*)
let message ?span:_ ~data msg : unit =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref
~args:data ()
let counter_float ~data name f =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
~args:((name, `Float f) :: data)
()
let counter_int ~data name i =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
~args:((name, `Int i) :: data)
()
let name_process name : unit = ()
(* TODO: B_queue.push events (E_name_process { name }) *)
let name_thread name : unit = ()
(* TODO:
let tid = get_tid_ () in
B_queue.push events (E_name_thread { tid; name })
*)
end
let create ~out () : collector =
let buf_pool = Buf_pool.create () in
let events = B_queue.create () in
let bg_thread =
Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) ()
in
let _tick_thread = Thread.create Bg_thread.tick_thread events in
let st =
{
active = A.make true;
buf_pool;
bg_thread;
events;
span_id_gen = A.make 0;
next_thread_ref = A.make 1;
}
in
(* write header *)
let out = out_of_st st in
FWrite.Metadata.Magic_record.encode out;
FWrite.Metadata.Initialization_record.(
encode out ~ticks_per_secs:default_ticks_per_sec ());
FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" ();
Output.flush out;
Output.dispose out;
let module Coll =
C
(struct
let st = st
end)
()
in
(module Coll)

View file

@ -0,0 +1,3 @@
open Trace_core
val create : out:Bg_thread.out -> unit -> collector

View file

@ -0,0 +1,4 @@
(** A bit of global state that can be reached
from each thread without too much overhead *)
open Common_

View file

@ -1,20 +1,6 @@
module Mock_ = struct
let enabled = ref false
let now = ref 0
let[@inline never] now_us () : int64 =
let x = !now in
incr now;
Int64.of_int x
end
let counter = Mtime_clock.counter ()
(** Now, in nanoseconds *)
let[@inline] now_ns () : int64 =
if !Mock_.enabled then
Mock_.now_us ()
else (
let t = Mtime_clock.count counter in
Mtime.Span.to_uint64_ns t
)
let t = Mtime_clock.count counter in
Mtime.Span.to_uint64_ns t

View file

@ -1,416 +1,31 @@
open Trace_core
open Trace_private_util
open Common_
(*
type span_info = {
tid: int;
name: string;
start_ns: float;
mutable data: (string * user_data) list;
}
(** key used to carry a unique "id" for all spans in an async context *)
let key_async_id : int Meta_map.Key.t = Meta_map.Key.create ()
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t =
Meta_map.Key.create ()
let key_data : (string * user_data) list ref Meta_map.Key.t =
Meta_map.Key.create ()
*)
(* TODO:
(** Writer: knows how to write entries to a file in TEF format *)
module Writer = struct
type t = {
oc: out_channel;
mutable first: bool; (** first event? *)
buf: Buffer.t; (** Buffer to write into *)
must_close: bool; (** Do we have to close the underlying channel [oc]? *)
pid: int;
}
(** A writer to a [out_channel]. It writes JSON entries in an array
and closes the array at the end. *)
let create ~out () : t =
let oc, must_close =
match out with
| `Stdout -> stdout, false
| `Stderr -> stderr, false
| `File path -> open_out path, true
in
let pid =
if !Mock_.enabled then
2
else
Unix.getpid ()
in
output_char oc '[';
{ oc; first = true; pid; must_close; buf = Buffer.create 2_048 }
let close (self : t) : unit =
output_char self.oc ']';
flush self.oc;
if self.must_close then close_out self.oc
let with_ ~out f =
let writer = create ~out () in
Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer)
let[@inline] flush (self : t) : unit = flush self.oc
(** Emit "," if we need, and get the buffer ready *)
let emit_sep_and_start_ (self : t) =
Buffer.reset self.buf;
if self.first then
self.first <- false
else
Buffer.add_string self.buf ",\n"
let char = Buffer.add_char
let raw_string = Buffer.add_string
let str_val (buf : Buffer.t) (s : string) =
char buf '"';
let encode_char c =
match c with
| '"' -> raw_string buf {|\"|}
| '\\' -> raw_string buf {|\\|}
| '\n' -> raw_string buf {|\n|}
| '\b' -> raw_string buf {|\b|}
| '\r' -> raw_string buf {|\r|}
| '\t' -> raw_string buf {|\t|}
| _ when Char.code c <= 0x1f ->
raw_string buf {|\u00|};
Printf.bprintf buf "%02x" (Char.code c)
| c -> char buf c
in
String.iter encode_char s;
char buf '"'
let pp_user_data_ (out : Buffer.t) : [< user_data ] -> unit = function
| `None -> raw_string out "null"
| `Int i -> Printf.bprintf out "%d" i
| `Bool b -> Printf.bprintf out "%b" b
| `String s -> str_val out s
| `Float f -> Printf.bprintf out "%g" f
(* emit args, if not empty. [ppv] is used to print values. *)
let emit_args_o_ ppv (out : Buffer.t) args : unit =
if args <> [] then (
Printf.bprintf out {json|,"args": {|json};
List.iteri
(fun i (n, value) ->
if i > 0 then raw_string out ",";
Printf.bprintf out {json|"%s":%a|json} n ppv value)
args;
char out '}'
)
let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit =
let dur = end_ -. start in
let ts = start in
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
self.pid tid dur ts str_val name
(emit_args_o_ pp_user_data_)
args;
Buffer.output_buffer self.oc self.buf
let emit_manual_begin ~tid ~name ~id ~ts ~args ~flavor (self : t) : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
self.pid id tid ts str_val name
(match flavor with
| None | Some `Async -> 'b'
| Some `Sync -> 'B')
(emit_args_o_ pp_user_data_)
args;
Buffer.output_buffer self.oc self.buf
let emit_manual_end ~tid ~name ~id ~ts ~flavor ~args (self : t) : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json}
self.pid id tid ts str_val name
(match flavor with
| None | Some `Async -> 'e'
| Some `Sync -> 'E')
(emit_args_o_ pp_user_data_)
args;
Buffer.output_buffer self.oc self.buf
let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
self.pid tid ts str_val name
(emit_args_o_ pp_user_data_)
args;
Buffer.output_buffer self.oc self.buf
let emit_name_thread ~tid ~name (self : t) : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
tid
(emit_args_o_ pp_user_data_)
[ "name", `String name ];
Buffer.output_buffer self.oc self.buf
let emit_name_process ~name (self : t) : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid
(emit_args_o_ pp_user_data_)
[ "name", `String name ];
Buffer.output_buffer self.oc self.buf
let emit_counter ~name ~tid ~ts (self : t) f : unit =
emit_sep_and_start_ self;
Printf.bprintf self.buf
{json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid
tid ts
(emit_args_o_ pp_user_data_)
[ name, `Float f ];
Buffer.output_buffer self.oc self.buf
end
*)
(* TODO:
(** Background thread, takes events from the queue, puts them
in context using local state, and writes fully resolved
TEF events to [out]. *)
let bg_thread ~out (events : event B_queue.t) : unit =
(* open a writer to [out] *)
Writer.with_ ~out @@ fun writer ->
(* local state, to keep track of span information and implicit stack context *)
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
(* add function name, if provided, to the metadata *)
let add_fun_name_ fun_name data : _ list =
match fun_name with
| None -> data
| Some f -> ("function", `String f) :: data
in
(* how to deal with an event *)
let handle_ev (ev : event) : unit =
match ev with
| E_tick -> Writer.flush writer
| E_message { tid; msg; time_us; data } ->
Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer
| E_define_span { tid; name; id; time_us; fun_name; data } ->
let data = add_fun_name_ fun_name data in
let info = { tid; name; start_us = time_us; data } in
(* save the span so we find it at exit *)
Span_tbl.add spans id info
| E_exit_span { id; time_us = stop_us } ->
(match Span_tbl.find_opt spans id with
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
| Some { tid; name; start_us; data } ->
Span_tbl.remove spans id;
Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us
~args:data writer)
| E_add_data { id; data } ->
(match Span_tbl.find_opt spans id with
| None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id)
| Some info -> info.data <- List.rev_append data info.data)
| E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } ->
let data = add_fun_name_ fun_name data in
Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor
writer
| E_exit_manual_span { tid; time_us; name; id; flavor; data } ->
Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data
writer
| E_counter { tid; name; time_us; n } ->
Writer.emit_counter ~name ~tid ~ts:time_us writer n
| E_name_process { name } -> Writer.emit_name_process ~name writer
| E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer
in
try
while true do
(* get all the events in the incoming blocking queue, in
one single critical section. *)
let local = B_queue.pop_all events in
List.iter handle_ev local
done
with B_queue.Closed ->
(* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ())
~ts:(now_us ()) ~args:[] writer;
(* warn if app didn't close all spans *)
if Span_tbl.length spans > 0 then
Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!"
(Span_tbl.length spans);
()
(** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file *)
let tick_thread events : unit =
try
while true do
Thread.delay 0.5;
B_queue.push events E_tick
done
with B_queue.Closed -> ()
*)
type output =
[ `Stdout
| `Stderr
| `File of string
]
let collector ~out () : collector = assert false
(* TODO:
let module M = struct
let active = A.make true
(** generator for span ids *)
let span_id_gen_ = A.make 0
(* queue of messages to write *)
let events : event B_queue.t = B_queue.create ()
(** writer thread. It receives events and writes them to [oc]. *)
let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) ()
(** ticker thread, regularly sends a message to the writer thread.
no need to join it. *)
let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) ()
let shutdown () =
if A.exchange active false then (
B_queue.close events;
(* wait for writer thread to be done. The writer thread will exit
after processing remaining events because the queue is now closed *)
Thread.join t_write
)
let get_tid_ () : int =
if !Mock_.enabled then
3
else
Thread.id (Thread.self ())
let[@inline] enter_span_ ~fun_name ~data name : span =
let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in
let tid = get_tid_ () in
let time_us = now_us () in
B_queue.push events
(E_define_span { tid; name; time_us; id = span; fun_name; data });
span
let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
span =
enter_span_ ~fun_name ~data name
let exit_span span : unit =
let time_us = now_us () in
B_queue.push events (E_exit_span { id = span; time_us })
(* re-raise exception with its backtrace *)
external reraise : exn -> 'a = "%reraise"
let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f =
let span = enter_span_ ~fun_name ~data name in
try
let x = f span in
exit_span span;
x
with exn ->
exit_span span;
reraise exn
let add_data_to_span span data =
if data <> [] then B_queue.push events (E_add_data { id = span; data })
let enter_manual_span ~(parent : explicit_span option) ~flavor
~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name :
explicit_span =
(* get the id, or make a new one *)
let id =
match parent with
| Some m -> Meta_map.find_exn key_async_id m.meta
| None -> A.fetch_and_add span_id_gen_ 1
in
let time_us = now_us () in
B_queue.push events
(E_enter_manual_span
{ id; time_us; tid = get_tid_ (); data; name; fun_name; flavor });
{
span = 0L;
meta =
Meta_map.(
empty |> add key_async_id id |> add key_async_data (name, flavor));
}
let exit_manual_span (es : explicit_span) : unit =
let id = Meta_map.find_exn key_async_id es.meta in
let name, flavor = Meta_map.find_exn key_async_data es.meta in
let data =
try !(Meta_map.find_exn key_data es.meta) with Not_found -> []
in
let time_us = now_us () in
let tid = get_tid_ () in
B_queue.push events
(E_exit_manual_span { tid; id; name; time_us; data; flavor })
let add_data_to_manual_span (es : explicit_span) data =
if data <> [] then (
let data_ref, add =
try Meta_map.find_exn key_data es.meta, false
with Not_found -> ref [], true
in
let new_data = List.rev_append data !data_ref in
data_ref := new_data;
if add then es.meta <- Meta_map.add key_data data_ref es.meta
)
let message ?span:_ ~data msg : unit =
let time_us = now_us () in
let tid = get_tid_ () in
B_queue.push events (E_message { tid; time_us; msg; data })
let counter_float ~data:_ name f =
let time_us = now_us () in
let tid = get_tid_ () in
B_queue.push events (E_counter { name; n = f; time_us; tid })
let counter_int ~data name i = counter_float ~data name (float_of_int i)
let name_process name : unit = B_queue.push events (E_name_process { name })
let name_thread name : unit =
let tid = get_tid_ () in
B_queue.push events (E_name_thread { tid; name })
end in
(module M)
*)
let collector = Fcollector.create
let setup ?(out = `Env) () =
match out with
| `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr ()
| `Stdout -> Trace_core.setup_collector @@ collector ~out:`Stdout ()
| `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) ()
| `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
| `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
| `File path ->
Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) ()
| `Env ->
(match Sys.getenv_opt "TRACE" with
| Some ("1" | "true") ->
let path = "trace.fxt" in
let c = collector ~out:(`File path) () in
let c = Fcollector.create ~out:(`File path) () in
Trace_core.setup_collector c
| Some "stdout" -> Trace_core.setup_collector @@ collector ~out:`Stdout ()
| Some "stderr" -> Trace_core.setup_collector @@ collector ~out:`Stderr ()
| Some "stdout" ->
Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
| Some "stderr" ->
Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
| Some path ->
let c = collector ~out:(`File path) () in
let c = Fcollector.create ~out:(`File path) () in
Trace_core.setup_collector c
| None -> ())
@ -419,6 +34,5 @@ let with_setup ?out () f =
Fun.protect ~finally:Trace_core.shutdown f
module Internal_ = struct
let mock_all_ () = Mock_.enabled := true
let on_tracing_error = on_tracing_error
end

View file

@ -40,9 +40,6 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
(**/**)
module Internal_ : sig
val mock_all_ : unit -> unit
(** use fake, deterministic timestamps, TID, PID *)
val on_tracing_error : (string -> unit) ref
end

42
src/fuchsia/write/buf.ml Normal file
View file

@ -0,0 +1,42 @@
open Util
type t = {
buf: bytes;
mutable offset: int;
}
let empty : t = { buf = Bytes.empty; offset = 0 }
let create (n : int) : t =
let buf = Bytes.create (round_to_word n) in
{ buf; offset = 0 }
let[@inline] clear self = self.offset <- 0
let[@inline] available self = Bytes.length self.buf - self.offset
let[@inline] size self = self.offset
(* see below: we assume little endian *)
let () = assert (not Sys.big_endian)
let[@inline] add_i64 (self : t) (i : int64) : unit =
(* NOTE: we use LE, most systems are this way, even though fuchsia
says we should use the system's native endianess *)
Bytes.set_int64_le self.buf self.offset i;
self.offset <- self.offset + 8
let[@inline] add_string (self : t) (s : string) : unit =
let len = String.length s in
let missing = missing_to_round len in
(* bound check *)
assert (len + missing + self.offset < Bytes.length self.buf);
Bytes.unsafe_blit_string s 0 self.buf self.offset len;
self.offset <- self.offset + len;
(* add 0-padding *)
if missing != 0 then (
Bytes.unsafe_fill self.buf self.offset missing '\x00';
self.offset <- self.offset + missing
)
let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset

View file

@ -0,0 +1,58 @@
open struct
module A = Atomic
exception Got_buf of Buf.t
end
module List_with_len = struct
type +'a t =
| Nil
| Cons of int * 'a * 'a t
let empty : _ t = Nil
let[@inline] len = function
| Nil -> 0
| Cons (i, _, _) -> i
let[@inline] cons x self = Cons (len self + 1, x, self)
end
type t = {
max_len: int;
buf_size: int;
bufs: Buf.t List_with_len.t A.t;
}
let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t =
let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in
{ max_len; buf_size; bufs = A.make List_with_len.empty }
let alloc (self : t) : Buf.t =
try
while
match A.get self.bufs with
| Nil -> false
| Cons (_, buf, tl) as old ->
if A.compare_and_set self.bufs old tl then
raise (Got_buf buf)
else
false
do
()
done;
Buf.create self.buf_size
with Got_buf b -> b
let recycle (self : t) (buf : Buf.t) : unit =
Buf.clear buf;
try
while
match A.get self.bufs with
| Cons (i, _, _) when i >= self.max_len -> raise Exit
| old ->
not (A.compare_and_set self.bufs old (List_with_len.cons buf old))
do
()
done
with Exit -> () (* do not recycle *)

View file

@ -3,4 +3,7 @@
(name trace_fuchsia_write)
(public_name trace-fuchsia.write)
(synopsis "Serialization part of trace-fuchsia")
(ocamlopt_flags :standard -S
;-dlambda
)
(libraries trace.core atomic threads))

View file

@ -0,0 +1,46 @@
type t = {
mutable buf: Buf.t;
mutable send_buf: Buf.t -> unit;
buf_pool: Buf_pool.t;
}
let create ~(buf_pool : Buf_pool.t) ~send_buf () : t =
let buf_size = buf_pool.buf_size in
let buf = Buf.create buf_size in
{ buf; send_buf; buf_pool }
open struct
let flush_ (self : t) : unit =
self.send_buf self.buf;
let buf = Buf_pool.alloc self.buf_pool in
self.buf <- buf
let[@inline never] cycle_buf (self : t) ~available : Buf.t =
flush_ self;
let buf = self.buf in
if Buf.available buf < available then
failwith "fuchsia: buffer is too small";
buf
end
let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self
(** Obtain a buffer with at least [available] bytes *)
let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t =
let available = available_word lsl 3 in
if Buf.available self.buf >= available then
self.buf
else
cycle_buf self ~available
let into_buffer ~buf_pool (buffer : Buffer.t) : t =
let send_buf (buf : Buf.t) =
Buffer.add_subbytes buffer buf.buf 0 buf.offset
in
create ~buf_pool ~send_buf ()
let dispose (self : t) : unit =
flush_ self;
Buf_pool.recycle self.buf_pool self.buf;
self.buf <- Buf.empty

View file

@ -2,53 +2,17 @@
Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *)
module B = Bytes
module Util = Util
module Buf = Buf
module Output = Output
module Buf_pool = Buf_pool
open struct
let spf = Printf.sprintf
end
module Util = struct
(** How many bytes are missing for [n] to be a multiple of 8 *)
let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111
(** Round up to a multiple of 8 *)
let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111)
end
open Util
module Buf = struct
type t = {
buf: bytes;
mutable offset: int;
}
let create (n : int) : t =
let buf = Bytes.create (round_to_word n) in
{ buf; offset = 0 }
let[@inline] clear self = self.offset <- 0
let[@inline] add_i64 (self : t) (i : int64) : unit =
(* NOTE: we use LE, most systems are this way, even though fuchsia
says we should use the system's native endianess *)
Bytes.set_int64_le self.buf self.offset i;
self.offset <- self.offset + 8
let add_string (self : t) (s : string) : unit =
let len = String.length s in
Bytes.blit_string s 0 self.buf self.offset len;
self.offset <- self.offset + len;
(* add 0-padding *)
let missing = missing_to_round len in
Bytes.fill self.buf self.offset missing '\x00';
self.offset <- self.offset + missing
let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset
end
type user_data = Trace_core.user_data
module I64 = struct
@ -85,6 +49,8 @@ module Thread_ref = struct
tid: int;
}
let inline ~pid ~tid : t = Inline { pid; tid }
let ref x : t =
if x = 0 || x > 255 then
invalid_arg "fuchsia: thread inline ref must be >0 < 256";
@ -108,7 +74,10 @@ module Metadata = struct
module Magic_record = struct
let value = 0x0016547846040010L
let size_word = 1
let encode (buf : Buf.t) = Buf.add_i64 buf value
let encode (out : Output.t) =
let buf = Output.get_buf out ~available_word:size_word in
Buf.add_i64 buf value
end
module Initialization_record = struct
@ -117,7 +86,8 @@ module Metadata = struct
(** Default: 1 tick = 1 ns *)
let default_ticks_per_sec = 1_000_000_000L
let encode (buf : Buf.t) ~ticks_per_secs () : unit =
let encode (out : Output.t) ~ticks_per_secs () : unit =
let buf = Output.get_buf out ~available_word:size_word in
let hd = I64.(1L lor (of_int size_word lsl 4)) in
Buf.add_i64 buf hd;
Buf.add_i64 buf ticks_per_secs
@ -126,8 +96,9 @@ module Metadata = struct
module Provider_info = struct
let size_word ~name () = 1 + (round_to_word (String.length name) lsr 3)
let encode buf ~(id : int) ~name () : unit =
let encode (out : Output.t) ~(id : int) ~name () : unit =
let size = size_word ~name () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
(of_int size lsl 4)
@ -216,17 +187,30 @@ end
module Arguments = struct
type t = Argument.t list
let[@inline] len (self : t) : int =
match self with
| [] -> 0
| [ _ ] -> 1
| _ :: _ :: tl -> 2 + List.length tl
let check_valid (self : t) =
let len = List.length self in
let len = len self in
if len > 15 then
invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len);
List.iter Argument.check_valid self;
()
let[@inline] size_word (self : t) =
List.fold_left (fun n arg -> n + Argument.size_word arg) 0 self
match self with
| [] -> 0
| [ a ] -> Argument.size_word a
| a :: b :: tl ->
List.fold_left
(fun n arg -> n + Argument.size_word arg)
(Argument.size_word a + Argument.size_word b)
tl
let encode (buf : Buf.t) (self : t) =
let[@inline] encode (buf : Buf.t) (self : t) =
let rec aux buf l =
match l with
| [] -> ()
@ -234,7 +218,13 @@ module Arguments = struct
Argument.encode buf x;
aux buf tl
in
aux buf self
match self with
| [] -> ()
| [ x ] -> Argument.encode buf x
| x :: tl ->
Argument.encode buf x;
aux buf tl
end
(** record type = 3 *)
@ -242,9 +232,12 @@ module Thread_record = struct
let size_word : int = 3
(** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *)
let encode (buf : Buf.t) ~as_ref ~pid ~tid () : unit =
let encode (out : Output.t) ~as_ref ~pid ~tid () : unit =
if as_ref <= 0 || as_ref > 255 then
invalid_arg "fuchsia: thread_record: invalid ref";
let buf = Output.get_buf out ~available_word:size_word in
let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in
Buf.add_i64 buf hd;
Buf.add_i64 buf (I64.of_int pid);
@ -253,21 +246,24 @@ end
(** record type = 4 *)
module Event = struct
(** type=0 *)
module Instant = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) / 8)
+ Arguments.size_word args
let encode (buf : Buf.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () :
unit =
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
(* set category = 0 *)
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (of_int (List.length args) lsl 20)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
@ -285,22 +281,132 @@ module Event = struct
()
end
(** type=1 *)
module Counter = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* counter id *)
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (1L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
(* just use 0 as counter id *)
Buf.add_i64 buf 0L;
()
end
(** type=2 *)
module Duration_begin = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (2L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
()
end
(** type=3 *)
module Duration_end = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (3L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
()
end
(** type=4 *)
module Duration_complete = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* end timestamp *)
let encode (buf : Buf.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~end_time_ns
~args () : unit =
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
~end_time_ns ~args () : unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
(* set category = 0 *)
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (4L lsl 16)
lor (of_int (List.length args) lsl 20)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in

View file

@ -0,0 +1,5 @@
(** How many bytes are missing for [n] to be a multiple of 8 *)
let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111
(** Round up to a multiple of 8 *)
let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111)