perf fuchsia: use a stack to hold in-flight spans, not a hashtable

This commit is contained in:
Simon Cruanes 2023-12-26 22:10:17 -05:00
parent 2e4971d23d
commit bc92d97a76
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
7 changed files with 163 additions and 63 deletions

View file

@ -52,6 +52,7 @@
(ocaml (>= 4.08)) (ocaml (>= 4.08))
(trace (= :version)) (trace (= :version))
(mtime (>= 2.0)) (mtime (>= 2.0))
base-bigarray
base-unix base-unix
dune) dune)
(tags (tags

View file

@ -51,18 +51,6 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
let st = { oc; buf_pool; events } in let st = { oc; buf_pool; events } in
bg_loop st bg_loop st
(* TODO:
(* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ())
~ts:(now_us ()) ~args:[] writer;
(* warn if app didn't close all spans *)
if Span_tbl.length spans > 0 then
Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!"
(Span_tbl.length spans);
*)
(** Thread that simply regularly "ticks", sending events to (** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file, the background thread so it has a chance to write to the file,
and call [f()] *) and call [f()] *)

View file

@ -5,12 +5,6 @@ module Buf = FWrite.Buf
module Buf_pool = FWrite.Buf_pool module Buf_pool = FWrite.Buf_pool
module Output = FWrite.Output module Output = FWrite.Output
module Span_tbl = Hashtbl.Make (struct
include Int64
let hash : t -> int = Hashtbl.hash
end)
let on_tracing_error = let on_tracing_error =
ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s) ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s)

View file

@ -5,5 +5,5 @@
(public_name trace-fuchsia) (public_name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(libraries trace.core trace.private.util thread-local-storage (libraries trace.core trace.private.util thread-local-storage
(re_export trace-fuchsia.write) (re_export trace-fuchsia.write) bigarray
mtime mtime.clock.os atomic unix threads)) mtime mtime.clock.os atomic unix threads))

View file

