mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
feat fuchsia: full revamp of the library, modularized
- separate exporter, writer, subscriber - use the subscriber span tbl to keep track of context - use a `Buf_chain.t` to keep multiple buffers in use, and keep a set of ready buffers - batch write the ready buffers and then recycle them
This commit is contained in:
parent
a4779227fa
commit
190f70d7c9
20 changed files with 613 additions and 729 deletions
|
|
@ -1,62 +0,0 @@
|
|||
open Common_
|
||||
|
||||
type out =
|
||||
[ `Stdout
|
||||
| `Stderr
|
||||
| `File of string
|
||||
]
|
||||
|
||||
type event =
|
||||
| E_write_buf of Buf.t
|
||||
| E_tick
|
||||
|
||||
type state = {
|
||||
buf_pool: Buf_pool.t;
|
||||
oc: out_channel;
|
||||
events: event B_queue.t;
|
||||
}
|
||||
|
||||
let with_out_ (out : out) f =
|
||||
let oc, must_close =
|
||||
match out with
|
||||
| `Stdout -> stdout, false
|
||||
| `Stderr -> stderr, false
|
||||
| `File path -> open_out path, true
|
||||
in
|
||||
|
||||
if must_close then (
|
||||
let finally () = close_out_noerr oc in
|
||||
Fun.protect ~finally (fun () -> f oc)
|
||||
) else
|
||||
f oc
|
||||
|
||||
let handle_ev (self : state) (ev : event) : unit =
|
||||
match ev with
|
||||
| E_tick -> flush self.oc
|
||||
| E_write_buf buf ->
|
||||
output self.oc buf.buf 0 buf.offset;
|
||||
Buf_pool.recycle self.buf_pool buf
|
||||
|
||||
let bg_loop (self : state) : unit =
|
||||
let continue = ref true in
|
||||
|
||||
while !continue do
|
||||
match B_queue.pop_all self.events with
|
||||
| exception B_queue.Closed -> continue := false
|
||||
| evs -> List.iter (handle_ev self) evs
|
||||
done
|
||||
|
||||
let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
|
||||
let@ oc = with_out_ out in
|
||||
let st = { oc; buf_pool; events } in
|
||||
bg_loop st
|
||||
|
||||
(** Thread that simply regularly "ticks", sending events to the background
|
||||
thread so it has a chance to write to the file, and call [f()] *)
|
||||
let tick_thread events : unit =
|
||||
try
|
||||
while true do
|
||||
Thread.delay 0.5;
|
||||
B_queue.push events E_tick
|
||||
done
|
||||
with B_queue.Closed -> ()
|
||||
|
|
@ -8,12 +8,14 @@ type t = {
|
|||
let empty : t = { buf = Bytes.empty; offset = 0 }
|
||||
|
||||
let create (n : int) : t =
|
||||
(* multiple of 8-bytes size *)
|
||||
let buf = Bytes.create (round_to_word n) in
|
||||
{ buf; offset = 0 }
|
||||
|
||||
let[@inline] clear self = self.offset <- 0
|
||||
let[@inline] available self = Bytes.length self.buf - self.offset
|
||||
let[@inline] size self = self.offset
|
||||
let[@inline] is_empty self = self.offset = 0
|
||||
|
||||
(* see below: we assume little endian *)
|
||||
let () = assert (not Sys.big_endian)
|
||||
140
src/fuchsia/buf_chain.ml
Normal file
140
src/fuchsia/buf_chain.ml
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
(** A set of buffers in use, and a set of ready buffers *)
|
||||
|
||||
open Common_
|
||||
|
||||
(** Buffers in use *)
|
||||
type buffers =
|
||||
| B_one of { mutable buf: Buf.t }
|
||||
| B_many of Buf.t Lock.t array
|
||||
(** mask(thread id) -> buffer. This reduces contention *)
|
||||
|
||||
type t = {
|
||||
bufs: buffers;
|
||||
has_ready: bool A.t;
|
||||
ready: Buf.t Queue.t Lock.t;
|
||||
(** Buffers that are full (enough) and must be written *)
|
||||
buf_pool: Buf_pool.t;
|
||||
}
|
||||
(** A set of buffers, some of which are ready to be written *)
|
||||
|
||||
open struct
|
||||
let shard_log = 4
|
||||
let shard = 1 lsl shard_log
|
||||
let shard_mask = shard - 1
|
||||
end
|
||||
|
||||
let create ~(sharded : bool) ~(buf_pool : Buf_pool.t) () : t =
|
||||
let bufs =
|
||||
if sharded then (
|
||||
let bufs =
|
||||
Array.init shard (fun _ -> Lock.create @@ Buf_pool.alloc buf_pool)
|
||||
in
|
||||
B_many bufs
|
||||
) else
|
||||
B_one { buf = Buf_pool.alloc buf_pool }
|
||||
in
|
||||
{
|
||||
bufs;
|
||||
buf_pool;
|
||||
has_ready = A.make false;
|
||||
ready = Lock.create @@ Queue.create ();
|
||||
}
|
||||
|
||||
open struct
|
||||
let put_in_ready (self : t) buf : unit =
|
||||
if Buf.size buf > 0 then (
|
||||
let@ q = Lock.with_ self.ready in
|
||||
Atomic.set self.has_ready true;
|
||||
Queue.push buf q
|
||||
)
|
||||
|
||||
let assert_available buf ~available =
|
||||
if Buf.available buf < available then (
|
||||
let msg =
|
||||
Printf.sprintf
|
||||
"fuchsia: buffer is too small (available: %d bytes, needed: %d bytes)"
|
||||
(Buf.available buf) available
|
||||
in
|
||||
failwith msg
|
||||
)
|
||||
end
|
||||
|
||||
(** Move all non-empty buffers to [ready] *)
|
||||
let ready_all_non_empty (self : t) : unit =
|
||||
let@ q = Lock.with_ self.ready in
|
||||
match self.bufs with
|
||||
| B_one r ->
|
||||
if not (Buf.is_empty r.buf) then (
|
||||
Queue.push r.buf q;
|
||||
A.set self.has_ready true;
|
||||
r.buf <- Buf.empty
|
||||
)
|
||||
| B_many bufs ->
|
||||
Array.iter
|
||||
(fun buf ->
|
||||
Lock.update buf (fun buf ->
|
||||
if Buf.size buf > 0 then (
|
||||
Queue.push buf q;
|
||||
A.set self.has_ready true;
|
||||
Buf.empty
|
||||
) else
|
||||
buf))
|
||||
bufs
|
||||
|
||||
let[@inline] has_ready self : bool = A.get self.has_ready
|
||||
|
||||
(** Get access to ready buffers, then clean them up automatically *)
|
||||
let pop_ready (self : t) ~(f : Buf.t Queue.t -> 'a) : 'a =
|
||||
let@ q = Lock.with_ self.ready in
|
||||
let res = f q in
|
||||
|
||||
(* clear queue *)
|
||||
Queue.iter (Buf_pool.recycle self.buf_pool) q;
|
||||
Queue.clear q;
|
||||
A.set self.has_ready false;
|
||||
res
|
||||
|
||||
(** Maximum size available, in words, for a single message *)
|
||||
let[@inline] max_size_word (_self : t) : int = fuchsia_buf_size lsr 3
|
||||
|
||||
(** Obtain a buffer with at least [available_word] 64-bit words *)
|
||||
let with_buf (self : t) ~(available_word : int) (f : Buf.t -> 'a) : 'a =
|
||||
let available = available_word lsl 3 in
|
||||
match self.bufs with
|
||||
| B_one r ->
|
||||
if Buf.available r.buf < available_word then (
|
||||
put_in_ready self r.buf;
|
||||
r.buf <- Buf_pool.alloc self.buf_pool
|
||||
);
|
||||
assert_available r.buf ~available;
|
||||
f r.buf
|
||||
| B_many bufs ->
|
||||
let tid = Thread.(id (self ())) in
|
||||
let masked_tid = tid land shard_mask in
|
||||
let buf_lock = bufs.(masked_tid) in
|
||||
let@ buf = Lock.with_ buf_lock in
|
||||
let buf =
|
||||
if Buf.available buf < available then (
|
||||
put_in_ready self buf;
|
||||
let new_buf = Buf_pool.alloc self.buf_pool in
|
||||
assert_available new_buf ~available;
|
||||
Lock.set_while_locked buf_lock new_buf;
|
||||
new_buf
|
||||
) else
|
||||
buf
|
||||
in
|
||||
f buf
|
||||
|
||||
(** Dispose of resources (here, recycle buffers) *)
|
||||
let dispose (self : t) : unit =
|
||||
match self.bufs with
|
||||
| B_one r ->
|
||||
Buf_pool.recycle self.buf_pool r.buf;
|
||||
r.buf <- Buf.empty
|
||||
| B_many bufs ->
|
||||
Array.iter
|
||||
(fun buf_lock ->
|
||||
let@ buf = Lock.with_ buf_lock in
|
||||
Buf_pool.recycle self.buf_pool buf;
|
||||
Lock.set_while_locked buf_lock Buf.empty)
|
||||
bufs
|
||||
23
src/fuchsia/buf_pool.ml
Normal file
23
src/fuchsia/buf_pool.ml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
open Common_
|
||||
open Trace_private_util
|
||||
|
||||
type t = Buf.t Rpool.t
|
||||
|
||||
let create ?(max_size = 64) () : t =
|
||||
Rpool.create ~max_size ~clear:Buf.clear
|
||||
~create:(fun () -> Buf.create fuchsia_buf_size)
|
||||
()
|
||||
|
||||
let alloc = Rpool.alloc
|
||||
let[@inline] recycle self buf = if buf != Buf.empty then Rpool.recycle self buf
|
||||
|
||||
let with_ (self : t) f =
|
||||
let x = alloc self in
|
||||
try
|
||||
let res = f x in
|
||||
recycle self x;
|
||||
res
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
recycle self x;
|
||||
Printexc.raise_with_backtrace e bt
|
||||
|
|
@ -1,12 +1,22 @@
|
|||
module A = Trace_core.Internal_.Atomic_
|
||||
module FWrite = Trace_fuchsia_write
|
||||
module B_queue = Trace_private_util.B_queue
|
||||
module Buf = FWrite.Buf
|
||||
module Buf_pool = FWrite.Buf_pool
|
||||
module Output = FWrite.Output
|
||||
module Sub = Trace_subscriber
|
||||
|
||||
let on_tracing_error =
|
||||
ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s)
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
let spf = Printf.sprintf
|
||||
|
||||
let with_lock lock f =
|
||||
Mutex.lock lock;
|
||||
try
|
||||
let res = f () in
|
||||
Mutex.unlock lock;
|
||||
res
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Mutex.unlock lock;
|
||||
Printexc.raise_with_backtrace e bt
|
||||
|
||||
(** Buffer size we use. *)
|
||||
let fuchsia_buf_size = 1 lsl 16
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@
|
|||
(libraries
|
||||
trace.core
|
||||
trace.private.util
|
||||
trace.subscriber
|
||||
thread-local-storage
|
||||
(re_export trace-fuchsia.write)
|
||||
bigarray
|
||||
mtime
|
||||
mtime.clock.os
|
||||
|
|
|
|||
61
src/fuchsia/exporter.ml
Normal file
61
src/fuchsia/exporter.ml
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
(** An exporter, takes buffers with fuchsia events, and writes them somewhere *)
|
||||
|
||||
open Common_
|
||||
|
||||
type t = {
|
||||
write_bufs: Buf.t Queue.t -> unit;
|
||||
(** Takes buffers and writes them somewhere. The buffers are only valid
|
||||
during this call and must not be stored. The queue must not be
|
||||
modified. *)
|
||||
flush: unit -> unit; (** Force write *)
|
||||
close: unit -> unit; (** Close underlying resources *)
|
||||
}
|
||||
(** An exporter, takes buffers and writes them somewhere. This should be
|
||||
thread-safe if used in a threaded environment. *)
|
||||
|
||||
open struct
|
||||
let with_lock lock f =
|
||||
Mutex.lock lock;
|
||||
try
|
||||
let res = f () in
|
||||
Mutex.unlock lock;
|
||||
res
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Mutex.unlock lock;
|
||||
Printexc.raise_with_backtrace e bt
|
||||
end
|
||||
|
||||
(** Export to the channel
|
||||
@param close_channel if true, closing the exporter will close the channel *)
|
||||
let of_out_channel ~close_channel oc : t =
|
||||
let lock = Mutex.create () in
|
||||
let closed = ref false in
|
||||
let flush () =
|
||||
let@ () = with_lock lock in
|
||||
flush oc
|
||||
in
|
||||
let close () =
|
||||
let@ () = with_lock lock in
|
||||
if not !closed then (
|
||||
closed := true;
|
||||
if close_channel then close_out_noerr oc
|
||||
)
|
||||
in
|
||||
let write_bufs bufs =
|
||||
if not (Queue.is_empty bufs) then
|
||||
let@ () = with_lock lock in
|
||||
Queue.iter (fun (buf : Buf.t) -> output oc buf.buf 0 buf.offset) bufs
|
||||
in
|
||||
{ flush; close; write_bufs }
|
||||
|
||||
let of_buffer (buffer : Buffer.t) : t =
|
||||
let buffer = Lock.create buffer in
|
||||
let write_bufs bufs =
|
||||
if not (Queue.is_empty bufs) then
|
||||
let@ buffer = Lock.with_ buffer in
|
||||
Queue.iter
|
||||
(fun (buf : Buf.t) -> Buffer.add_subbytes buffer buf.buf 0 buf.offset)
|
||||
bufs
|
||||
in
|
||||
{ flush = ignore; close = ignore; write_bufs }
|
||||
|
|
@ -1,430 +0,0 @@
|
|||
open Trace_core
|
||||
open Common_
|
||||
module TLS = Thread_local_storage
|
||||
module Int_map = Map.Make (Int)
|
||||
|
||||
let pid = Unix.getpid ()
|
||||
|
||||
module Mock_ = struct
|
||||
let enabled = ref false
|
||||
let now = ref 0
|
||||
|
||||
(* used to mock timing *)
|
||||
let get_now_ns () : float =
|
||||
let x = !now in
|
||||
incr now;
|
||||
float_of_int x *. 1000.
|
||||
|
||||
let get_tid_ () : int = 3
|
||||
end
|
||||
|
||||
(** Thread-local stack of span info *)
|
||||
module Span_info_stack : sig
|
||||
type t
|
||||
|
||||
val create : unit -> t
|
||||
|
||||
val push :
|
||||
t ->
|
||||
span ->
|
||||
name:string ->
|
||||
start_time_ns:int64 ->
|
||||
data:(string * user_data) list ->
|
||||
unit
|
||||
|
||||
val pop : t -> int64 * string * int64 * (string * user_data) list
|
||||
val find_ : t -> span -> int option
|
||||
val add_data : t -> int -> (string * user_data) list -> unit
|
||||
end = struct
|
||||
module BA = Bigarray
|
||||
module BA1 = Bigarray.Array1
|
||||
|
||||
type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t
|
||||
|
||||
type t = {
|
||||
mutable len: int;
|
||||
mutable span: int64arr;
|
||||
mutable start_time_ns: int64arr;
|
||||
mutable name: string array;
|
||||
mutable data: (string * user_data) list array;
|
||||
}
|
||||
|
||||
let init_size_ = 1
|
||||
|
||||
let create () : t =
|
||||
{
|
||||
len = 0;
|
||||
span = BA1.create BA.Int64 BA.C_layout init_size_;
|
||||
start_time_ns = BA1.create BA.Int64 BA.C_layout init_size_;
|
||||
name = Array.make init_size_ "";
|
||||
data = Array.make init_size_ [];
|
||||
}
|
||||
|
||||
let[@inline] cap self = Array.length self.name
|
||||
|
||||
let grow_ (self : t) : unit =
|
||||
let new_cap = 2 * cap self in
|
||||
let new_span = BA1.create BA.Int64 BA.C_layout new_cap in
|
||||
BA1.blit self.span (BA1.sub new_span 0 self.len);
|
||||
let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in
|
||||
BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len);
|
||||
let new_name = Array.make new_cap "" in
|
||||
Array.blit self.name 0 new_name 0 self.len;
|
||||
let new_data = Array.make new_cap [] in
|
||||
Array.blit self.data 0 new_data 0 self.len;
|
||||
self.span <- new_span;
|
||||
self.start_time_ns <- new_startime_ns;
|
||||
self.name <- new_name;
|
||||
self.data <- new_data
|
||||
|
||||
let push (self : t) (span : int64) ~name ~start_time_ns ~data =
|
||||
if cap self = self.len then grow_ self;
|
||||
BA1.set self.span self.len span;
|
||||
BA1.set self.start_time_ns self.len start_time_ns;
|
||||
Array.set self.name self.len name;
|
||||
Array.set self.data self.len data;
|
||||
self.len <- self.len + 1
|
||||
|
||||
let pop (self : t) =
|
||||
assert (self.len > 0);
|
||||
self.len <- self.len - 1;
|
||||
|
||||
let span = BA1.get self.span self.len in
|
||||
let name = self.name.(self.len) in
|
||||
let start_time_ns = BA1.get self.start_time_ns self.len in
|
||||
let data = self.data.(self.len) in
|
||||
|
||||
(* avoid holding onto old values *)
|
||||
Array.set self.name self.len "";
|
||||
Array.set self.data self.len [];
|
||||
|
||||
span, name, start_time_ns, data
|
||||
|
||||
let[@inline] add_data self i d : unit =
|
||||
assert (i < self.len);
|
||||
self.data.(i) <- List.rev_append d self.data.(i)
|
||||
|
||||
exception Found of int
|
||||
|
||||
let[@inline] find_ (self : t) span : _ option =
|
||||
try
|
||||
for i = self.len - 1 downto 0 do
|
||||
if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i)
|
||||
done;
|
||||
|
||||
None
|
||||
with Found i -> Some i
|
||||
end
|
||||
|
||||
type async_span_info = {
|
||||
flavor: [ `Sync | `Async ] option;
|
||||
name: string;
|
||||
mutable data: (string * user_data) list;
|
||||
}
|
||||
|
||||
let key_async_data : async_span_info Meta_map.key = Meta_map.Key.create ()
|
||||
|
||||
open struct
|
||||
let state_id_ = A.make 0
|
||||
|
||||
(* re-raise exception with its backtrace *)
|
||||
external reraise : exn -> 'a = "%reraise"
|
||||
end
|
||||
|
||||
type per_thread_state = {
|
||||
tid: int;
|
||||
state_id: int; (** ID of the current collector state *)
|
||||
local_span_id_gen: int A.t; (** Used for thread-local spans *)
|
||||
mutable thread_ref: FWrite.Thread_ref.t;
|
||||
mutable out: Output.t option;
|
||||
spans: Span_info_stack.t; (** In-flight spans *)
|
||||
}
|
||||
|
||||
type state = {
|
||||
active: bool A.t;
|
||||
events: Bg_thread.event B_queue.t;
|
||||
span_id_gen: int A.t; (** Used for async spans *)
|
||||
bg_thread: Thread.t;
|
||||
buf_pool: Buf_pool.t;
|
||||
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
|
||||
per_thread: per_thread_state Int_map.t A.t array;
|
||||
(** the state keeps tabs on thread-local state, so it can flush writers at
|
||||
the end. This is a tid-sharded array of maps. *)
|
||||
}
|
||||
|
||||
let[@inline] mk_trace_id (self : state) : trace_id =
|
||||
let n = A.fetch_and_add self.span_id_gen 1 in
|
||||
let b = Bytes.create 8 in
|
||||
Bytes.set_int64_le b 0 (Int64.of_int n);
|
||||
Bytes.unsafe_to_string b
|
||||
|
||||
let key_thread_local_st : per_thread_state TLS.t = TLS.create ()
|
||||
|
||||
let[@inline never] mk_thread_local_st () =
|
||||
let tid = Thread.id @@ Thread.self () in
|
||||
let st =
|
||||
{
|
||||
tid;
|
||||
state_id = A.get state_id_;
|
||||
thread_ref = FWrite.Thread_ref.inline ~pid ~tid;
|
||||
local_span_id_gen = A.make 0;
|
||||
out = None;
|
||||
spans = Span_info_stack.create ();
|
||||
}
|
||||
in
|
||||
TLS.set key_thread_local_st st;
|
||||
st
|
||||
|
||||
let[@inline] get_thread_local_st () =
|
||||
match TLS.get_opt key_thread_local_st with
|
||||
| Some k -> k
|
||||
| None -> mk_thread_local_st ()
|
||||
|
||||
let out_of_st (st : state) : Output.t =
|
||||
FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf ->
|
||||
try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ())
|
||||
|
||||
module C
|
||||
(St : sig
|
||||
val st : state
|
||||
end)
|
||||
() =
|
||||
struct
|
||||
open St
|
||||
|
||||
let state_id = 1 + A.fetch_and_add state_id_ 1
|
||||
|
||||
(** prepare the thread's state *)
|
||||
let[@inline never] update_or_init_local_state (self : per_thread_state) : unit
|
||||
=
|
||||
(* get an output *)
|
||||
let out = out_of_st st in
|
||||
self.out <- Some out;
|
||||
|
||||
(* try to allocate a thread ref for current thread *)
|
||||
let th_ref = A.fetch_and_add st.next_thread_ref 1 in
|
||||
if th_ref <= 0xff then (
|
||||
self.thread_ref <- FWrite.Thread_ref.ref th_ref;
|
||||
FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid ()
|
||||
);
|
||||
|
||||
(* add to [st]'s list of threads *)
|
||||
let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in
|
||||
while
|
||||
let old = A.get shard_of_per_thread in
|
||||
not
|
||||
(A.compare_and_set shard_of_per_thread old
|
||||
(Int_map.add self.tid self old))
|
||||
do
|
||||
()
|
||||
done;
|
||||
|
||||
let on_exit _ =
|
||||
while
|
||||
let old = A.get shard_of_per_thread in
|
||||
not
|
||||
(A.compare_and_set shard_of_per_thread old
|
||||
(Int_map.remove self.tid old))
|
||||
do
|
||||
()
|
||||
done;
|
||||
Option.iter Output.flush self.out
|
||||
in
|
||||
|
||||
(* after thread exits, flush output and remove from global list *)
|
||||
Gc.finalise on_exit (Thread.self ());
|
||||
()
|
||||
|
||||
(** Obtain the output for the current thread *)
|
||||
let[@inline] get_thread_output () : Output.t * per_thread_state =
|
||||
let tls = get_thread_local_st () in
|
||||
if tls.state_id != state_id || tls.out == None then
|
||||
update_or_init_local_state tls;
|
||||
let out =
|
||||
match tls.out with
|
||||
| None -> assert false
|
||||
| Some o -> o
|
||||
in
|
||||
out, tls
|
||||
|
||||
let close_per_thread (tls : per_thread_state) =
|
||||
Option.iter Output.flush tls.out
|
||||
|
||||
(** flush all outputs *)
|
||||
let flush_all_outputs_ () =
|
||||
Array.iter
|
||||
(fun shard ->
|
||||
let tls_l = A.get shard in
|
||||
Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l)
|
||||
st.per_thread
|
||||
|
||||
let shutdown () =
|
||||
if A.exchange st.active false then (
|
||||
flush_all_outputs_ ();
|
||||
|
||||
B_queue.close st.events;
|
||||
(* wait for writer thread to be done. The writer thread will exit
|
||||
after processing remaining events because the queue is now closed *)
|
||||
Thread.join st.bg_thread
|
||||
)
|
||||
|
||||
let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span =
|
||||
let tls = get_thread_local_st () in
|
||||
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
|
||||
let time_ns = Time.now_ns () in
|
||||
Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns;
|
||||
span
|
||||
|
||||
let exit_span span : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
let end_time_ns = Time.now_ns () in
|
||||
|
||||
let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in
|
||||
if span <> span' then
|
||||
!on_tracing_error
|
||||
(spf "span mismatch: top is %Ld, expected %Ld" span' span)
|
||||
else
|
||||
FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref
|
||||
~time_ns:start_time_ns ~end_time_ns ~args:data ()
|
||||
|
||||
let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
|
||||
Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name;
|
||||
|
||||
let[@inline] exit () : unit =
|
||||
let end_time_ns = Time.now_ns () in
|
||||
|
||||
let _span', _, _, data = Span_info_stack.pop tls.spans in
|
||||
assert (span = _span');
|
||||
FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns
|
||||
~t_ref:tls.thread_ref ~args:data ()
|
||||
in
|
||||
|
||||
try
|
||||
let x = f span in
|
||||
exit ();
|
||||
x
|
||||
with exn ->
|
||||
exit ();
|
||||
reraise exn
|
||||
|
||||
let add_data_to_span span data =
|
||||
let tls = get_thread_local_st () in
|
||||
match Span_info_stack.find_ tls.spans span with
|
||||
| None -> !on_tracing_error (spf "unknown span %Ld" span)
|
||||
| Some idx -> Span_info_stack.add_data tls.spans idx data
|
||||
|
||||
let enter_manual_span ~(parent : explicit_span_ctx option) ~flavor
|
||||
~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
|
||||
(* get the id, or make a new one *)
|
||||
let trace_id =
|
||||
match parent with
|
||||
| Some m -> m.trace_id
|
||||
| None -> mk_trace_id st
|
||||
in
|
||||
|
||||
FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref
|
||||
~time_ns ~async_id:trace_id ();
|
||||
{
|
||||
span = 0L;
|
||||
trace_id;
|
||||
meta = Meta_map.(empty |> add key_async_data { name; flavor; data = [] });
|
||||
}
|
||||
|
||||
let exit_manual_span (es : explicit_span) : unit =
|
||||
let { name; data; flavor = _ } = Meta_map.find_exn key_async_data es.meta in
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
|
||||
FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns
|
||||
~args:data ~async_id:es.trace_id ()
|
||||
|
||||
let add_data_to_manual_span (es : explicit_span) data =
|
||||
let m = Meta_map.find_exn key_async_data es.meta in
|
||||
m.data <- List.rev_append data m.data
|
||||
|
||||
let message ?span:_ ~data msg : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref
|
||||
~args:data ()
|
||||
|
||||
let counter_float ~data name f =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
|
||||
~args:((name, `Float f) :: data)
|
||||
()
|
||||
|
||||
let counter_int ~data name i =
|
||||
let out, tls = get_thread_output () in
|
||||
let time_ns = Time.now_ns () in
|
||||
FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref
|
||||
~args:((name, `Int i) :: data)
|
||||
()
|
||||
|
||||
let name_process name : unit =
|
||||
let out, _tls = get_thread_output () in
|
||||
FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ())
|
||||
|
||||
let name_thread name : unit =
|
||||
let out, tls = get_thread_output () in
|
||||
FWrite.Kernel_object.(
|
||||
encode out ~name ~ty:ty_thread ~kid:tls.tid
|
||||
~args:[ "process", `Kid pid ]
|
||||
())
|
||||
|
||||
let extension_event _ = ()
|
||||
end
|
||||
|
||||
let create ~out () : collector =
|
||||
let buf_pool = Buf_pool.create () in
|
||||
let events = B_queue.create () in
|
||||
|
||||
let bg_thread =
|
||||
Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) ()
|
||||
in
|
||||
|
||||
let st =
|
||||
{
|
||||
active = A.make true;
|
||||
buf_pool;
|
||||
bg_thread;
|
||||
events;
|
||||
span_id_gen = A.make 0;
|
||||
next_thread_ref = A.make 1;
|
||||
per_thread = Array.init 16 (fun _ -> A.make Int_map.empty);
|
||||
}
|
||||
in
|
||||
|
||||
let _tick_thread = Thread.create (fun () -> Bg_thread.tick_thread events) in
|
||||
|
||||
(* write header *)
|
||||
let out = out_of_st st in
|
||||
FWrite.Metadata.Magic_record.encode out;
|
||||
FWrite.Metadata.Initialization_record.(
|
||||
encode out ~ticks_per_secs:default_ticks_per_sec ());
|
||||
FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" ();
|
||||
Output.flush out;
|
||||
Output.dispose out;
|
||||
|
||||
let module Coll =
|
||||
C
|
||||
(struct
|
||||
let st = st
|
||||
end)
|
||||
()
|
||||
in
|
||||
(module Coll)
|
||||
|
||||
module Internal_ = struct
|
||||
let mock_all_ () =
|
||||
Mock_.enabled := true;
|
||||
Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns;
|
||||
Sub.Private_.get_tid_ := Some Mock_.get_tid_
|
||||
end
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
open Trace_core
|
||||
|
||||
val create : out:Bg_thread.out -> unit -> collector
|
||||
|
||||
(**/**)
|
||||
|
||||
module Internal_ : sig
|
||||
val mock_all_ : unit -> unit
|
||||
end
|
||||
|
||||
(**/**)
|
||||
28
src/fuchsia/lock.ml
Normal file
28
src/fuchsia/lock.ml
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
type 'a t = {
|
||||
mutex: Mutex.t;
|
||||
mutable content: 'a;
|
||||
}
|
||||
|
||||
let create content : _ t = { mutex = Mutex.create (); content }
|
||||
|
||||
let with_ (self : _ t) f =
|
||||
Mutex.lock self.mutex;
|
||||
try
|
||||
let x = f self.content in
|
||||
Mutex.unlock self.mutex;
|
||||
x
|
||||
with e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
Mutex.unlock self.mutex;
|
||||
Printexc.raise_with_backtrace e bt
|
||||
|
||||
let[@inline] update self f = with_ self (fun x -> self.content <- f x)
|
||||
|
||||
let[@inline] update_map l f =
|
||||
with_ l (fun x ->
|
||||
let x', y = f x in
|
||||
l.content <- x';
|
||||
y)
|
||||
|
||||
let[@inline] set_while_locked (self : 'a t) (x : 'a) = self.content <- x
|
||||
|
||||
10
src/fuchsia/lock.mli
Normal file
10
src/fuchsia/lock.mli
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
type 'a t
|
||||
(** A value protected by a mutex *)
|
||||
|
||||
val create : 'a -> 'a t
|
||||
val with_ : 'a t -> ('a -> 'b) -> 'b
|
||||
val update : 'a t -> ('a -> 'a) -> unit
|
||||
val update_map : 'a t -> ('a -> 'a * 'b) -> 'b
|
||||
|
||||
val set_while_locked : 'a t -> 'a -> unit
|
||||
(** Change the value while inside [with_] or similar. *)
|
||||
173
src/fuchsia/subscriber.ml
Normal file
173
src/fuchsia/subscriber.ml
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
open Common_
|
||||
open Trace_core
|
||||
module Span_tbl = Trace_subscriber.Span_tbl
|
||||
|
||||
let on_tracing_error = on_tracing_error
|
||||
|
||||
type span_info = {
|
||||
tid: int;
|
||||
name: string;
|
||||
start_ns: int64;
|
||||
mutable data: (string * Sub.user_data) list;
|
||||
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
||||
that's running this (synchronous, stack-abiding) span. *)
|
||||
}
|
||||
(** Information we store about a span begin event, to emit a complete event when
|
||||
we meet the corresponding span end event *)
|
||||
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
pid: int;
|
||||
spans: span_info Span_tbl.t;
|
||||
buf_chain: Buf_chain.t;
|
||||
exporter: Exporter.t;
|
||||
}
|
||||
(** Subscriber state *)
|
||||
|
||||
open struct
|
||||
(** Write the buffers that are ready *)
|
||||
let[@inline] write_ready_ (self : t) =
|
||||
if Buf_chain.has_ready self.buf_chain then
|
||||
Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs
|
||||
|
||||
let print_non_closed_spans_warning spans =
|
||||
let module Str_set = Set.Make (String) in
|
||||
let spans = Span_tbl.to_list spans in
|
||||
if spans <> [] then (
|
||||
!on_tracing_error
|
||||
@@ Printf.sprintf "trace-tef: warning: %d spans were not closed\n"
|
||||
(List.length spans);
|
||||
let names =
|
||||
List.fold_left
|
||||
(fun set (_, span) -> Str_set.add span.name set)
|
||||
Str_set.empty spans
|
||||
in
|
||||
Str_set.iter
|
||||
(fun name ->
|
||||
!on_tracing_error @@ Printf.sprintf " span %S was not closed\n" name)
|
||||
names;
|
||||
flush stderr
|
||||
)
|
||||
end
|
||||
|
||||
let close (self : t) : unit =
|
||||
if A.exchange self.active false then (
|
||||
Buf_chain.ready_all_non_empty self.buf_chain;
|
||||
write_ready_ self;
|
||||
self.exporter.close ();
|
||||
|
||||
print_non_closed_spans_warning self.spans
|
||||
)
|
||||
|
||||
let[@inline] active self = A.get self.active
|
||||
|
||||
let flush (self : t) : unit =
|
||||
Buf_chain.ready_all_non_empty self.buf_chain;
|
||||
write_ready_ self;
|
||||
self.exporter.flush ()
|
||||
|
||||
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||
let buf_chain = Buf_chain.create ~sharded:true ~buf_pool () in
|
||||
{ active = A.make true; buf_chain; exporter; pid; spans = Span_tbl.create () }
|
||||
|
||||
module Callbacks = struct
|
||||
type st = t
|
||||
|
||||
let on_init (self : st) ~time_ns:_ =
|
||||
Writer.Metadata.Magic_record.encode self.buf_chain;
|
||||
Writer.Metadata.Initialization_record.(
|
||||
encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
|
||||
Writer.Metadata.Provider_info.encode self.buf_chain ~id:0
|
||||
~name:"ocaml-trace" ();
|
||||
(* make sure we write these immediately so they're not out of order *)
|
||||
Buf_chain.ready_all_non_empty self.buf_chain;
|
||||
|
||||
write_ready_ self
|
||||
|
||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
||||
|
||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||
Writer.Kernel_object.(
|
||||
encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
|
||||
write_ready_ self
|
||||
|
||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||
Writer.Kernel_object.(
|
||||
encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
|
||||
~args:[ "process", A_kid (Int64.of_int self.pid) ]
|
||||
());
|
||||
write_ready_ self
|
||||
|
||||
(* add function name, if provided, to the metadata *)
|
||||
let add_fun_name_ fun_name data : _ list =
|
||||
match fun_name with
|
||||
| None -> data
|
||||
| Some f -> ("function", Sub.U_string f) :: data
|
||||
|
||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let info = { tid; name; start_ns = time_ns; data } in
|
||||
(* save the span so we find it at exit *)
|
||||
Span_tbl.add self.spans span info
|
||||
|
||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||
match Span_tbl.find_exn self.spans span with
|
||||
| exception Not_found ->
|
||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
||||
| { tid; name; start_ns; data } ->
|
||||
Span_tbl.remove self.spans span;
|
||||
Writer.(
|
||||
Event.Duration_complete.encode self.buf_chain ~name
|
||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||
~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data)
|
||||
());
|
||||
write_ready_ self
|
||||
|
||||
let on_add_data (self : st) ~data span =
|
||||
if data <> [] then (
|
||||
try
|
||||
let info = Span_tbl.find_exn self.spans span in
|
||||
info.data <- List.rev_append data info.data
|
||||
with Not_found ->
|
||||
!on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
|
||||
)
|
||||
|
||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||
Writer.(
|
||||
Event.Instant.encode self.buf_chain
|
||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||
~name:msg ~time_ns ~args:(args_of_user_data data) ());
|
||||
write_ready_ self
|
||||
|
||||
let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit =
|
||||
Writer.(
|
||||
Event.Counter.encode self.buf_chain
|
||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||
~name ~time_ns
|
||||
~args:((name, A_float n) :: args_of_user_data data)
|
||||
());
|
||||
write_ready_ self
|
||||
|
||||
let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
|
||||
~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit =
|
||||
Writer.(
|
||||
Event.Async_begin.encode self.buf_chain ~name
|
||||
~args:(args_of_user_data data)
|
||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||
~time_ns ~async_id:trace_id ());
|
||||
write_ready_ self
|
||||
|
||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_
|
||||
~trace_id (_ : span) : unit =
|
||||
Writer.(
|
||||
Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data)
|
||||
~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
|
||||
~time_ns ~async_id:trace_id ());
|
||||
write_ready_ self
|
||||
|
||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
||||
end
|
||||
|
||||
let subscriber (self : t) : Sub.t =
|
||||
Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
|
||||
20
src/fuchsia/subscriber.mli
Normal file
20
src/fuchsia/subscriber.mli
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
type t
|
||||
(** Main subscriber state. *)
|
||||
|
||||
val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t
|
||||
(** Create a subscriber state. *)
|
||||
|
||||
val flush : t -> unit
|
||||
val close : t -> unit
|
||||
val active : t -> bool
|
||||
|
||||
module Callbacks : Trace_subscriber.Callbacks.S with type st = t
|
||||
|
||||
val subscriber : t -> Trace_subscriber.t
|
||||
(** Subscriber that writes json into this writer *)
|
||||
|
||||
(**/**)
|
||||
|
||||
val on_tracing_error : (string -> unit) ref
|
||||
|
||||
(**/**)
|
||||
|
|
@ -1,31 +1,50 @@
|
|||
open Common_
|
||||
module Buf = Buf
|
||||
module Buf_chain = Buf_chain
|
||||
module Buf_pool = Buf_pool
|
||||
module Exporter = Exporter
|
||||
module Subscriber = Subscriber
|
||||
module Writer = Writer
|
||||
|
||||
type output =
|
||||
[ `Stdout
|
||||
| `Stderr
|
||||
| `File of string
|
||||
[ `File of string
|
||||
| `Exporter of Exporter.t
|
||||
]
|
||||
|
||||
let collector = Fcollector.create
|
||||
let get_out_ (out : [< output ]) : Exporter.t =
|
||||
match out with
|
||||
| `File path ->
|
||||
let oc = open_out path in
|
||||
Exporter.of_out_channel ~close_channel:true oc
|
||||
| `Exporter e -> e
|
||||
|
||||
let subscriber ~out () : Sub.t =
|
||||
let exporter = get_out_ out in
|
||||
let pid =
|
||||
if !Trace_subscriber.Private_.mock then
|
||||
2
|
||||
else
|
||||
Unix.getpid ()
|
||||
in
|
||||
let sub = Subscriber.create ~pid ~exporter () in
|
||||
Subscriber.subscriber sub
|
||||
|
||||
let collector ~out () = Sub.collector @@ subscriber ~out ()
|
||||
|
||||
let setup ?(out = `Env) () =
|
||||
match out with
|
||||
| `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
|
||||
| `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
|
||||
| `File path ->
|
||||
Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) ()
|
||||
| `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) ()
|
||||
| `Exporter _ as out ->
|
||||
let sub = subscriber ~out () in
|
||||
Trace_core.setup_collector @@ Sub.collector sub
|
||||
| `Env ->
|
||||
(match Sys.getenv_opt "TRACE" with
|
||||
| Some ("1" | "true") ->
|
||||
let path = "trace.fxt" in
|
||||
let c = Fcollector.create ~out:(`File path) () in
|
||||
let c = collector ~out:(`File path) () in
|
||||
Trace_core.setup_collector c
|
||||
| Some "stdout" ->
|
||||
Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout ()
|
||||
| Some "stderr" ->
|
||||
Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr ()
|
||||
| Some path ->
|
||||
let c = Fcollector.create ~out:(`File path) () in
|
||||
let c = collector ~out:(`File path) () in
|
||||
Trace_core.setup_collector c
|
||||
| None -> ())
|
||||
|
||||
|
|
@ -33,7 +52,24 @@ let with_setup ?out () f =
|
|||
setup ?out ();
|
||||
Fun.protect ~finally:Trace_core.shutdown f
|
||||
|
||||
module Mock_ = struct
|
||||
let now = ref 0
|
||||
|
||||
(* used to mock timing *)
|
||||
let get_now_ns () : int64 =
|
||||
let x = !now in
|
||||
incr now;
|
||||
Int64.(mul (of_int x) 1000L)
|
||||
|
||||
let get_tid_ () : int = 3
|
||||
end
|
||||
|
||||
module Internal_ = struct
|
||||
let mock_all_ = Fcollector.Internal_.mock_all_
|
||||
let mock_all_ () =
|
||||
Sub.Private_.mock := true;
|
||||
Sub.Private_.get_now_ns_ := Mock_.get_now_ns;
|
||||
Sub.Private_.get_tid_ := Mock_.get_tid_;
|
||||
()
|
||||
|
||||
let on_tracing_error = on_tracing_error
|
||||
end
|
||||
|
|
|
|||
|
|
@ -6,22 +6,23 @@
|
|||
trace format}. This reduces the tracing overhead compared to [trace-tef],
|
||||
at the expense of simplicity. *)
|
||||
|
||||
val collector :
|
||||
out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector
|
||||
(** Make a collector that writes into the given output. See {!setup} for more
|
||||
details. *)
|
||||
module Buf = Buf
|
||||
module Buf_chain = Buf_chain
|
||||
module Buf_pool = Buf_pool
|
||||
module Exporter = Exporter
|
||||
module Subscriber = Subscriber
|
||||
module Writer = Writer
|
||||
|
||||
type output =
|
||||
[ `Stdout
|
||||
| `Stderr
|
||||
| `File of string
|
||||
[ `File of string
|
||||
| `Exporter of Exporter.t
|
||||
]
|
||||
(** Output for tracing.
|
||||
|
||||
- [`Stdout] will enable tracing and print events on stdout
|
||||
- [`Stderr] will enable tracing and print events on stderr
|
||||
- [`File "foo"] will enable tracing and print events into file named "foo"
|
||||
*)
|
||||
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
|
||||
|
||||
val collector : out:[< output ] -> unit -> Trace_core.collector
|
||||
(** Make a collector that writes into the given output. See {!setup} for more
|
||||
details. *)
|
||||
|
||||
val setup : ?out:[ output | `Env ] -> unit -> unit
|
||||
(** [setup ()] installs the collector depending on [out].
|
||||
|
|
@ -32,12 +33,10 @@ val setup : ?out:[ output | `Env ] -> unit -> unit
|
|||
- [`Env] will enable tracing if the environment variable "TRACE" is set.
|
||||
|
||||
- If it's set to "1", then the file is "trace.fxt".
|
||||
- If it's set to "stdout", then logging happens on stdout (since 0.2)
|
||||
- If it's set to "stderr", then logging happens on stdout (since 0.2)
|
||||
- Otherwise, if it's set to a non empty string, the value is taken to be the
|
||||
file path into which to write. *)
|
||||
|
||||
val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
|
||||
val with_setup : ?out:[< output | `Env > `Env ] -> unit -> (unit -> 'a) -> 'a
|
||||
(** [with_setup () f] (optionally) sets a collector up, calls [f()], and makes
|
||||
sure to shutdown before exiting. *)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,58 +0,0 @@
|
|||
open struct
|
||||
module A = Trace_core.Internal_.Atomic_
|
||||
|
||||
exception Got_buf of Buf.t
|
||||
end
|
||||
|
||||
module List_with_len = struct
|
||||
type +'a t =
|
||||
| Nil
|
||||
| Cons of int * 'a * 'a t
|
||||
|
||||
let empty : _ t = Nil
|
||||
|
||||
let[@inline] len = function
|
||||
| Nil -> 0
|
||||
| Cons (i, _, _) -> i
|
||||
|
||||
let[@inline] cons x self = Cons (len self + 1, x, self)
|
||||
end
|
||||
|
||||
type t = {
|
||||
max_len: int;
|
||||
buf_size: int;
|
||||
bufs: Buf.t List_with_len.t A.t;
|
||||
}
|
||||
|
||||
let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t =
|
||||
let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in
|
||||
{ max_len; buf_size; bufs = A.make List_with_len.empty }
|
||||
|
||||
let alloc (self : t) : Buf.t =
|
||||
try
|
||||
while
|
||||
match A.get self.bufs with
|
||||
| Nil -> false
|
||||
| Cons (_, buf, tl) as old ->
|
||||
if A.compare_and_set self.bufs old tl then
|
||||
raise (Got_buf buf)
|
||||
else
|
||||
false
|
||||
do
|
||||
()
|
||||
done;
|
||||
Buf.create self.buf_size
|
||||
with Got_buf b -> b
|
||||
|
||||
let recycle (self : t) (buf : Buf.t) : unit =
|
||||
Buf.clear buf;
|
||||
try
|
||||
while
|
||||
match A.get self.bufs with
|
||||
| Cons (i, _, _) when i >= self.max_len -> raise Exit
|
||||
| old ->
|
||||
not (A.compare_and_set self.bufs old (List_with_len.cons buf old))
|
||||
do
|
||||
()
|
||||
done
|
||||
with Exit -> () (* do not recycle *)
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
(library
|
||||
(name trace_fuchsia_write)
|
||||
(public_name trace-fuchsia.write)
|
||||
(synopsis "Serialization part of trace-fuchsia")
|
||||
(ocamlopt_flags
|
||||
:standard
|
||||
;-dlambda
|
||||
)
|
||||
(libraries trace.core threads))
|
||||
|
|
@ -1,65 +0,0 @@
|
|||
type t = {
|
||||
mutable buf: Buf.t;
|
||||
mutable send_buf: Buf.t -> unit;
|
||||
buf_pool: Buf_pool.t;
|
||||
}
|
||||
|
||||
let create ~(buf_pool : Buf_pool.t) ~send_buf () : t =
|
||||
let buf_size = buf_pool.buf_size in
|
||||
let buf = Buf.create buf_size in
|
||||
{ buf; send_buf; buf_pool }
|
||||
|
||||
open struct
|
||||
(* NOTE: there is a potential race condition if an output is
|
||||
flushed from the main thread upon closing, while
|
||||
the local thread is blissfully writing new records to it
|
||||
as we're winding down the collector. This is trying to reduce
|
||||
the likelyhood of a race happening. *)
|
||||
let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t =
|
||||
let old_buf = self.buf in
|
||||
self.buf <- new_buf;
|
||||
old_buf
|
||||
|
||||
let flush_ (self : t) : unit =
|
||||
let new_buf = Buf_pool.alloc self.buf_pool in
|
||||
let old_buf = replace_buf_ self new_buf in
|
||||
self.send_buf old_buf
|
||||
|
||||
let[@inline never] cycle_buf (self : t) ~available : Buf.t =
|
||||
flush_ self;
|
||||
let buf = self.buf in
|
||||
|
||||
if Buf.available buf < available then (
|
||||
let msg =
|
||||
Printf.sprintf
|
||||
"fuchsia: buffer is too small (available: %d bytes, needed: %d bytes)"
|
||||
(Buf.available buf) available
|
||||
in
|
||||
failwith msg
|
||||
);
|
||||
buf
|
||||
end
|
||||
|
||||
let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self
|
||||
|
||||
(** Maximum size available, in words, for a single message *)
|
||||
let[@inline] max_size_word (self : t) : int = self.buf_pool.buf_size lsr 3
|
||||
|
||||
(** Obtain a buffer with at least [available] bytes *)
|
||||
let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t =
|
||||
let available = available_word lsl 3 in
|
||||
if Buf.available self.buf >= available then
|
||||
self.buf
|
||||
else
|
||||
cycle_buf self ~available
|
||||
|
||||
let into_buffer ~buf_pool (buffer : Buffer.t) : t =
|
||||
let send_buf (buf : Buf.t) =
|
||||
Buffer.add_subbytes buffer buf.buf 0 buf.offset
|
||||
in
|
||||
create ~buf_pool ~send_buf ()
|
||||
|
||||
let dispose (self : t) : unit =
|
||||
flush_ self;
|
||||
Buf_pool.recycle self.buf_pool self.buf;
|
||||
self.buf <- Buf.empty
|
||||
|
|
@ -2,14 +2,10 @@
|
|||
|
||||
Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *)
|
||||
|
||||
open Common_
|
||||
module Util = Util
|
||||
module Buf = Buf
|
||||
module Output = Output
|
||||
module Buf_pool = Buf_pool
|
||||
|
||||
open struct
|
||||
let spf = Printf.sprintf
|
||||
|
||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
||||
if id == Trace_core.Collector.dummy_trace_id then
|
||||
0L
|
||||
|
|
@ -19,7 +15,27 @@ end
|
|||
|
||||
open Util
|
||||
|
||||
type user_data = Trace_core.user_data
|
||||
type user_data = Sub.user_data =
|
||||
| U_bool of bool
|
||||
| U_float of float
|
||||
| U_int of int
|
||||
| U_none
|
||||
| U_string of string
|
||||
|
||||
type arg =
|
||||
| A_bool of bool
|
||||
| A_float of float
|
||||
| A_int of int
|
||||
| A_none
|
||||
| A_string of string
|
||||
| A_kid of int64
|
||||
|
||||
(* NOTE: only works because [user_data] is a prefix of [arg] and is immutable *)
|
||||
let arg_of_user_data : user_data -> arg = Obj.magic
|
||||
|
||||
(* NOTE: only works because [user_data] is a prefix of [arg] and is immutable *)
|
||||
let args_of_user_data : (string * user_data) list -> (string * arg) list =
|
||||
Obj.magic
|
||||
|
||||
module I64 = struct
|
||||
include Int64
|
||||
|
|
@ -111,8 +127,8 @@ module Metadata = struct
|
|||
let value = 0x0016547846040010L
|
||||
let size_word = 1
|
||||
|
||||
let encode (out : Output.t) =
|
||||
let buf = Output.get_buf out ~available_word:size_word in
|
||||
let encode (bufs : Buf_chain.t) =
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in
|
||||
Buf.add_i64 buf value
|
||||
end
|
||||
|
||||
|
|
@ -122,8 +138,8 @@ module Metadata = struct
|
|||
(** Default: 1 tick = 1 ns *)
|
||||
let default_ticks_per_sec = 1_000_000_000L
|
||||
|
||||
let encode (out : Output.t) ~ticks_per_secs () : unit =
|
||||
let buf = Output.get_buf out ~available_word:size_word in
|
||||
let encode (bufs : Buf_chain.t) ~ticks_per_secs () : unit =
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in
|
||||
let hd = I64.(1L lor (of_int size_word lsl 4)) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_i64 buf ticks_per_secs
|
||||
|
|
@ -132,10 +148,10 @@ module Metadata = struct
|
|||
module Provider_info = struct
|
||||
let size_word ~name () = 1 + str_len_word name
|
||||
|
||||
let encode (out : Output.t) ~(id : int) ~name () : unit =
|
||||
let encode (bufs : Buf_chain.t) ~(id : int) ~name () : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
let hd =
|
||||
I64.(
|
||||
(of_int size lsl 4)
|
||||
|
|
@ -152,29 +168,29 @@ module Metadata = struct
|
|||
end
|
||||
|
||||
module Argument = struct
|
||||
type 'a t = string * ([< user_data | `Kid of int ] as 'a)
|
||||
type t = string * arg
|
||||
|
||||
let check_valid_ : _ t -> unit = function
|
||||
| _, `String s -> assert (String.length s < max_str_len)
|
||||
let check_valid_ : t -> unit = function
|
||||
| _, A_string s -> assert (String.length s < max_str_len)
|
||||
| _ -> ()
|
||||
|
||||
let[@inline] is_i32_ (i : int) : bool = Int32.(to_int (of_int i) = i)
|
||||
|
||||
let size_word (self : _ t) =
|
||||
let size_word (self : t) =
|
||||
let name, data = self in
|
||||
match data with
|
||||
| `None | `Bool _ -> 1 + str_len_word name
|
||||
| `Int i when is_i32_ i -> 1 + str_len_word name
|
||||
| `Int _ -> (* int64 *) 2 + str_len_word name
|
||||
| `Float _ -> 2 + str_len_word name
|
||||
| `String s -> 1 + str_len_word_maybe_too_big s + str_len_word name
|
||||
| `Kid _ -> 2 + str_len_word name
|
||||
| A_none | A_bool _ -> 1 + str_len_word name
|
||||
| A_int i when is_i32_ i -> 1 + str_len_word name
|
||||
| A_int _ -> (* int64 *) 2 + str_len_word name
|
||||
| A_float _ -> 2 + str_len_word name
|
||||
| A_string s -> 1 + str_len_word_maybe_too_big s + str_len_word name
|
||||
| A_kid _ -> 2 + str_len_word name
|
||||
|
||||
open struct
|
||||
external int_of_bool : bool -> int = "%identity"
|
||||
end
|
||||
|
||||
let encode (buf : Buf.t) (self : _ t) : unit =
|
||||
let encode (buf : Buf.t) (self : t) : unit =
|
||||
let name, data = self in
|
||||
let name = truncate_string name in
|
||||
let size = size_word self in
|
||||
|
|
@ -187,26 +203,26 @@ module Argument = struct
|
|||
in
|
||||
|
||||
match data with
|
||||
| `None ->
|
||||
| A_none ->
|
||||
let hd = hd_arg_size in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name
|
||||
| `Int i when is_i32_ i ->
|
||||
| A_int i when is_i32_ i ->
|
||||
let hd = I64.(1L lor hd_arg_size lor (of_int i lsl 32)) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name
|
||||
| `Int i ->
|
||||
| A_int i ->
|
||||
(* int64 *)
|
||||
let hd = I64.(3L lor hd_arg_size) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name;
|
||||
Buf.add_i64 buf (I64.of_int i)
|
||||
| `Float f ->
|
||||
| A_float f ->
|
||||
let hd = I64.(5L lor hd_arg_size) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name;
|
||||
Buf.add_i64 buf (I64.bits_of_float f)
|
||||
| `String s ->
|
||||
| A_string s ->
|
||||
let s = truncate_string s in
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -216,35 +232,35 @@ module Argument = struct
|
|||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name;
|
||||
Buf.add_string buf s
|
||||
| `Bool b ->
|
||||
| A_bool b ->
|
||||
let hd = I64.(9L lor hd_arg_size lor (of_int (int_of_bool b) lsl 16)) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name
|
||||
| `Kid kid ->
|
||||
| A_kid kid ->
|
||||
(* int64 *)
|
||||
let hd = I64.(8L lor hd_arg_size) in
|
||||
Buf.add_i64 buf hd;
|
||||
Buf.add_string buf name;
|
||||
Buf.add_i64 buf (I64.of_int kid)
|
||||
Buf.add_i64 buf kid
|
||||
end
|
||||
|
||||
module Arguments = struct
|
||||
type 'a t = 'a Argument.t list
|
||||
type t = Argument.t list
|
||||
|
||||
let[@inline] len (self : _ t) : int =
|
||||
let[@inline] len (self : t) : int =
|
||||
match self with
|
||||
| [] -> 0
|
||||
| [ _ ] -> 1
|
||||
| _ :: _ :: tl -> 2 + List.length tl
|
||||
|
||||
let check_valid (self : _ t) =
|
||||
let check_valid (self : t) =
|
||||
let len = len self in
|
||||
if len > 15 then
|
||||
invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len);
|
||||
List.iter Argument.check_valid_ self;
|
||||
()
|
||||
|
||||
let[@inline] size_word (self : _ t) =
|
||||
let[@inline] size_word (self : t) =
|
||||
match self with
|
||||
| [] -> 0
|
||||
| [ a ] -> Argument.size_word a
|
||||
|
|
@ -254,7 +270,7 @@ module Arguments = struct
|
|||
(Argument.size_word a + Argument.size_word b)
|
||||
tl
|
||||
|
||||
let[@inline] encode (buf : Buf.t) (self : _ t) =
|
||||
let[@inline] encode (buf : Buf.t) (self : t) =
|
||||
let rec aux buf l =
|
||||
match l with
|
||||
| [] -> ()
|
||||
|
|
@ -276,11 +292,11 @@ module Thread_record = struct
|
|||
let size_word : int = 3
|
||||
|
||||
(** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *)
|
||||
let encode (out : Output.t) ~as_ref ~pid ~tid () : unit =
|
||||
let encode (bufs : Buf_chain.t) ~as_ref ~pid ~tid () : unit =
|
||||
if as_ref <= 0 || as_ref > 255 then
|
||||
invalid_arg "fuchsia: thread_record: invalid ref";
|
||||
|
||||
let buf = Output.get_buf out ~available_word:size_word in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in
|
||||
|
||||
let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in
|
||||
Buf.add_i64 buf hd;
|
||||
|
|
@ -296,11 +312,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
|
||||
: unit =
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args
|
||||
() : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
(* set category = 0 *)
|
||||
let hd =
|
||||
|
|
@ -331,11 +347,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args + 1 (* counter id *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
|
||||
: unit =
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args
|
||||
() : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -368,11 +384,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
|
||||
: unit =
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args
|
||||
() : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -403,11 +419,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args ()
|
||||
: unit =
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args
|
||||
() : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -438,11 +454,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args + 1 (* end timestamp *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
~end_time_ns ~args () : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
(* set category = 0 *)
|
||||
let hd =
|
||||
|
|
@ -475,11 +491,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args + 1 (* async id *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
~(async_id : Trace_core.trace_id) ~args () : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -511,11 +527,11 @@ module Event = struct
|
|||
1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name
|
||||
+ Arguments.size_word args + 1 (* async id *)
|
||||
|
||||
let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns
|
||||
~(async_id : Trace_core.trace_id) ~args () : unit =
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~t_ref ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
|
|
@ -556,10 +572,11 @@ module Kernel_object = struct
|
|||
let ty_process : ty = 1
|
||||
let ty_thread : ty = 2
|
||||
|
||||
let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit =
|
||||
let encode (bufs : Buf_chain.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit
|
||||
=
|
||||
let name = truncate_string name in
|
||||
let size = size_word ~name ~args () in
|
||||
let buf = Output.get_buf out ~available_word:size in
|
||||
let@ buf = Buf_chain.with_buf bufs ~available_word:size in
|
||||
|
||||
let hd =
|
||||
I64.(
|
||||
Loading…
Add table
Reference in a new issue