diff --git a/bench/trace_fx.ml b/bench/trace_fx.ml index aea9221..fdf65c9 100644 --- a/bench/trace_fx.ml +++ b/bench/trace_fx.ml @@ -3,6 +3,7 @@ module Trace = Trace_core let ( let@ ) = ( @@ ) let work ~dom_idx ~n () : unit = + Trace_core.set_thread_name (Printf.sprintf "worker%d" dom_idx); for _i = 1 to n do let%trace _sp = "outer" in Trace_core.add_data_to_span _sp [ "i", `Int _i ]; @@ -23,10 +24,15 @@ let main ~n ~j () : unit = let domains = Array.init j (fun dom_idx -> Domain.spawn (fun () -> work ~dom_idx ~n ())) in + + let%trace () = "join" in Array.iter Domain.join domains let () = let@ () = Trace_fuchsia.with_setup () in + Trace_core.set_process_name "trace_fxt"; + + let%trace () = "main" in let n = ref 10_000 in let j = ref 4 in diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml index 1ac6aa0..8664452 100644 --- a/src/fuchsia/bg_thread.ml +++ b/src/fuchsia/bg_thread.ml @@ -64,11 +64,13 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = *) (** Thread that simply regularly "ticks", sending events to - the background thread so it has a chance to write to the file *) -let tick_thread events : unit = + the background thread so it has a chance to write to the file, + and call [f()] *) +let tick_thread events ~f : unit = try while true do Thread.delay 0.5; - B_queue.push events E_tick + B_queue.push events E_tick; + f () done with B_queue.Closed -> () diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml index ecfb046..5c661c2 100644 --- a/src/fuchsia/fcollector.ml +++ b/src/fuchsia/fcollector.ml @@ -1,34 +1,24 @@ open Trace_core open Common_ module TLS = Thread_local_storage +module Int_map = Map.Make (Int) let pid = Unix.getpid () -type state = { - active: bool A.t; - events: Bg_thread.event B_queue.t; - span_id_gen: int A.t; (** Used for async spans *) - bg_thread: Thread.t; - buf_pool: Buf_pool.t; - next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) -} - type span_info = { start_time_ns: int64; name: string; mutable data: (string * user_data) list; } -(* TODO: - (** key used to carry a unique "id" for all spans in an async context *) - let key_async_id : int Meta_map.Key.t = Meta_map.Key.create () +type async_span_info = { + async_id: int; + flavor: [ `Sync | `Async ] option; + name: string; + mutable data: (string * user_data) list; +} - let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = - Meta_map.Key.create () - - let key_data : (string * user_data) list ref Meta_map.Key.t = - Meta_map.Key.create () -*) +let key_async_data : async_span_info Meta_map.Key.t = Meta_map.Key.create () open struct let state_id_ = A.make 0 @@ -46,6 +36,16 @@ type per_thread_state = { spans: span_info Span_tbl.t; (** In-flight spans *) } +type state = { + active: bool A.t; + events: Bg_thread.event B_queue.t; + span_id_gen: int A.t; (** Used for async spans *) + bg_thread: Thread.t; + buf_pool: Buf_pool.t; + next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) + threads: per_thread_state Int_map.t A.t; +} + let key_thread_local_st : per_thread_state TLS.key = TLS.new_key (fun () -> let tid = Thread.id @@ Thread.self () in @@ -64,6 +64,12 @@ let out_of_st (st : state) : Output.t = try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> () )) +let flush_all_ (st : state) = + let outs = A.get st.threads in + + Int_map.iter (fun _tid (tls : per_thread_state) -> ()) outs; + () + module C (St : sig val st : state end) @@ -85,16 +91,28 @@ struct self.thread_ref <- FWrite.Thread_ref.ref th_ref; FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid () ); + + (* add to [st]'s list of threads *) + while + let old = A.get st.threads in + not (A.compare_and_set st.threads old (Int_map.add self.tid self old)) + do + () + done; + () (** Obtain the output for the current thread *) let[@inline] get_thread_output () : Output.t * per_thread_state = - let st = TLS.get key_thread_local_st in - if st.state_id != state_id || st.out == None then update_local_state st; - Option.get st.out, st + let tls = TLS.get key_thread_local_st in + if tls.state_id != state_id || tls.out == None then update_local_state tls; + Option.get tls.out, tls let shutdown () = if A.exchange st.active false then ( + (* flush all outputs *) + flush_all_ st; + B_queue.close st.events; (* wait for writer thread to be done. The writer thread will exit after processing remaining events because the queue is now closed *) @@ -147,54 +165,40 @@ struct | None -> !on_tracing_error (spf "unknown span %Ld" span) | Some info -> info.data <- List.rev_append data info.data - let enter_manual_span ~(parent : explicit_span option) ~flavor - ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span - = - assert false - (* TODO: - (* get the id, or make a new one *) - let id = - match parent with - | Some m -> Meta_map.find_exn key_async_id m.meta - | None -> A.fetch_and_add span_id_gen_ 1 - in - let time_us = now_us () in - B_queue.push events - (E_enter_manual_span - { id; time_us; tid = get_tid_ (); data; name; fun_name; flavor }); - { - span = 0L; - meta = - Meta_map.( - empty |> add key_async_id id |> add key_async_data (name, flavor)); - } - *) + let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_ + ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in - let exit_manual_span (es : explicit_span) : unit = assert false - (* TODO: - let id = Meta_map.find_exn key_async_id es.meta in - let name, flavor = Meta_map.find_exn key_async_data es.meta in - let data = - try !(Meta_map.find_exn key_data es.meta) with Not_found -> [] - in - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events - (E_exit_manual_span { tid; id; name; time_us; data; flavor }) - *) + (* get the id, or make a new one *) + let async_id = + match parent with + | Some m -> (Meta_map.find_exn key_async_data m.meta).async_id + | None -> A.fetch_and_add st.span_id_gen 1 + in - let add_data_to_manual_span (es : explicit_span) data = assert false - (* TODO: - if data <> [] then ( - let data_ref, add = - try Meta_map.find_exn key_data es.meta, false - with Not_found -> ref [], true - in - let new_data = List.rev_append data !data_ref in - data_ref := new_data; - if add then es.meta <- Meta_map.add key_data data_ref es.meta - ) - *) + FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref + ~time_ns ~async_id (); + { + span = 0L; + meta = + Meta_map.( + empty |> add key_async_data { async_id; name; flavor; data = [] }); + } + + let exit_manual_span (es : explicit_span) : unit = + let { async_id; name; data; flavor = _ } = + Meta_map.find_exn key_async_data es.meta + in + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + + FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns + ~args:data ~async_id () + + let add_data_to_manual_span (es : explicit_span) data = + let m = Meta_map.find_exn key_async_data es.meta in + m.data <- List.rev_append data m.data let message ?span:_ ~data msg : unit = let out, tls = get_thread_output () in @@ -216,14 +220,16 @@ struct ~args:((name, `Int i) :: data) () - let name_process name : unit = () - (* TODO: B_queue.push events (E_name_process { name }) *) + let name_process name : unit = + let out, tls = get_thread_output () in + FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ()) - let name_thread name : unit = () - (* TODO: - let tid = get_tid_ () in - B_queue.push events (E_name_thread { tid; name }) - *) + let name_thread name : unit = + let out, tls = get_thread_output () in + FWrite.Kernel_object.( + encode out ~name ~ty:ty_thread ~kid:tls.tid + ~args:[ "process", `Int pid ] + ()) end let create ~out () : collector = @@ -233,7 +239,6 @@ let create ~out () : collector = let bg_thread = Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) () in - let _tick_thread = Thread.create Bg_thread.tick_thread events in let st = { @@ -243,9 +248,15 @@ let create ~out () : collector = events; span_id_gen = A.make 0; next_thread_ref = A.make 1; + threads = A.make Int_map.empty; } in + let _tick_thread = + Thread.create (fun () -> + Bg_thread.tick_thread events ~f:(fun () -> flush_all_ st)) + in + (* write header *) let out = out_of_st st in FWrite.Metadata.Magic_record.encode out; diff --git a/src/fuchsia/write/trace_fuchsia_write.ml b/src/fuchsia/write/trace_fuchsia_write.ml index 8791f9a..723b3c4 100644 --- a/src/fuchsia/write/trace_fuchsia_write.ml +++ b/src/fuchsia/write/trace_fuchsia_write.ml @@ -424,4 +424,111 @@ module Event = struct Buf.add_i64 buf end_time_ns; () end + + (** type=5 *) + module Async_begin = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* async id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~(async_id : int) ~args () : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (5L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + Buf.add_i64 buf (I64.of_int async_id); + () + end + + (** type=7 *) + module Async_end = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* async id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~(async_id : int) ~args () : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (7L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + Buf.add_i64 buf (I64.of_int async_id); + () + end +end + +(** record type = 7 *) +module Kernel_object = struct + let size_word ~name ~args () : int = + 1 + 1 + (* id *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + (* see: + https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/types.h;l=441?q=ZX_OBJ_TYPE&ss=fuchsia%2Ffuchsia + *) + + type ty = int + + let ty_process : ty = 1 + let ty_thread : ty = 2 + + let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit = + let size = size_word ~name ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 7L + lor (of_int size lsl 4) + lor (of_int ty lsl 16) + lor (of_int (Arguments.len args) lsl 40) + lor (of_int (Str_ref.inline (String.length name)) lsl 24)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf (I64.of_int kid); + Buf.add_string buf name; + Arguments.encode buf args; + () end