mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
fuchsia: metadata events
This commit is contained in:
parent
f34671b05c
commit
713cf6b4cf
4 changed files with 204 additions and 78 deletions
|
|
@ -3,6 +3,7 @@ module Trace = Trace_core
|
|||
let ( let@ ) = ( @@ )
|
||||
|
||||
let work ~dom_idx ~n () : unit =
|
||||
Trace_core.set_thread_name (Printf.sprintf "worker%d" dom_idx);
|
||||
for _i = 1 to n do
|
||||
let%trace _sp = "outer" in
|
||||
Trace_core.add_data_to_span _sp [ "i", `Int _i ];
|
||||
|
|
@ -23,10 +24,15 @@ let main ~n ~j () : unit =
|
|||
let domains =
|
||||
Array.init j (fun dom_idx -> Domain.spawn (fun () -> work ~dom_idx ~n ()))
|
||||
in
|
||||
|
||||
let%trace () = "join" in
|
||||
Array.iter Domain.join domains
|
||||
|
||||
let () =
|
||||
let@ () = Trace_fuchsia.with_setup () in
|
||||
Trace_core.set_process_name "trace_fxt";
|
||||
|
||||
let%trace () = "main" in
|
||||
|
||||
let n = ref 10_000 in
|
||||
let j = ref 4 in
|
||||
|
|
|
|||
|
|
@ -64,11 +64,13 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
|
|||
*)
|
||||
|
||||
(** Thread that simply regularly "ticks", sending events to
|
||||
the background thread so it has a chance to write to the file *)
|
||||
let tick_thread events : unit =
|
||||
the background thread so it has a chance to write to the file,
|
||||
and call [f()] *)
|
||||
let tick_thread events ~f : unit =
|
||||
try
|
||||
while true do
|
||||
Thread.delay 0.5;
|
||||
B_queue.push events E_tick
|
||||
B_queue.push events E_tick;
|
||||
f ()
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
|
|
|
|||
|
|
@ -1,34 +1,24 @@
|
|||
open Trace_core
|
||||
open Common_
|
||||
module TLS = Thread_local_storage
|
||||
module Int_map = Map.Make (Int)
|
||||
|
||||
let pid = Unix.getpid ()
|
||||
|
||||
type state = {
|
||||
active: bool A.t;
|
||||
events: Bg_thread.event B_queue.t;
|
||||
span_id_gen: int A.t; (** Used for async spans *)
|
||||
bg_thread: Thread.t;
|
||||
buf_pool: Buf_pool.t;
|
||||
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
|
||||
}
|
||||
|
||||
type span_info = {
|
||||
start_time_ns: int64;
|
||||
name: string;
|
||||
mutable data: (string * user_data) list;
|
||||
}
|
||||
|
||||
(* TODO:
|
||||
(** key used to carry a unique "id" for all spans in an async context *)
|
||||
let key_async_id : int Meta_map.Key.t = Meta_map.Key.create ()
|
||||
type async_span_info = {
|
||||
async_id: int;
|
||||
flavor: [ `Sync | `Async ] option;
|
||||
name: string;
|
||||
mutable data: (string * user_data) list;
|
||||
}
|
||||
|
||||
let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t =
|
||||
Meta_map.Key.create ()
|
||||
|
||||
let key_data : (string * user_data) list ref Meta_map.Key.t =
|
||||
Meta_map.Key.create ()
|
||||
*)
|
||||
let key_async_data : async_span_info Meta_map.Key.t = Meta_map.Key.create ()
|
||||
|
||||
open struct
|
||||
let state_id_ = A.make 0
|
||||
|
|
@ -46,6 +36,16 @@ type per_thread_state = {
|
|||
spans: span_info Span_tbl.t; (** In-flight spans *)
|
||||
}
|
||||
|
||||
type state = {
|
||||
active: bool A.t;
|
||||
events: Bg_thread.event B_queue.t;
|
||||
span_id_gen: int A.t; (** Used for async spans *)
|
||||
bg_thread: Thread.t;
|
||||
buf_pool: Buf_pool.t;
|
||||
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
|
||||
threads: per_thread_state Int_map.t A.t;
|
||||
}
|
||||
|
||||
let key_thread_local_st : per_thread_state TLS.key =
|
||||
TLS.new_key (fun () ->
|
||||
let tid = Thread.id @@ Thread.self () in
|
||||
|
|
@ -64,6 +64,12 @@ let out_of_st (st : state) : Output.t =
|
|||
try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()
|
||||
))
|
||||
|
||||
let flush_all_ (st : state) =
|
||||
let outs = A.get st.threads in
|
||||
|
||||
Int_map.iter (fun _tid (tls : per_thread_state) -> ()) outs;
|
||||
()
|
||||
|
||||
module C (St : sig
|
||||
val st : state
|
||||
end)
|
||||
|
|
@ -85,16 +91,28 @@ struct
|
|||
self.thread_ref <- FWrite.Thread_ref.ref th_ref;
|
||||
FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid ()
|
||||
);
|
||||
|
||||
(* add to [st]'s list of threads *)
|
||||
while
|
||||
let old = A.get st.threads in
|
||||
not (A.compare_and_set st.threads old (Int_map.add self.tid self old))
|
||||
do
|
||||
()
|
||||
done;
|
||||
|
||||
()
|
||||
|
||||
(** Obtain the output for the current thread *)
|
||||
let[@inline] get_thread_output () : Output.t * per_thread_state =
|
||||
let st = TLS.get key_thread_local_st in
|
||||
if st.state_id != state_id || st.out == None then update_local_state st;
|
||||
Option.get st.out, st
|
||||
let tls = TLS.get key_thread_local_st in
|
||||
if tls.state_id != state_id || tls.out == None then update_local_state tls;
|
||||
Option.get tls.out, tls
|
||||
|
||||
let shutdown () =
|
||||
if A.exchange st.active false then (
|
||||
(* flush all outputs *)
|
||||
flush_all_ st;
|
||||
|
||||
B_queue.close st.events;
|
||||
(* wait for writer thread to be done. The writer thread will exit
|
||||
after processing remaining events because the queue is now closed *)
|
||||
|
|
@ -147,54 +165,40 @@ struct
|
|||
| None -> !on_tracing_error (spf "unknown span %Ld" span)
|
||||
| Some info -> info.data <- List.rev_append data info.data
|
||||
|
||||
let enter_manual_span ~(parent : explicit_span option) ~flavor
|
||||
~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span
|
||||
=
|
||||
assert false
|
||||
(* TODO:
|
||||
let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_
|
||||
~__FILE__:_ ~__LINE__:_ ~data name : explicit_span =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
|
||||
(* get the id, or make a new one *)
|
||||
let id =
|
||||
let async_id =
|
||||
match parent with
|
||||
| Some m -> Meta_map.find_exn key_async_id m.meta
|
||||
| None -> A.fetch_and_add span_id_gen_ 1
|
||||
| Some m -> (Meta_map.find_exn key_async_data m.meta).async_id
|
||||
| None -> A.fetch_and_add st.span_id_gen 1
|
||||
in
|
||||
let time_us = now_us () in
|
||||
B_queue.push events
|
||||
(E_enter_manual_span
|
||||
{ id; time_us; tid = get_tid_ (); data; name; fun_name; flavor });
|
||||
|
||||
FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref
|
||||
~time_ns ~async_id ();
|
||||
{
|
||||
span = 0L;
|
||||
meta =
|
||||
Meta_map.(
|
||||
empty |> add key_async_id id |> add key_async_data (name, flavor));
|
||||
empty |> add key_async_data { async_id; name; flavor; data = [] });
|
||||
}
|
||||
*)
|
||||
|
||||
let exit_manual_span (es : explicit_span) : unit = assert false
|
||||
(* TODO:
|
||||
let id = Meta_map.find_exn key_async_id es.meta in
|
||||
let name, flavor = Meta_map.find_exn key_async_data es.meta in
|
||||
let data =
|
||||
try !(Meta_map.find_exn key_data es.meta) with Not_found -> []
|
||||
let exit_manual_span (es : explicit_span) : unit =
|
||||
let { async_id; name; data; flavor = _ } =
|
||||
Meta_map.find_exn key_async_data es.meta
|
||||
in
|
||||
let time_us = now_us () in
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events
|
||||
(E_exit_manual_span { tid; id; name; time_us; data; flavor })
|
||||
*)
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
|
||||
let add_data_to_manual_span (es : explicit_span) data = assert false
|
||||
(* TODO:
|
||||
if data <> [] then (
|
||||
let data_ref, add =
|
||||
try Meta_map.find_exn key_data es.meta, false
|
||||
with Not_found -> ref [], true
|
||||
in
|
||||
let new_data = List.rev_append data !data_ref in
|
||||
data_ref := new_data;
|
||||
if add then es.meta <- Meta_map.add key_data data_ref es.meta
|
||||
)
|
||||
*)
|
||||
FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns
|
||||
~args:data ~async_id ()
|
||||
|
||||
let add_data_to_manual_span (es : explicit_span) data =
|
||||
let m = Meta_map.find_exn key_async_data es.meta in
|
||||
m.data <- List.rev_append data m.data
|
||||
|
||||
let message ?span:_ ~data msg : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
|
|
@ -216,14 +220,16 @@ struct
|
|||
~args:((name, `Int i) :: data)
|
||||
()
|
||||
|
||||
let name_process name : unit = ()
|
||||
(* TODO: B_queue.push events (E_name_process { name }) *)
|
||||
let name_process name : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ())
|
||||
|
||||
let name_thread name : unit = ()
|
||||
(* TODO:
|
||||
let tid = get_tid_ () in
|
||||
B_queue.push events (E_name_thread { tid; name })
|
||||
*)
|
||||
let name_thread name : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
FWrite.Kernel_object.(
|
||||
encode out ~name ~ty:ty_thread ~kid:tls.tid
|
||||
~args:[ "process", `Int pid ]
|
||||
())
|
||||
end
|
||||
|
||||
let create ~out () : collector =
|
||||
|
|
@ -233,7 +239,6 @@ let create ~out () : collector =
|
|||
let bg_thread =
|
||||
Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) ()
|
||||
in
|
||||
let _tick_thread = Thread.create Bg_thread.tick_thread events in
|
||||
|
||||
let st =
|
||||
{
|
||||
|
|
@ -243,9 +248,15 @@ let create ~out () : collector =
|
|||
events;
|
||||
span_id_gen = A.make 0;
|
||||
next_thread_ref = A.make 1;
|
||||
threads = A.make Int_map.empty;
|
||||
}
|
||||
in
|
||||
|
||||
let _tick_thread =
|
||||
Thread.create (fun () ->
|
||||
Bg_thread.tick_thread events ~f:(fun () -> flush_all_ st))
|
||||
in
|
||||
|
||||
(* write header *)
|
||||
let out = out_of_st st in
|
||||
FWrite.Metadata.Magic_record.encode out;
|
||||
|
|
|
|||
|
|
@ -424,4 +424,111 @@ module Event = struct
|
|||
Buf.add_i64 buf end_time_ns;
|
||||
()
|
||||
end
|
||||
|
||||
(** type=5 *)
|
||||
module Async_begin = struct
|
||||
let size_word ~name ~t_ref ~args () : int =
|
||||
1 + Thread_ref.size_word t_ref + 1
|
||||
(* timestamp *) + (round_to_word (String.length name) lsr 3)
|
||||
+ Arguments.size_word args + 1 (* async id *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
~(async_id : int) ~args () : unit =
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
4L
|
||||
lor (of_int size lsl 4)
|
||||
lor (5L lsl 16)
|
||||
lor (of_int (Arguments.len args) lsl 20)
|
||||
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
|
||||
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
|
||||
in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_i64 buf time_ns;
|
||||
|
||||
(match t_ref with
|
||||
| Thread_ref.Inline { pid; tid } ->
|
||||
Buf.add_i64 buf (I64.of_int pid);
|
||||
Buf.add_i64 buf (I64.of_int tid)
|
||||
| Thread_ref.Ref _ -> ());
|
||||
|
||||
Buf.add_string buf name;
|
||||
Arguments.encode buf args;
|
||||
Buf.add_i64 buf (I64.of_int async_id);
|
||||
()
|
||||
end
|
||||
|
||||
(** type=7 *)
|
||||
module Async_end = struct
|
||||
let size_word ~name ~t_ref ~args () : int =
|
||||
1 + Thread_ref.size_word t_ref + 1
|
||||
(* timestamp *) + (round_to_word (String.length name) lsr 3)
|
||||
+ Arguments.size_word args + 1 (* async id *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
~(async_id : int) ~args () : unit =
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
4L
|
||||
lor (of_int size lsl 4)
|
||||
lor (7L lsl 16)
|
||||
lor (of_int (Arguments.len args) lsl 20)
|
||||
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
|
||||
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
|
||||
in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_i64 buf time_ns;
|
||||
|
||||
(match t_ref with
|
||||
| Thread_ref.Inline { pid; tid } ->
|
||||
Buf.add_i64 buf (I64.of_int pid);
|
||||
Buf.add_i64 buf (I64.of_int tid)
|
||||
| Thread_ref.Ref _ -> ());
|
||||
|
||||
Buf.add_string buf name;
|
||||
Arguments.encode buf args;
|
||||
Buf.add_i64 buf (I64.of_int async_id);
|
||||
()
|
||||
end
|
||||
end
|
||||
|
||||
(** record type = 7 *)
|
||||
module Kernel_object = struct
|
||||
let size_word ~name ~args () : int =
|
||||
1 + 1
|
||||
(* id *) + (round_to_word (String.length name) lsr 3)
|
||||
+ Arguments.size_word args
|
||||
|
||||
(* see:
|
||||
https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/types.h;l=441?q=ZX_OBJ_TYPE&ss=fuchsia%2Ffuchsia
|
||||
*)
|
||||
|
||||
type ty = int
|
||||
|
||||
let ty_process : ty = 1
|
||||
let ty_thread : ty = 2
|
||||
|
||||
let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit =
|
||||
let size = size_word ~name ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
7L
|
||||
lor (of_int size lsl 4)
|
||||
lor (of_int ty lsl 16)
|
||||
lor (of_int (Arguments.len args) lsl 40)
|
||||
lor (of_int (Str_ref.inline (String.length name)) lsl 24))
|
||||
in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_i64 buf (I64.of_int kid);
|
||||
Buf.add_string buf name;
|
||||
Arguments.encode buf args;
|
||||
()
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue