add trace-tef, a simple backend

This commit is contained in:
Simon Cruanes 2023-06-08 22:31:32 -04:00
parent 39a14bbede
commit 07645ffa1a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
10 changed files with 456 additions and 2 deletions

View file

@ -18,6 +18,18 @@
(ocaml (>= 4.05))
dune)
(tags
(tracing observability profiling)))
(trace tracing observability profiling)))
(package
(name trace-tef)
(synopsis "A simple backend for trace")
(depends
(ocaml (>= 4.05))
(trace (= :version))
mtime
base-unix
dune)
(tags
(trace tracing catapult)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

53
src/tef/b_queue.ml Normal file
View file

@ -0,0 +1,53 @@
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
);
Mutex.unlock self.mutex
let push (self : _ t) x : unit =
Mutex.lock self.mutex;
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
) else (
Queue.push x self.q;
Condition.signal self.cond;
Mutex.unlock self.mutex
)
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
Mutex.unlock self.mutex;
x
)
in
loop ()

18
src/tef/b_queue.mli Normal file
View file

@ -0,0 +1,18 @@
(** Basic Blocking Queue *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)

6
src/tef/dune Normal file
View file

@ -0,0 +1,6 @@
(library
(name trace_tef)
(public_name trace-tef)
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
(libraries trace mtime mtime.clock.os unix threads))

272
src/tef/trace_tef.ml Normal file
View file

