Merge pull request #24 from c-cube/wip-fuchsia

fuchsia trace format
This commit is contained in:
Simon Cruanes 2024-01-09 11:41:28 -05:00 committed by GitHub
commit 987b57191c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 1694 additions and 38 deletions

2
.gitignore vendored
View file

@ -2,3 +2,5 @@ _opam
_build
*.json
*.exe
perf.*
*.fxt

View file

@ -117,6 +117,21 @@ let f x y z =
raise e
```
Alternatively, a name can be provided for the span, which is useful if you want
to access it and use functions like `Trace.add_data_to_span`:
```ocaml
let%trace f x y z =
do_sth x;
do_sth y;
begin
let%trace _sp = "sub-span" in
do_sth z;
Trace.add_data_to_span _sp ["x", `Int 42]
end
```
### Dune configuration
In your `library` or `executable` stanza, add: `(preprocess (pps ppx_trace))`.

View file

@ -0,0 +1,58 @@
open Trace_fuchsia_write
module B = Benchmark
let pf = Printf.printf
let encode_1_span (out : Output.t) () =
Event.Duration_complete.encode out ~name:"span" ~t_ref:(Thread_ref.Ref 5)
~time_ns:100_000L ~end_time_ns:5_000_000L ~args:[] ()
let encode_3_span (out : Output.t) () =
Event.Duration_complete.encode out ~name:"outer" ~t_ref:(Thread_ref.Ref 5)
~time_ns:100_000L ~end_time_ns:5_000_000L ~args:[] ();
Event.Duration_complete.encode out ~name:"inner" ~t_ref:(Thread_ref.Ref 5)
~time_ns:180_000L ~end_time_ns:4_500_000L ~args:[] ();
Event.Instant.encode out ~name:"hello" ~time_ns:1_234_567L
~t_ref:(Thread_ref.Ref 5)
~args:[ "x", `Int 42 ]
()
let time_per_iter_ns (samples : B.t list) : float =
let n_iters = ref 0L in
let time = ref 0. in
List.iter
(fun (s : B.t) ->
n_iters := Int64.add !n_iters s.iters;
time := !time +. s.stime +. s.utime)
samples;
!time *. 1e9 /. Int64.to_float !n_iters
let () =
let buf_pool = Buf_pool.create () in
let out =
Output.create ~buf_pool
~send_buf:(fun buf -> Buf_pool.recycle buf_pool buf)
()
in
let samples = B.throughput1 4 ~name:"encode_1_span" (encode_1_span out) () in
B.print_gc samples;
let [ (_, samples) ] = samples [@@warning "-8"] in
let iter_per_ns = time_per_iter_ns samples in
pf "%.3f ns/iter\n" iter_per_ns;
()
let () =
let buf_pool = Buf_pool.create () in
let out =
Output.create ~buf_pool
~send_buf:(fun buf -> Buf_pool.recycle buf_pool buf)
()
in
let samples = B.throughput1 4 ~name:"encode_3_span" (encode_3_span out) () in
B.print_gc samples;
()

View file

@ -1,4 +1,16 @@
(executable
(name trace1)
(modules trace1)
(libraries trace.core trace-tef))
(executable
(name trace_fx)
(modules trace_fx)
(preprocess (pps ppx_trace))
(libraries trace.core trace-fuchsia))
(executable
(name bench_fuchsia_write)
(modules bench_fuchsia_write)
(libraries benchmark trace-fuchsia.write))

50
bench/trace_fx.ml Normal file
View file

@ -0,0 +1,50 @@
module Trace = Trace_core
let ( let@ ) = ( @@ )
let work ~dom_idx ~n () : unit =
Trace_core.set_thread_name (Printf.sprintf "worker%d" dom_idx);
for _i = 1 to n do
let%trace _sp = "outer" in
Trace_core.add_data_to_span _sp [ "i", `Int _i ];
for _k = 1 to 10 do
let%trace _sp = "inner" in
()
done;
(* Thread.delay 1e-6 *)
if dom_idx = 0 && _i mod 4096 = 0 then (
Trace_core.message "gc stats";
let stat = Gc.quick_stat () in
Trace_core.counter_float "gc.minor" (8. *. stat.minor_words);
Trace_core.counter_float "gc.major" (8. *. stat.major_words)
)
done
let main ~n ~j () : unit =
let domains =
Array.init j (fun dom_idx -> Domain.spawn (fun () -> work ~dom_idx ~n ()))
in
let%trace () = "join" in
Array.iter Domain.join domains
let () =
let@ () = Trace_fuchsia.with_setup () in
Trace_core.set_process_name "trace_fxt1";
Trace_core.set_thread_name "main";
let%trace () = "main" in
let n = ref 10_000 in
let j = ref 4 in
let args =
[
"-n", Arg.Set_int n, " number of iterations";
"-j", Arg.Set_int j, " set number of workers";
]
|> Arg.align
in
Arg.parse args ignore "bench1";
main ~n:!n ~j:!j ()

3
bench_fx.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/sh
DUNE_OPTS="--profile=release --display=quiet"
exec dune exec $DUNE_OPTS bench/trace_fx.exe -- $@

2
dune
View file

@ -1,4 +1,4 @@
(env
(_ (flags :standard -strict-sequence -warn-error -a+8+26+27 -w +a-4-40-70)))
(_ (flags :standard -strict-sequence -warn-error -a+8+26+27 -w +a-4-40-42-44-70)))

View file

@ -41,9 +41,21 @@
(trace (= :version))
(mtime (>= 2.0))
base-unix
atomic
dune)
(tags
(trace tracing catapult)))
(package
(name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(depends
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
base-bigarray
base-unix
dune)
(tags
(trace tracing fuchsia)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

63
src/fuchsia/bg_thread.ml Normal file
View file

@ -0,0 +1,63 @@
open Common_
type out =
[ `Stdout
| `Stderr
| `File of string
]
type event =
| E_write_buf of Buf.t
| E_tick
type state = {
buf_pool: Buf_pool.t;
oc: out_channel;
events: event B_queue.t;
}
let with_out_ (out : out) f =
let oc, must_close =
match out with
| `Stdout -> stdout, false
| `Stderr -> stderr, false
| `File path -> open_out path, true
in
if must_close then (
let finally () = close_out_noerr oc in
Fun.protect ~finally (fun () -> f oc)
) else
f oc
let handle_ev (self : state) (ev : event) : unit =
match ev with
| E_tick -> flush self.oc
| E_write_buf buf ->
output self.oc buf.buf 0 buf.offset;
Buf_pool.recycle self.buf_pool buf
let bg_loop (self : state) : unit =
let continue = ref true in
while !continue do
match B_queue.pop_all self.events with
| exception B_queue.Closed -> continue := false
| evs -> List.iter (handle_ev self) evs
done
let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
let@ oc = with_out_ out in
let st = { oc; buf_pool; events } in
bg_loop st
(** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file,
and call [f()] *)
let tick_thread events : unit =
try
while true do
Thread.delay 0.5;
B_queue.push events E_tick
done
with B_queue.Closed -> ()

12
src/fuchsia/common_.ml Normal file
View file

@ -0,0 +1,12 @@
module A = Trace_core.Internal_.Atomic_
module FWrite = Trace_fuchsia_write
module B_queue = Trace_private_util.B_queue
module Buf = FWrite.Buf
module Buf_pool = FWrite.Buf_pool
module Output = FWrite.Output
let on_tracing_error =
ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s)
let ( let@ ) = ( @@ )
let spf = Printf.sprintf

9
src/fuchsia/dune Normal file
View file

@ -0,0 +1,9 @@
(library
(name trace_fuchsia)
(public_name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(libraries trace.core trace.private.util thread-local-storage
(re_export trace-fuchsia.write) bigarray
mtime mtime.clock.os atomic unix threads))

395
src/fuchsia/fcollector.ml Normal file
View file

@ -0,0 +1,395 @@
open Trace_core
open Common_
module TLS = Thread_local_storage
module Int_map = Map.Make (Int)
let pid = Unix.getpid ()
(** Thread-local stack of span info *)
module Span_info_stack : sig
type t
val create : unit -> t
val push :
t ->
span ->
name:string ->
start_time_ns:int64 ->
data:(string * user_data) list ->
unit
val pop : t -> int64 * string * int64 * (string * user_data) list
val find_ : t -> span -> int option
val add_data : t -> int -> (string * user_data) list -> unit
end = struct
module BA = Bigarray
module BA1 = Bigarray.Array1
type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t
type t = {
mutable len: int;
mutable span: int64arr;
mutable start_time_ns: int64arr;
mutable name: string array;
mutable data: (string * user_data) list array;
}
let init_size_ = 1
let create () : t =
{
len = 0;
span = BA1.create BA.Int64 BA.C_layout init_size_;
start_time_ns = BA1.create BA.Int64 BA.C_layout init_size_;
name = Array.make init_size_ "";
data = Array.make init_size_ [];
}
let[@inline] cap self = Array.length self.name
let grow_ (self : t) : unit =
let new_cap = 2 * cap self in
let new_span = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.span (BA1.sub new_span 0 self.len);
let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len);
let new_name = Array.make new_cap "" in
Array.blit self.name 0 new_name 0 self.len;
let new_data = Array.make new_cap [] in
Array.blit self.data 0 new_data 0 self.len;
self.span <- new_span;
self.start_time_ns <- new_startime_ns;
self.name <- new_name;
self.data <- new_data
let push (self : t) (span : int64) ~name ~start_time_ns ~data =
if cap self = self.len then grow_ self;
BA1.set self.span self.len span;
BA1.set self.start_time_ns self.len start_time_ns;
Array.set self.name self.len name;
Array.set self.data self.len data;
self.len <- self.len + 1
let pop (self : t) =
assert (self.len > 0);
self.len <- self.len - 1;
let span = BA1.get self.span self.len in
let name = self.name.(self.len) in
let start_time_ns = BA1.get self.start_time_ns self.len in
let data = self.data.(self.len) in
(* avoid holding onto old values *)
Array.set self.name self.len "";
Array.set self.data self.len [];
span, name, start_time_ns, data
let[@inline] add_data self i d : unit =
assert (i < self.len);
self.data.(i) <- List.rev_append d self.data.(i)
exception Found of int
let[@inline] find_ (self : t) span : _ option =
try
for i = self.len - 1 downto 0 do
if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i)
done;
None
with Found i -> Some i
end
type async_span_info = {
async_id: int;
flavor: [ `Sync | `Async ] option;
name: string;
mutable data: (string * user_data) list;
}
let key_async_data : async_span_info Meta_map.Key.t = Meta_map.Key.create ()
open struct
let state_id_ = A.make 0
(* re-raise exception with its backtrace *)
external reraise : exn -> 'a = "%reraise"
end
type per_thread_state = {
tid: int;
state_id: int; (** ID of the current collector state *)
local_span_id_gen: int A.t; (** Used for thread-local spans *)
mutable thread_ref: FWrite.Thread_ref.t;
mutable out: Output.t option;
spans: Span_info_stack.t; (** In-flight spans *)
}
type state = {
active: bool A.t;
events: Bg_thread.event B_queue.t;
span_id_gen: int A.t; (** Used for async spans *)
bg_thread: Thread.t;
buf_pool: Buf_pool.t;
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
per_thread: per_thread_state Int_map.t A.t array;
(** the state keeps tabs on thread-local state, so it can flush writers
at the end. This is a tid-sharded array of maps. *)
}
let key_thread_local_st : per_thread_state TLS.key =
TLS.new_key (fun () ->
let tid = Thread.id @@ Thread.self () in
{
tid;
state_id = A.get state_id_;
thread_ref = FWrite.Thread_ref.inline ~pid ~tid;
local_span_id_gen = A.make 0;
out = None;
spans = Span_info_stack.create ();
})
let out_of_st (st : state) : Output.t =
FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf ->
try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ())
module C (St : sig
val st : state
end)
() =
struct
open St
let state_id = 1 + A.fetch_and_add state_id_ 1
(** prepare the thread's state *)
let[@inline never] update_or_init_local_state (self : per_thread_state) : unit
=
(* get an output *)
let out = out_of_st st in
self.out <- Some out;
(* try to allocate a thread ref for current thread *)
let th_ref = A.fetch_and_add st.next_thread_ref 1 in
if th_ref <= 0xff then (
self.thread_ref <- FWrite.Thread_ref.ref th_ref;
FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid ()
);
(* add to [st]'s list of threads *)
let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in
while
let old = A.get shard_of_per_thread in
not
(A.compare_and_set shard_of_per_thread old
(Int_map.add self.tid self old))
do
()
done;
let on_exit _ =
while
let old = A.get shard_of_per_thread in
not
(A.compare_and_set shard_of_per_thread old
(Int_map.remove self.tid old))
do
()
done;
Option.iter Output.flush self.out
in
(* after thread exits, flush output and remove from global list *)
Gc.finalise on_exit (Thread.self ());
()
(** Obtain the output for the current thread *)
let[@inline] get_thread_output () : Output.t * per_thread_state =
let tls = TLS.get key_thread_local_st in
if tls.state_id != state_id || tls.out == None then
update_or_init_local_state tls;
let out =
match tls.out with
| None -> assert false
| Some o -> o
in
out, tls
let close_per_thread (tls : per_thread_state) =
Option.iter Output.flush tls.out
(** flush all outputs *)
let flush_all_outputs_ () =
Array.iter
(fun shard ->
let tls_l = A.get shard in
Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l)
st.per_thread
let shutdown () =
if A.exchange st.active false then (
flush_all_outputs_ ();
B_queue.close st.events;
(* wait for writer thread to be done. The writer thread will exit
after processing remaining events because the queue is now closed *)
Thread.join st.bg_thread
)
let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span =
let tls = TLS.get key_thread_local_st in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let time_ns = Time.now_ns () in
Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns;
span
let exit_span span : unit =
let out, tls = get_thread_output () in
let end_time_ns = Time.now_ns () in
let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in
if span <> span' then
!on_tracing_error
(spf "span mismatch: top is %Ld, expected %Ld" span' span)
else
FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref
~time_ns:start_time_ns ~end_time_ns ~args:data ()
let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name;
let[@inline] exit () : unit =
let end_time_ns = Time.now_ns () in
let _span', _, _, data = Span_info_stack.pop tls.spans in
assert (span = _span');
FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns
~t_ref:tls.thread_ref ~args:data ()
in
try
let x = f span in
exit ();
x
with exn ->
exit ();
reraise exn
let add_data_to_span span data =
let tls = TLS.get key_thread_local_st in
match Span_info_stack.find_ tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some idx -> Span_info_stack.add_data tls.spans idx data
let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_
~__FILE__:_ ~__LINE__:_ ~data name : explicit_span =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
(* get the id, or make a new one *)
let async_id =
match parent with
| Some m -> (Meta_map.find_exn key_async_data m.meta).async_id
| None -> A.fetch_and_add st.span_id_gen 1
in
FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref
~time_ns ~async_id ();
{
span = 0L;
meta =
Meta_map.(
empty |> add key_async_data { async_id; name; flavor; data = [] });
}
let exit_manual_span (es : explicit_span) : unit =
let { async_id; name; data; flavor = _ } =
Meta_map.find_exn key_async_data es.meta
in
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns
~args:data ~async_id ()
let add_data_to_manual_span (es : explicit_span) data =
let m = Meta_map.find_exn key_async_data es.meta in
m.data <- List.rev_append data m.data
let message ?span:_ ~data msg : unit =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref
~args:data ()
let counter_float ~data name f =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
~args:((name, `Float f) :: data)
()
let counter_int ~data name i =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
~args:((name, `Int i) :: data)
()
let name_process name : unit =
let out, _tls = get_thread_output () in
FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ())
let name_thread name : unit =
let out, tls = get_thread_output () in
FWrite.Kernel_object.(
encode out ~name ~ty:ty_thread ~kid:tls.tid
~args:[ "process", `Kid pid ]
())
end
let create ~out () : collector =
let buf_pool = Buf_pool.create () in
let events = B_queue.create () in
let bg_thread =
Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) ()
in
let st =
{
active = A.make true;
buf_pool;
bg_thread;
events;
span_id_gen = A.make 0;
next_thread_ref = A.make 1;
per_thread = Array.init 16 (fun _ -> A.make Int_map.empty);
}
in
let _tick_thread = Thread.create (fun () -> Bg_thread.tick_thread events) in
(* write header *)
let out = out_of_st st in
FWrite.Metadata.Magic_record.encode out;
FWrite.Metadata.Initialization_record.(
encode out ~ticks_per_secs:default_ticks_per_sec ());
FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" ();
Output.flush out;
Output.dispose out;
let module Coll =
C
(struct
let st = st
end)
()
in
(module Coll)

View file

@ -0,0 +1,3 @@
open Trace_core
val create : out:Bg_thread.out -> unit -> collector

6
src/fuchsia/time.ml Normal file
View file

@ -0,0 +1,6 @@
let counter = Mtime_clock.counter ()
(** Now, in nanoseconds *)
let[@inline] now_ns () : int64 =
let t = Mtime_clock.count counter in
Mtime.Span.to_uint64_ns t

View file

@ -0,0 +1,38 @@
open Common_
type output =
[ `Stdout
| `Stderr
| `File of string
]
let collector = Fcollector.create
let setup ?(out = `Env) () =
match out with
| `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
| `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
| `File path ->
Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) ()
| `Env ->
(match Sys.getenv_opt "TRACE" with
| Some ("1" | "true") ->
let path = "trace.fxt" in
let c = Fcollector.create ~out:(`File path) () in
Trace_core.setup_collector c
| Some "stdout" ->
Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
| Some "stderr" ->
Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
| Some path ->
let c = Fcollector.create ~out:(`File path) () in
Trace_core.setup_collector c
| None -> ())
let with_setup ?out () f =
setup ?out ();
Fun.protect ~finally:Trace_core.shutdown f
module Internal_ = struct
let on_tracing_error = on_tracing_error
end

View file

@ -0,0 +1,46 @@
val collector :
out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector
(** Make a collector that writes into the given output.
See {!setup} for more details. *)
type output =
[ `Stdout
| `Stderr
| `File of string
]
(** Output for tracing.
- [`Stdout] will enable tracing and print events on stdout
- [`Stderr] will enable tracing and print events on stderr
- [`File "foo"] will enable tracing and print events into file
named "foo"
*)
val setup : ?out:[ output | `Env ] -> unit -> unit
(** [setup ()] installs the collector depending on [out].
@param out can take different values:
- regular {!output} value to specify where events go
- [`Env] will enable tracing if the environment
variable "TRACE" is set.
- If it's set to "1", then the file is "trace.fxt".
- If it's set to "stdout", then logging happens on stdout (since 0.2)
- If it's set to "stderr", then logging happens on stdout (since 0.2)
- Otherwise, if it's set to a non empty string, the value is taken
to be the file path into which to write.
*)
val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
(** [with_setup () f] (optionally) sets a collector up, calls [f()],
and makes sure to shutdown before exiting.
since 0.2 a () argument was added.
*)
(**/**)
module Internal_ : sig
val on_tracing_error : (string -> unit) ref
end
(**/**)

42
src/fuchsia/write/buf.ml Normal file
View file

@ -0,0 +1,42 @@
open Util
type t = {
buf: bytes;
mutable offset: int;
}
let empty : t = { buf = Bytes.empty; offset = 0 }
let create (n : int) : t =
let buf = Bytes.create (round_to_word n) in
{ buf; offset = 0 }
let[@inline] clear self = self.offset <- 0
let[@inline] available self = Bytes.length self.buf - self.offset
let[@inline] size self = self.offset
(* see below: we assume little endian *)
let () = assert (not Sys.big_endian)
let[@inline] add_i64 (self : t) (i : int64) : unit =
(* NOTE: we use LE, most systems are this way, even though fuchsia
says we should use the system's native endianess *)
Bytes.set_int64_le self.buf self.offset i;
self.offset <- self.offset + 8
let[@inline] add_string (self : t) (s : string) : unit =
let len = String.length s in
let missing = missing_to_round len in
(* bound check *)
assert (len + missing + self.offset <= Bytes.length self.buf);
Bytes.unsafe_blit_string s 0 self.buf self.offset len;
self.offset <- self.offset + len;
(* add 0-padding *)
if missing != 0 then (
Bytes.unsafe_fill self.buf self.offset missing '\x00';
self.offset <- self.offset + missing
)
let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset

View file

@ -0,0 +1,58 @@
open struct
module A = Trace_core.Internal_.Atomic_
exception Got_buf of Buf.t
end
module List_with_len = struct
type +'a t =
| Nil
| Cons of int * 'a * 'a t
let empty : _ t = Nil
let[@inline] len = function
| Nil -> 0
| Cons (i, _, _) -> i
let[@inline] cons x self = Cons (len self + 1, x, self)
end
type t = {
max_len: int;
buf_size: int;
bufs: Buf.t List_with_len.t A.t;
}
let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t =
let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in
{ max_len; buf_size; bufs = A.make List_with_len.empty }
let alloc (self : t) : Buf.t =
try
while
match A.get self.bufs with
| Nil -> false
| Cons (_, buf, tl) as old ->
if A.compare_and_set self.bufs old tl then
raise (Got_buf buf)
else
false
do
()
done;
Buf.create self.buf_size
with Got_buf b -> b
let recycle (self : t) (buf : Buf.t) : unit =
Buf.clear buf;
try
while
match A.get self.bufs with
| Cons (i, _, _) when i >= self.max_len -> raise Exit
| old ->
not (A.compare_and_set self.bufs old (List_with_len.cons buf old))
do
()
done
with Exit -> () (* do not recycle *)

9
src/fuchsia/write/dune Normal file
View file

@ -0,0 +1,9 @@
(library
(name trace_fuchsia_write)
(public_name trace-fuchsia.write)
(synopsis "Serialization part of trace-fuchsia")
(ocamlopt_flags :standard -S
;-dlambda
)
(libraries trace.core threads))

View file

@ -0,0 +1,56 @@
type t = {
mutable buf: Buf.t;
mutable send_buf: Buf.t -> unit;
buf_pool: Buf_pool.t;
}
let create ~(buf_pool : Buf_pool.t) ~send_buf () : t =
let buf_size = buf_pool.buf_size in
let buf = Buf.create buf_size in
{ buf; send_buf; buf_pool }
open struct
(* NOTE: there is a potential race condition if an output is
flushed from the main thread upon closing, while
the local thread is blissfully writing new records to it
as we're winding down the collector. This is trying to reduce
the likelyhood of a race happening. *)
let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t =
let old_buf = self.buf in
self.buf <- new_buf;
old_buf
let flush_ (self : t) : unit =
let new_buf = Buf_pool.alloc self.buf_pool in
let old_buf = replace_buf_ self new_buf in
self.send_buf old_buf
let[@inline never] cycle_buf (self : t) ~available : Buf.t =
flush_ self;
let buf = self.buf in
if Buf.available buf < available then
failwith "fuchsia: buffer is too small";
buf
end
let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self
(** Obtain a buffer with at least [available] bytes *)
let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t =
let available = available_word lsl 3 in
if Buf.available self.buf >= available then
self.buf
else
cycle_buf self ~available
let into_buffer ~buf_pool (buffer : Buffer.t) : t =
let send_buf (buf : Buf.t) =
Buffer.add_subbytes buffer buf.buf 0 buf.offset
in
create ~buf_pool ~send_buf ()
let dispose (self : t) : unit =
flush_ self;
Buf_pool.recycle self.buf_pool self.buf;
self.buf <- Buf.empty

View file

@ -0,0 +1,541 @@
(** Write fuchsia events into buffers.
Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *)
module Util = Util
module Buf = Buf
module Output = Output
module Buf_pool = Buf_pool
open struct
let spf = Printf.sprintf
end
open Util
type user_data = Trace_core.user_data
module I64 = struct
include Int64
let ( + ) = add
let ( - ) = sub
let ( = ) = equal
let ( land ) = logand
let ( lor ) = logor
let lnot = lognot
let ( lsl ) = shift_left
let ( lsr ) = shift_right_logical
let ( asr ) = shift_right
end
module Str_ref = struct
type t = int
(** 16 bits *)
let inline (size : int) : t =
if size > 32_000 then invalid_arg "fuchsia: max length of strings is 20_000";
if size = 0 then
0
else
(1 lsl 15) lor size
end
module Thread_ref = struct
type t =
| Ref of int
| Inline of {
pid: int;
tid: int;
}
let inline ~pid ~tid : t = Inline { pid; tid }
let ref x : t =
if x = 0 || x > 255 then
invalid_arg "fuchsia: thread inline ref must be >0 < 256";
Ref x
let size_word (self : t) : int =
match self with
| Ref _ -> 0
| Inline _ -> 2
(** 8-bit int for the reference *)
let as_i8 (self : t) : int =
match self with
| Ref i -> i
| Inline _ -> 0
end
(** record type = 0 *)
module Metadata = struct
(** First record in the trace *)
module Magic_record = struct
let value = 0x0016547846040010L
let size_word = 1
let encode (out : Output.t) =
let buf = Output.get_buf out ~available_word:size_word in
Buf.add_i64 buf value
end
module Initialization_record = struct
let size_word = 2
(** Default: 1 tick = 1 ns *)
let default_ticks_per_sec = 1_000_000_000L
let encode (out : Output.t) ~ticks_per_secs () : unit =
let buf = Output.get_buf out ~available_word:size_word in
let hd = I64.(1L lor (of_int size_word lsl 4)) in
Buf.add_i64 buf hd;
Buf.add_i64 buf ticks_per_secs
end
module Provider_info = struct
let size_word ~name () = 1 + (round_to_word (String.length name) lsr 3)
let encode (out : Output.t) ~(id : int) ~name () : unit =
let size = size_word ~name () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
(of_int size lsl 4)
lor (1L lsl 16)
lor (of_int id lsl 20)
lor (of_int (Str_ref.inline (String.length name)) lsl 52))
in
Buf.add_i64 buf hd;
Buf.add_string buf name
end
module Provider_section = struct end
module Trace_info = struct end
end
module Argument = struct
type 'a t = string * ([< user_data | `Kid of int ] as 'a)
let check_valid _ = ()
(* TODO: check string length *)
let[@inline] is_i32_ (i : int) : bool = Int32.(to_int (of_int i) = i)
let size_word (self : _ t) =
let name, data = self in
match data with
| `None | `Bool _ -> 1 + (round_to_word (String.length name) lsr 3)
| `Int i when is_i32_ i -> 1 + (round_to_word (String.length name) lsr 3)
| `Int _ -> (* int64 *) 2 + (round_to_word (String.length name) lsr 3)
| `Float _ -> 2 + (round_to_word (String.length name) lsr 3)
| `String s ->
1
+ (round_to_word (String.length s) lsr 3)
+ (round_to_word (String.length name) lsr 3)
| `Kid _ -> 2 + (round_to_word (String.length name) lsr 3)
open struct
external int_of_bool : bool -> int = "%identity"
end
let encode (buf : Buf.t) (self : _ t) : unit =
let name, data = self in
let size = size_word self in
(* part of header with argument name + size *)
let hd_arg_size =
I64.(
(of_int size lsl 4)
lor (of_int (Str_ref.inline (String.length name)) lsl 16))
in
match data with
| `None ->
let hd = hd_arg_size in
Buf.add_i64 buf hd;
Buf.add_string buf name
| `Int i when is_i32_ i ->
let hd = I64.(1L lor hd_arg_size lor (of_int i lsl 32)) in
Buf.add_i64 buf hd;
Buf.add_string buf name
| `Int i ->
(* int64 *)
let hd = I64.(3L lor hd_arg_size) in
Buf.add_i64 buf hd;
Buf.add_string buf name;
Buf.add_i64 buf (I64.of_int i)
| `Float f ->
let hd = I64.(5L lor hd_arg_size) in
Buf.add_i64 buf hd;
Buf.add_string buf name;
Buf.add_i64 buf (I64.bits_of_float f)
| `String s ->
let hd =
I64.(
6L lor hd_arg_size
lor (of_int (Str_ref.inline (String.length s)) lsl 32))
in
Buf.add_i64 buf hd;
Buf.add_string buf name;
Buf.add_string buf s
| `Bool b ->
let hd = I64.(9L lor hd_arg_size lor (of_int (int_of_bool b) lsl 16)) in
Buf.add_i64 buf hd;
Buf.add_string buf name
| `Kid kid ->
(* int64 *)
let hd = I64.(8L lor hd_arg_size) in
Buf.add_i64 buf hd;
Buf.add_string buf name;
Buf.add_i64 buf (I64.of_int kid)
end
module Arguments = struct
type 'a t = 'a Argument.t list
let[@inline] len (self : _ t) : int =
match self with
| [] -> 0
| [ _ ] -> 1
| _ :: _ :: tl -> 2 + List.length tl
let check_valid (self : _ t) =
let len = len self in
if len > 15 then
invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len);
List.iter Argument.check_valid self;
()
let[@inline] size_word (self : _ t) =
match self with
| [] -> 0
| [ a ] -> Argument.size_word a
| a :: b :: tl ->
List.fold_left
(fun n arg -> n + Argument.size_word arg)
(Argument.size_word a + Argument.size_word b)
tl
let[@inline] encode (buf : Buf.t) (self : _ t) =
let rec aux buf l =
match l with
| [] -> ()
| x :: tl ->
Argument.encode buf x;
aux buf tl
in
match self with
| [] -> ()
| [ x ] -> Argument.encode buf x
| x :: tl ->
Argument.encode buf x;
aux buf tl
end
(** record type = 3 *)
module Thread_record = struct
let size_word : int = 3
(** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *)
let encode (out : Output.t) ~as_ref ~pid ~tid () : unit =
if as_ref <= 0 || as_ref > 255 then
invalid_arg "fuchsia: thread_record: invalid ref";
let buf = Output.get_buf out ~available_word:size_word in
let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in
Buf.add_i64 buf hd;
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
end
(** record type = 4 *)
module Event = struct
(** type=0 *)
module Instant = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) / 8)
+ Arguments.size_word args
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
(* set category = 0 *)
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
()
end
(** type=1 *)
module Counter = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* counter id *)
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (1L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
(* just use 0 as counter id *)
Buf.add_i64 buf 0L;
()
end
(** type=2 *)
module Duration_begin = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (2L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
()
end
(** type=3 *)
module Duration_end = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
: unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (3L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
()
end
(** type=4 *)
module Duration_complete = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* end timestamp *)
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
~end_time_ns ~args () : unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
(* set category = 0 *)
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (4L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
Buf.add_i64 buf end_time_ns;
()
end
(** type=5 *)
module Async_begin = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* async id *)
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
~(async_id : int) ~args () : unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (5L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
Buf.add_i64 buf (I64.of_int async_id);
()
end
(** type=7 *)
module Async_end = struct
let size_word ~name ~t_ref ~args () : int =
1 + Thread_ref.size_word t_ref + 1
(* timestamp *) + (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args + 1 (* async id *)
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
~(async_id : int) ~args () : unit =
let size = size_word ~name ~t_ref ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
4L
lor (of_int size lsl 4)
lor (7L lsl 16)
lor (of_int (Arguments.len args) lsl 20)
lor (of_int (Thread_ref.as_i8 t_ref) lsl 24)
lor (of_int (Str_ref.inline (String.length name)) lsl 48))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf time_ns;
(match t_ref with
| Thread_ref.Inline { pid; tid } ->
Buf.add_i64 buf (I64.of_int pid);
Buf.add_i64 buf (I64.of_int tid)
| Thread_ref.Ref _ -> ());
Buf.add_string buf name;
Arguments.encode buf args;
Buf.add_i64 buf (I64.of_int async_id);
()
end
end
(** record type = 7 *)
module Kernel_object = struct
let size_word ~name ~args () : int =
1 + 1
+ (round_to_word (String.length name) lsr 3)
+ Arguments.size_word args
(* see:
https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/types.h;l=441?q=ZX_OBJ_TYPE&ss=fuchsia%2Ffuchsia
*)
type ty = int
let ty_process : ty = 1
let ty_thread : ty = 2
let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit =
let size = size_word ~name ~args () in
let buf = Output.get_buf out ~available_word:size in
let hd =
I64.(
7L
lor (of_int size lsl 4)
lor (of_int ty lsl 16)
lor (of_int (Arguments.len args) lsl 40)
lor (of_int (Str_ref.inline (String.length name)) lsl 24))
in
Buf.add_i64 buf hd;
Buf.add_i64 buf (I64.of_int kid);
Buf.add_string buf name;
Arguments.encode buf args;
()
end

View file

@ -0,0 +1,5 @@
(** How many bytes are missing for [n] to be a multiple of 8 *)
let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111
(** Round up to a multiple of 8 *)
let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111)

View file

@ -8,19 +8,30 @@ let location_errorf ~loc fmt =
(** {2 let expression} *)
let expand_let ~ctxt (name : string) body =
let expand_let ~ctxt (var : [ `Var of label loc | `Unit ]) (name : string) body
=
let loc = Expansion_context.Extension.extension_point_loc ctxt in
Ast_builder.Default.(
let var_pat =
match var with
| `Var v -> ppat_var ~loc:v.loc v
| `Unit -> ppat_var ~loc { loc; txt = "_trace_span" }
in
let var_exp =
match var with
| `Var v -> pexp_ident ~loc:v.loc { txt = lident v.txt; loc = v.loc }
| `Unit -> [%expr _trace_span]
in
[%expr
let _trace_span =
let [%p var_pat] =
Trace_core.enter_span ~__FILE__ ~__LINE__ [%e estring ~loc name]
in
try
let res = [%e body] in
Trace_core.exit_span _trace_span;
Trace_core.exit_span [%e var_exp];
res
with exn ->
Trace_core.exit_span _trace_span;
Trace_core.exit_span [%e var_exp];
raise exn])
let extension_let =
@ -29,7 +40,13 @@ let extension_let =
single_expr_payload
(pexp_let nonrecursive
(value_binding
~pat:(ppat_construct (lident (string "()")) none)
~pat:
(let pat_var = ppat_var __' |> map ~f:(fun f v -> f (`Var v)) in
let pat_unit =
as__ @@ ppat_construct (lident (string "()")) none
|> map ~f:(fun f _ -> f `Unit)
in
alt pat_var pat_unit)
~expr:(estring __)
^:: nil)
__))

View file

@ -3,7 +3,4 @@
(name trace_tef)
(public_name trace-tef)
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
(libraries trace.core mtime mtime.clock.os atomic unix threads
(select relax_.ml from
(base-domain -> relax_.real.ml)
( -> relax_.dummy.ml))))
(libraries trace.core trace.private.util mtime mtime.clock.os unix threads))

View file

@ -1 +0,0 @@
let cpu_relax = Domain.cpu_relax

View file

@ -1,4 +1,5 @@
open Trace_core
open Trace_private_util
module A = Trace_core.Internal_.Atomic_
module Mock_ = struct
@ -14,7 +15,7 @@ end
let counter = Mtime_clock.counter ()
(** Now, in microseconds *)
let now_us () : float =
let[@inline] now_us () : float =
if !Mock_.enabled then
Mock_.now_us ()
else (
@ -22,16 +23,6 @@ let now_us () : float =
Mtime.Span.to_float_ns t /. 1e3
)
let protect ~finally f =
try
let x = f () in
finally ();
x
with exn ->
let bt = Printexc.get_raw_backtrace () in
finally ();
Printexc.raise_with_backtrace exn bt
let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s)
type event =
@ -144,7 +135,7 @@ module Writer = struct
let with_ ~out f =
let writer = create ~out () in
protect ~finally:(fun () -> close writer) (fun () -> f writer)
Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer)
let[@inline] flush (self : t) : unit = flush self.oc
@ -499,7 +490,7 @@ let setup ?(out = `Env) () =
let with_setup ?out () f =
setup ?out ();
protect ~finally:Trace_core.shutdown f
Fun.protect ~finally:Trace_core.shutdown f
module Internal_ = struct
let mock_all_ () = Mock_.enabled := true

View file

@ -1,9 +1,11 @@
module A = Trace_core.Internal_.Atomic_
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Mpsc_bag.t;
mutable closed: bool;
consumer_waiting: bool Atomic.t;
consumer_waiting: bool A.t;
}
exception Closed
@ -14,7 +16,7 @@ let create () : _ t =
cond = Condition.create ();
q = Mpsc_bag.create ();
closed = false;
consumer_waiting = Atomic.make false;
consumer_waiting = A.make false;
}
let close (self : _ t) =
@ -29,7 +31,7 @@ let push (self : _ t) x : unit =
if self.closed then raise Closed;
Mpsc_bag.add self.q x;
if self.closed then raise Closed;
if Atomic.get self.consumer_waiting then (
if A.get self.consumer_waiting then (
(* wakeup consumer *)
Mutex.lock self.mutex;
Condition.broadcast self.cond;
@ -42,14 +44,14 @@ let rec pop_all (self : 'a t) : 'a list =
| None ->
if self.closed then raise Closed;
Mutex.lock self.mutex;
Atomic.set self.consumer_waiting true;
A.set self.consumer_waiting true;
(* check again, a producer might have pushed an element since we
last checked. However if we still find
nothing, because this comes after [consumer_waiting:=true],
any producer arriving after that will know to wake us up. *)
(match Mpsc_bag.pop_all self.q with
| Some l ->
Atomic.set self.consumer_waiting false;
A.set self.consumer_waiting false;
Mutex.unlock self.mutex;
l
| None ->
@ -58,6 +60,6 @@ let rec pop_all (self : 'a t) : 'a list =
raise Closed
);
Condition.wait self.cond self.mutex;
Atomic.set self.consumer_waiting false;
A.set self.consumer_waiting false;
Mutex.unlock self.mutex;
pop_all self)

View file

@ -1 +1,2 @@
let cpu_relax () = ()
let n_domains () = 1

2
src/util/domain_util.mli Normal file
View file

@ -0,0 +1,2 @@
val cpu_relax : unit -> unit
val n_domains : unit -> int

View file

@ -0,0 +1,2 @@
let cpu_relax = Domain.cpu_relax
let n_domains = Domain.recommended_domain_count

9
src/util/dune Normal file
View file

@ -0,0 +1,9 @@
(library
(public_name trace.private.util)
(synopsis "internal utilities for trace. No guarantees of stability.")
(name trace_private_util)
(libraries trace.core mtime mtime.clock.os unix threads
(select domain_util.ml from
(base-domain -> domain_util.real.ml)
( -> domain_util.dummy.ml))))

View file

@ -1,7 +1,9 @@
type 'a t = { bag: 'a list Atomic.t } [@@unboxed]
module A = Trace_core.Internal_.Atomic_
type 'a t = { bag: 'a list A.t } [@@unboxed]
let create () =
let bag = Atomic.make [] in
let bag = A.make [] in
{ bag }
module Backoff = struct
@ -11,20 +13,20 @@ module Backoff = struct
let once (b : t) : t =
for _i = 1 to b do
Relax_.cpu_relax ()
Domain_util.cpu_relax ()
done;
min (b * 2) 256
end
let rec add backoff t x =
let before = Atomic.get t.bag in
let before = A.get t.bag in
let after = x :: before in
if not (Atomic.compare_and_set t.bag before after) then
if not (A.compare_and_set t.bag before after) then
add (Backoff.once backoff) t x
let[@inline] add t x = add Backoff.default t x
let[@inline] pop_all t : _ list option =
match Atomic.exchange t.bag [] with
match A.exchange t.bag [] with
| [] -> None
| l -> Some (List.rev l)

5
test/fuchsia/write/dune Normal file
View file

@ -0,0 +1,5 @@
(tests
(names t1 t2)
(package trace-fuchsia)
(libraries trace-fuchsia.write))

65
test/fuchsia/write/t1.ml Normal file
View file

@ -0,0 +1,65 @@
open Trace_fuchsia_write
module Str_ = struct
open String
let to_hex (s : string) : string =
let i_to_hex (i : int) =
if i < 10 then
Char.chr (i + Char.code '0')
else
Char.chr (i - 10 + Char.code 'a')
in
let res = Bytes.create (2 * length s) in
for i = 0 to length s - 1 do
let n = Char.code (get s i) in
Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4));
Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f))
done;
Bytes.unsafe_to_string res
let of_hex_exn (s : string) : string =
let n_of_c = function
| '0' .. '9' as c -> Char.code c - Char.code '0'
| 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a'
| 'A' .. 'F' as c -> 10 + Char.code c - Char.code 'A'
| _ -> invalid_arg "string: invalid hex"
in
if String.length s mod 2 <> 0 then
invalid_arg "string: hex sequence must be of even length";
let res = Bytes.make (String.length s / 2) '\x00' in
for i = 0 to (String.length s / 2) - 1 do
let n1 = n_of_c (String.get s (2 * i)) in
let n2 = n_of_c (String.get s ((2 * i) + 1)) in
let n = (n1 lsl 4) lor n2 in
Bytes.set res i (Char.chr n)
done;
Bytes.unsafe_to_string res
end
let () =
let l = List.init 100 (fun i -> Util.round_to_word i) in
assert (List.for_all (fun x -> x mod 8 = 0) l)
let () =
assert (Str_ref.inline 0 = 0b0000_0000_0000_0000);
assert (Str_ref.inline 1 = 0b1000_0000_0000_0001);
assert (Str_ref.inline 6 = 0b1000_0000_0000_0110);
assert (Str_ref.inline 31999 = 0b1111_1100_1111_1111);
()
let () =
let buf = Buf.create 128 in
Buf.add_i64 buf 42L;
assert (Buf.to_string buf = "\x2a\x00\x00\x00\x00\x00\x00\x00")
let () =
let buf = Buf.create 128 in
Buf.add_string buf "";
assert (Buf.to_string buf = "")
let () =
let buf = Buf.create 128 in
Buf.add_string buf "hello";
assert (Buf.to_string buf = "hello\x00\x00\x00")

View file

@ -0,0 +1,4 @@
first trace
100004467854160033000500000000000100000000000000560000000000000054001005000005804e61bc000000000068656c6c6f000000210001802a0000007800000000000000
second trace
1000044678541600210000000000000000ca9a3b00000000330005000000000001000000000000005600000000000000300011000000b0006f63616d6c2d747261636500000000004400040500000580a0860100000000006f75746572000000404b4c0000000000440004050000058020bf020000000000696e6e657200000020aa440000000000540010050000058087d612000000000068656c6c6f000000210001802a0000007800000000000000

89
test/fuchsia/write/t2.ml Normal file
View file

@ -0,0 +1,89 @@
open Trace_fuchsia_write
let pf = Printf.printf
module Str_ = struct
open String
let to_hex (s : string) : string =
let i_to_hex (i : int) =
if i < 10 then
Char.chr (i + Char.code '0')
else
Char.chr (i - 10 + Char.code 'a')
in
let res = Bytes.create (2 * length s) in
for i = 0 to length s - 1 do
let n = Char.code (get s i) in
Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4));
Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f))
done;
Bytes.unsafe_to_string res
let of_hex_exn (s : string) : string =
let n_of_c = function
| '0' .. '9' as c -> Char.code c - Char.code '0'
| 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a'
| 'A' .. 'F' as c -> 10 + Char.code c - Char.code 'A'
| _ -> invalid_arg "string: invalid hex"
in
if String.length s mod 2 <> 0 then
invalid_arg "string: hex sequence must be of even length";
let res = Bytes.make (String.length s / 2) '\x00' in
for i = 0 to (String.length s / 2) - 1 do
let n1 = n_of_c (String.get s (2 * i)) in
let n2 = n_of_c (String.get s ((2 * i) + 1)) in
let n = (n1 lsl 4) lor n2 in
Bytes.set res i (Char.chr n)
done;
Bytes.unsafe_to_string res
end
let with_buf_output (f : Output.t -> unit) : string =
let buf_pool = Buf_pool.create () in
let buffer = Buffer.create 32 in
let out = Output.into_buffer ~buf_pool buffer in
f out;
Output.flush out;
Buffer.contents buffer
let () = pf "first trace\n"
let () =
let str =
with_buf_output (fun out ->
Metadata.Magic_record.encode out;
Thread_record.encode out ~as_ref:5 ~pid:1 ~tid:86 ();
Event.Instant.encode out ~name:"hello" ~time_ns:1234_5678L
~t_ref:(Thread_ref.Ref 5)
~args:[ "x", `Int 42 ]
())
in
pf "%s\n" (Str_.to_hex str)
let () = pf "second trace\n"
let () =
let str =
with_buf_output (fun out ->
Metadata.Magic_record.encode out;
Metadata.Initialization_record.(
encode out ~ticks_per_secs:default_ticks_per_sec ());
Thread_record.encode out ~as_ref:5 ~pid:1 ~tid:86 ();
Metadata.Provider_info.encode out ~id:1 ~name:"ocaml-trace" ();
Event.Duration_complete.encode out ~name:"outer"
~t_ref:(Thread_ref.Ref 5) ~time_ns:100_000L ~end_time_ns:5_000_000L
~args:[] ();
Event.Duration_complete.encode out ~name:"inner"
~t_ref:(Thread_ref.Ref 5) ~time_ns:180_000L ~end_time_ns:4_500_000L
~args:[] ();
Event.Instant.encode out ~name:"hello" ~time_ns:1_234_567L
~t_ref:(Thread_ref.Ref 5)
~args:[ "x", `Int 42 ]
())
in
(let oc = open_out "foo.fxt" in
output_string oc str;
close_out oc);
pf "%s\n" (Str_.to_hex str)

37
trace-fuchsia.opam Normal file
View file

@ -0,0 +1,37 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.5"
synopsis:
"A high-performance backend for trace, emitting a Fuchsia trace into a file"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["trace" "tracing" "fuchsia"]
homepage: "https://github.com/c-cube/ocaml-trace"
bug-reports: "https://github.com/c-cube/ocaml-trace/issues"
depends: [
"ocaml" {>= "4.08"}
"trace" {= version}
"mtime" {>= "2.0"}
"base-bigarray"
"base-unix"
"dune" {>= "2.9"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files=false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/c-cube/ocaml-trace.git"

View file

@ -14,7 +14,6 @@ depends: [
"trace" {= version}
"mtime" {>= "2.0"}
"base-unix"
"atomic"
"dune" {>= "2.9"}
"odoc" {with-doc}
]