@ -5,11 +5,101 @@ module Int_map = Map.Make (Int)
let pid = Unix.getpid () let pid = Unix.getpid ()
type span_info = { (** Thread-local stack of span info *)
start_time_ns: int64; module Span_info_stack : sig
name: string; type t
mutable data: (string * user_data) list;
} val create : unit -> t
val push :
t ->
span ->
name:string ->
start_time_ns:int64 ->
data:(string * user_data) list ->
unit
val pop : t -> int64 * string * int64 * (string * user_data) list
val find_ : t -> span -> int option
val add_data : t -> int -> (string * user_data) list -> unit
end = struct
module BA = Bigarray
module BA1 = Bigarray.Array1
type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t
type t = {
mutable len: int;
mutable span: int64arr;
mutable start_time_ns: int64arr;
mutable name: string array;
mutable data: (string * user_data) list array;
}
let create () : t =
{
len = 0;
span = BA1.create BA.Int64 BA.C_layout 64;
start_time_ns = BA1.create BA.Int64 BA.C_layout 64;
name = Array.make 64 "";
data = Array.make 64 [];
}
let[@inline] cap self = Array.length self.name
let grow_ (self : t) : unit =
let new_cap = 2 * cap self in
let new_span = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.span (BA1.sub new_span 0 self.len);
let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len);
let new_name = Array.make new_cap "" in
Array.blit self.name 0 new_name 0 self.len;
let new_data = Array.make new_cap [] in
Array.blit self.data 0 new_data 0 self.len;
self.span <- new_span;
self.start_time_ns <- new_startime_ns;
self.name <- new_name;
self.data <- new_data
let push (self : t) (span : int64) ~name ~start_time_ns ~data =
if cap self = self.len then grow_ self;
BA1.set self.span self.len span;
BA1.set self.start_time_ns self.len start_time_ns;
Array.set self.name self.len name;
Array.set self.data self.len data;
self.len <- self.len + 1
let pop (self : t) =
assert (self.len > 0);
self.len <- self.len - 1;
let span = BA1.get self.span self.len in
let name = self.name.(self.len) in
let start_time_ns = BA1.get self.start_time_ns self.len in
let data = self.data.(self.len) in
(* avoid holding onto old values *)
Array.set self.name self.len "";
Array.set self.data self.len [];
span, name, start_time_ns, data
let[@inline] add_data self i d : unit =
assert (i < self.len);
self.data.(i) <- List.rev_append d self.data.(i)
exception Found of int
let[@inline] find_ (self : t) span : _ option =
try
for i = self.len - 1 downto 0 do
if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i)
done;
None
with Found i -> Some i
end
type async_span_info = { type async_span_info = {
async_id: int; async_id: int;
@ -33,7 +123,7 @@ type per_thread_state = {
local_span_id_gen: int A.t; (** Used for thread-local spans *) local_span_id_gen: int A.t; (** Used for thread-local spans *)
mutable thread_ref: FWrite.Thread_ref.t; mutable thread_ref: FWrite.Thread_ref.t;
mutable out: Output.t option; mutable out: Output.t option;
spans: span_info Span_tbl.t; (** In-flight spans *) spans: Span_info_stack.t; (** In-flight spans *)
} }
type state = { type state = {
@ -43,9 +133,9 @@ type state = {
bg_thread: Thread.t; bg_thread: Thread.t;
buf_pool: Buf_pool.t; buf_pool: Buf_pool.t;
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
per_thread: per_thread_state Int_map.t A.t; per_thread: per_thread_state Int_map.t A.t array;
(** the state keeps tabs on thread-local state, so it can flush writers (** the state keeps tabs on thread-local state, so it can flush writers
at the end *) at the end. This is a tid-sharded array of maps. *)
} }
let key_thread_local_st : per_thread_state TLS.key = let key_thread_local_st : per_thread_state TLS.key =
@ -57,7 +147,7 @@ let key_thread_local_st : per_thread_state TLS.key =
thread_ref = FWrite.Thread_ref.inline ~pid ~tid; thread_ref = FWrite.Thread_ref.inline ~pid ~tid;
local_span_id_gen = A.make 0; local_span_id_gen = A.make 0;
out = None; out = None;
spans = Span_tbl.create 32; spans = Span_info_stack.create ();
}) })
let out_of_st (st : state) : Output.t = let out_of_st (st : state) : Output.t =
@ -74,7 +164,8 @@ struct
let state_id = 1 + A.fetch_and_add state_id_ 1 let state_id = 1 + A.fetch_and_add state_id_ 1
(** prepare the thread's state *) (** prepare the thread's state *)
let[@inline never] update_local_state (self : per_thread_state) : unit = let[@inline never] update_or_init_local_state (self : per_thread_state) : unit
=
(* get an output *) (* get an output *)
let out = out_of_st st in let out = out_of_st st in
self.out <- Some out; self.out <- Some out;
@ -87,17 +178,22 @@ struct
); );
(* add to [st]'s list of threads *) (* add to [st]'s list of threads *)
let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in
while while
let old = A.get st.per_thread in let old = A.get shard_of_per_thread in
not (A.compare_and_set st.per_thread old (Int_map.add self.tid self old)) not
(A.compare_and_set shard_of_per_thread old
(Int_map.add self.tid self old))
do do
() ()
done; done;
let on_exit _ = let on_exit _ =
while while
let old = A.get st.per_thread in let old = A.get shard_of_per_thread in
not (A.compare_and_set st.per_thread old (Int_map.remove self.tid old)) not
(A.compare_and_set shard_of_per_thread old
(Int_map.remove self.tid old))
do do
() ()
done; done;
@ -111,21 +207,29 @@ struct
(** Obtain the output for the current thread *) (** Obtain the output for the current thread *)
let[@inline] get_thread_output () : Output.t * per_thread_state = let[@inline] get_thread_output () : Output.t * per_thread_state =
let tls = TLS.get key_thread_local_st in let tls = TLS.get key_thread_local_st in
if tls.state_id != state_id || tls.out == None then update_local_state tls; if tls.state_id != state_id || tls.out == None then
Option.get tls.out, tls update_or_init_local_state tls;
let out =
match tls.out with
| None -> assert false
| Some o -> o
in
out, tls
let close_per_thread (tls : per_thread_state) =
Option.iter Output.flush tls.out
(** flush all outputs *)
let flush_all_outputs_ () =
Array.iter
(fun shard ->
let tls_l = A.get shard in
Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l)
st.per_thread
let shutdown () = let shutdown () =
if A.exchange st.active false then ( if A.exchange st.active false then (
(* flush all outputs *) flush_all_outputs_ ();
let tls_l = A.get st.per_thread in
(* FIXME: there's a potential race condition here. How to fix it
without overhead on every regular event? *)
Int_map.iter
(fun _tid (tls : per_thread_state) ->
Printf.eprintf "flush for %d\n%!" tls.tid;
Option.iter Output.flush tls.out)
tls_l;
B_queue.close st.events; B_queue.close st.events;
(* wait for writer thread to be done. The writer thread will exit (* wait for writer thread to be done. The writer thread will exit
@ -137,32 +241,34 @@ struct
let tls = TLS.get key_thread_local_st in let tls = TLS.get key_thread_local_st in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let time_ns = Time.now_ns () in let time_ns = Time.now_ns () in
Span_tbl.add tls.spans span { name; data; start_time_ns = time_ns }; Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns;
span span
let exit_span span : unit = let exit_span span : unit =
let out, tls = get_thread_output () in let out, tls = get_thread_output () in
let end_time_ns = Time.now_ns () in let end_time_ns = Time.now_ns () in
match Span_tbl.find_opt tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span) let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in
| Some info -> if span <> span' then
Span_tbl.remove tls.spans span; !on_tracing_error
FWrite.Event.Duration_complete.encode out ~name:info.name (spf "span mismatch: top is %Ld, expected %Ld" span' span)
~t_ref:tls.thread_ref ~time_ns:info.start_time_ns ~end_time_ns else
~args:info.data () FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref
~time_ns:start_time_ns ~end_time_ns ~args:data ()
let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f = let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f =
let out, tls = get_thread_output () in let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in let time_ns = Time.now_ns () in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let info = { start_time_ns = time_ns; data; name } in Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name;
Span_tbl.add tls.spans span info;
let[@inline] exit () : unit = let[@inline] exit () : unit =
let end_time_ns = Time.now_ns () in let end_time_ns = Time.now_ns () in
Span_tbl.remove tls.spans span;
let _span', _, _, data = Span_info_stack.pop tls.spans in
assert (span = _span');
FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns
~t_ref:tls.thread_ref ~args:info.data () ~t_ref:tls.thread_ref ~args:data ()
in in
try try
@ -175,9 +281,9 @@ struct
let add_data_to_span span data = let add_data_to_span span data =
let tls = TLS.get key_thread_local_st in let tls = TLS.get key_thread_local_st in
match Span_tbl.find_opt tls.spans span with match Span_info_stack.find_ tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span) | None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some info -> info.data <- List.rev_append data info.data | Some idx -> Span_info_stack.add_data tls.spans idx data
let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_ let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_
~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span =
@ -262,7 +368,7 @@ let create ~out () : collector =
events; events;
span_id_gen = A.make 0; span_id_gen = A.make 0;
next_thread_ref = A.make 1; next_thread_ref = A.make 1;
per_thread = A.make Int_map.empty; per_thread = Array.init 16 (fun _ -> A.make Int_map.empty);
} }
in in

View file

@ -10,10 +10,20 @@ let create ~(buf_pool : Buf_pool.t) ~send_buf () : t =
{ buf; send_buf; buf_pool } { buf; send_buf; buf_pool }
open struct open struct
(* NOTE: there is a potential race condition if an output is
flushed from the main thread upon closing, while
the local thread is blissfully writing new records to it
as we're winding down the collector. This is trying to reduce
the likelyhood of a race happening. *)
let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t =
let old_buf = self.buf in
self.buf <- new_buf;
old_buf
let flush_ (self : t) : unit = let flush_ (self : t) : unit =
self.send_buf self.buf; let new_buf = Buf_pool.alloc self.buf_pool in
let buf = Buf_pool.alloc self.buf_pool in let old_buf = replace_buf_ self new_buf in
self.buf <- buf self.send_buf old_buf
let[@inline never] cycle_buf (self : t) ~available : Buf.t = let[@inline never] cycle_buf (self : t) ~available : Buf.t =
flush_ self; flush_ self;

View file

@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.08"} "ocaml" {>= "4.08"}
"trace" {= version} "trace" {= version}
"mtime" {>= "2.0"} "mtime" {>= "2.0"}
"base-bigarray"
"base-unix" "base-unix"
"dune" {>= "2.9"} "dune" {>= "2.9"}
"odoc" {with-doc} "odoc" {with-doc}