@ -0,0 +1,272 @@
open Trace
module A = Trace.Internal_.Atomic_
module Mock_mtime_ = struct
let enabled = ref false
let now = ref 0
let[@inline never] now_us () : float =
let x = !now in
incr now;
float_of_int x
end
let counter = Mtime_clock.counter ()
(** Now, in microseconds *)
let[@inline] now_us () : float =
if !Mock_mtime_.enabled then
Mock_mtime_.now_us ()
else (
let t = Mtime_clock.count counter in
Mtime.Span.to_float_ns t /. 1e3
)
let protect ~finally f =
try
let x = f () in
finally ();
x
with exn ->
let bt = Printexc.get_raw_backtrace () in
finally ();
Printexc.raise_with_backtrace exn bt
type event =
| E_message of {
(*
__FUNCTION__: string;
__FILE__: string;
__LINE__: int;
*)
tid: int;
msg: string;
time_us: float;
}
| E_define_span of {
(*
__FUNCTION__: string;
__FILE__: string;
__LINE__: int;
*)
tid: int;
name: string;
time_us: float;
id: span;
}
| E_exit_span of {
id: span;
time_us: float;
}
module Span_tbl = Hashtbl.Make (struct
include Int64
let hash : t -> int = Hashtbl.hash
end)
type span_info = {
(*
__FUNCTION__: string;
__FILE__: string;
__LINE__: int;
*)
tid: int;
name: string;
start_us: float;
}
module Writer = struct
type t = {
oc: out_channel;
mutable first: bool; (** first event? *)
must_close: bool;
pid: int;
}
let create ~out () : t =
let oc, must_close =
match out with
| `Stdout -> stdout, false
| `Stderr -> stderr, false
| `File path -> open_out path, true
in
let pid = Unix.getpid () in
output_char oc '[';
{ oc; first = true; pid; must_close }
let close (self : t) : unit =
output_char self.oc ']';
flush self.oc;
if self.must_close then close_out self.oc
let emit_sep_ (self : t) =
if self.first then
self.first <- false
else
output_string self.oc ",\n"
let char = output_char
let raw_string = output_string
let str_val oc (s : string) =
char oc '"';
let encode_char c =
match c with
| '"' -> raw_string oc {|\"|}
| '\\' -> raw_string oc {|\\|}
| '\n' -> raw_string oc {|\n|}
| '\b' -> raw_string oc {|\b|}
| '\r' -> raw_string oc {|\r|}
| '\t' -> raw_string oc {|\t|}
| _ when Char.code c <= 0x1f ->
raw_string oc {|\u00|};
Printf.fprintf oc "%02x" (Char.code c)
| c -> char oc c
in
String.iter encode_char s;
char oc '"'
(* emit args, if not empty. [ppv] is used to print values. *)
let emit_args_o_ ppv oc args : unit =
if args <> [] then (
Printf.fprintf oc {json|,"args": {|json};
List.iteri
(fun i (n, value) ->
if i > 0 then Printf.fprintf oc ",";
Printf.fprintf oc {json|"%s":%a|json} n ppv value)
args;
char oc '}'
)
let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit =
let dur = end_ -. start in
let ts = start in
emit_sep_ self;
Printf.fprintf self.oc
{json|{"pid": %d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
self.pid tid dur ts str_val name (emit_args_o_ str_val) args;
()
let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
emit_sep_ self;
Printf.fprintf self.oc
{json|{"pid": %d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
self.pid tid ts str_val name (emit_args_o_ str_val) args;
()
end
let bg_thread ~out (events : event B_queue.t) : unit =
let writer = Writer.create ~out () in
protect ~finally:(fun () -> Writer.close writer) @@ fun () ->
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
(* how to deal with an event *)
let handle_ev (ev : event) : unit =
match ev with
| E_message { (* __FUNCTION__; __FILE__; __LINE__; *) tid; msg; time_us } ->
Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:[] writer
| E_define_span
{ (* __FUNCTION__; __FILE__; __LINE__; *) tid; name; id; time_us } ->
(* save the span so we find it at exit *)
Span_tbl.add spans id
{
(* __FUNCTION__; __FILE__; __LINE__; *) tid;
name;
start_us = time_us;
}
| E_exit_span { id; time_us = stop_us } ->
(match Span_tbl.find_opt spans id with
| None -> (* bug! *) ()
| Some { (* __FUNCTION__; __FILE__; __LINE__; *) tid; name; start_us } ->
Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us
~args:[] writer)
in
try
while true do
let ev = B_queue.pop events in
handle_ev ev
done
with B_queue.Closed -> ()
type output =
[ `Stdout
| `Stderr
| `File of string
]
let collector ~out () : collector =
let module M = struct
let active = A.make true
(** generator for span ids *)
let span_id_gen_ = A.make 0
let enabled () = true
(* queue of messages to write *)
let events : event B_queue.t = B_queue.create ()
(* writer thread. It receives events and writes them to [oc]. *)
let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) ()
let shutdown () =
if A.exchange active false then (
Printf.eprintf "shutdown\n%!";
B_queue.close events;
Printf.eprintf "wait\n%!";
Thread.join t_write
)
let[@inline] get_tid_ () : int = Thread.id (Thread.self ())
let create_span ?__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ name : span =
let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in
let tid = get_tid_ () in
let time_us = now_us () in
B_queue.push events
(E_define_span
{
(* __FUNCTION__; __FILE__; __LINE__; *) tid;
name;
time_us;
id = span;
});
span
let exit_span span : unit =
let time_us = now_us () in
B_queue.push events (E_exit_span { id = span; time_us })
let message ?__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ msg : unit =
let time_us = now_us () in
let tid = get_tid_ () in
B_queue.push events
(E_message { (* __FUNCTION__; __FILE__; __LINE__; *) tid; time_us; msg })
end in
(module M)
let setup ?(out = `Env) () =
match out with
| `Stderr -> Trace.setup_collector @@ collector ~out:`Stderr ()
| `Stdout -> Trace.setup_collector @@ collector ~out:`Stdout ()
| `File path -> Trace.setup_collector @@ collector ~out:(`File path) ()
| `Env ->
(match Sys.getenv_opt "TRACE" with
| Some "1" ->
let path = "trace.json" in
let c = collector ~out:(`File path) () in
Trace.setup_collector c
| Some path ->
let c = collector ~out:(`File path) () in
Trace.setup_collector c
| None -> ())
let with_setup ?out f =
setup ?out ();
protect ~finally:Trace.shutdown f
module Internal_ = struct
let use_mock_mtime_ () = Mock_mtime_.enabled := true
end

44
src/tef/trace_tef.mli Normal file
View file

@ -0,0 +1,44 @@
open Trace
val collector :
out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace.collector
(** Make a collector that writes into the given output.
See {!setup} for more details. *)
type output =
[ `Stdout
| `Stderr
| `File of string
]
(** Output for tracing.
- [`Stdout] will enable tracing and print events on stdout
- [`Stderr] will enable tracing and print events on stderr
- [`File "foo"] will enable tracing and print events into file
named "foo"
*)
val setup : ?out:[ output | `Env ] -> unit -> unit
(** [setup ()] installs the collector depending on [out].
@param out can take different values:
- regular {!output} value to specify where events go
- [`Env] will enable tracing if the environment
variable "TRACE" is set.
If it's set to anything but "1", the value is taken
to be the file path into which to write.
If it's set to "1", then the file is "trace.json".
*)
val with_setup : ?out:[ output | `Env ] -> (unit -> 'a) -> 'a
(** Setup, and make sure to shutdown before exiting *)
(**/**)
module Internal_ : sig
val use_mock_mtime_ : unit -> unit
(* use fake, deterministic timestamps *)
end
(**/**)

View file

@ -64,3 +64,7 @@ let shutdown () =
match A.exchange collector None with
| None -> ()
| Some (module C) -> C.shutdown ()
module Internal_ = struct
module Atomic_ = Atomic_
end

View file

@ -23,6 +23,8 @@ val with_ :
val message :
?__FUNCTION__:string -> __FILE__:string -> __LINE__:int -> string -> unit
(* TODO: counter/plot/metric *)
val messagef :
?__FUNCTION__:string ->
__FILE__:string ->
@ -40,3 +42,12 @@ val setup_collector : collector -> unit
collector. *)
val shutdown : unit -> unit
(**/**)
(* no guarantee of stability *)
module Internal_ : sig
module Atomic_ = Atomic_
end
(**/**)

34
trace-tef.opam Normal file
View file

@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "A simple backend for trace"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["trace" "tracing" "catapult"]
homepage: "https://github.com/c-cube/trace"
bug-reports: "https://github.com/c-cube/trace/issues"
depends: [
"ocaml" {>= "4.05"}
"trace" {= version}
"mtime"
"base-unix"
"dune" {>= "2.9"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files=false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/c-cube/trace.git"

View file

@ -4,7 +4,7 @@ synopsis: "A stub for tracing/observability"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["tracing" "observability" "profiling"]
tags: ["trace" "tracing" "observability" "profiling"]
homepage: "https://github.com/c-cube/trace"
bug-reports: "https://github.com/c-cube/trace/issues"
depends: [