mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
wip: lwt exporter
This commit is contained in:
parent
c2a1ee5904
commit
6fb5cf2ae6
9 changed files with 319 additions and 0 deletions
|
|
@ -72,6 +72,7 @@
|
||||||
(>= 2.0))
|
(>= 2.0))
|
||||||
base-unix
|
base-unix
|
||||||
dune)
|
dune)
|
||||||
|
(depopts lwt)
|
||||||
(tags
|
(tags
|
||||||
(trace tracing catapult TEF chrome-format chrome-trace json)))
|
(trace tracing catapult TEF chrome-format chrome-trace json)))
|
||||||
|
|
||||||
|
|
|
||||||
5
src/tef/lwt/common_.ml
Normal file
5
src/tef/lwt/common_.ml
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
module Sub = Trace_subscriber
|
||||||
|
module A = Trace_core.Internal_.Atomic_
|
||||||
|
include Lwt.Syntax
|
||||||
|
|
||||||
|
let ( let@ ) = ( @@ )
|
||||||
15
src/tef/lwt/dune
Normal file
15
src/tef/lwt/dune
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
|
||||||
|
(library
|
||||||
|
(name trace_tef_lwt)
|
||||||
|
(public_name trace-tef.lwt)
|
||||||
|
(optional) ; lwt
|
||||||
|
(libraries
|
||||||
|
trace.core
|
||||||
|
trace.private.util
|
||||||
|
trace.subscriber
|
||||||
|
trace-tef
|
||||||
|
mtime
|
||||||
|
mtime.clock.os
|
||||||
|
unix
|
||||||
|
lwt
|
||||||
|
lwt.unix))
|
||||||
88
src/tef/lwt/exporter.ml
Normal file
88
src/tef/lwt/exporter.ml
Normal file
|
|
@ -0,0 +1,88 @@
|
||||||
|
(** An exporter, takes JSON objects and writes them somewhere *)
|
||||||
|
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
type t = {
|
||||||
|
on_json: Buffer.t -> unit;
|
||||||
|
(** Takes a buffer and writes it somewhere. The buffer is only valid
|
||||||
|
during this call and must not be stored. *)
|
||||||
|
flush: unit -> unit; (** Force write *)
|
||||||
|
close: unit -> unit; (** Close underlying resources *)
|
||||||
|
}
|
||||||
|
(** An exporter, takes JSON objects and writes them somewhere.
|
||||||
|
|
||||||
|
This should be thread-safe if used in a threaded environment. *)
|
||||||
|
|
||||||
|
open struct
|
||||||
|
let with_lock lock f =
|
||||||
|
Mutex.lock lock;
|
||||||
|
try
|
||||||
|
let res = f () in
|
||||||
|
Mutex.unlock lock;
|
||||||
|
res
|
||||||
|
with e ->
|
||||||
|
let bt = Printexc.get_raw_backtrace () in
|
||||||
|
Mutex.unlock lock;
|
||||||
|
Printexc.raise_with_backtrace e bt
|
||||||
|
end
|
||||||
|
|
||||||
|
(** Export to the lwt channel *)
|
||||||
|
let of_lwt_output_channel ~close_channel ~jsonl (oc: Lwt_io.output_channel) : t
|
||||||
|
|
||||||
|
(** Export to the channel
|
||||||
|
@param jsonl
|
||||||
|
if true, export as a JSON object per line, otherwise export as a single
|
||||||
|
big JSON array.
|
||||||
|
@param close_channel if true, closing the exporter will close the channel *)
|
||||||
|
let of_out_channel ~close_channel ~jsonl oc : t =
|
||||||
|
let lock = Mutex.create () in
|
||||||
|
let first = ref true in
|
||||||
|
let closed = ref false in
|
||||||
|
let flush () =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
flush oc
|
||||||
|
in
|
||||||
|
let close () =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
if not !closed then (
|
||||||
|
closed := true;
|
||||||
|
if not jsonl then output_char oc ']';
|
||||||
|
if close_channel then close_out_noerr oc
|
||||||
|
)
|
||||||
|
in
|
||||||
|
let on_json buf =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
if not jsonl then
|
||||||
|
if !first then (
|
||||||
|
if not jsonl then output_char oc '[';
|
||||||
|
first := false
|
||||||
|
) else
|
||||||
|
output_string oc ",\n";
|
||||||
|
Buffer.output_buffer oc buf;
|
||||||
|
if jsonl then output_char oc '\n'
|
||||||
|
in
|
||||||
|
{ flush; close; on_json }
|
||||||
|
|
||||||
|
let of_buffer ~jsonl (buf : Buffer.t) : t =
|
||||||
|
let lock = Mutex.create () in
|
||||||
|
let first = ref true in
|
||||||
|
let closed = ref false in
|
||||||
|
let close () =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
if not !closed then (
|
||||||
|
closed := true;
|
||||||
|
if not jsonl then Buffer.add_char buf ']'
|
||||||
|
)
|
||||||
|
in
|
||||||
|
let on_json json =
|
||||||
|
let@ () = with_lock lock in
|
||||||
|
if not jsonl then
|
||||||
|
if !first then (
|
||||||
|
if not jsonl then Buffer.add_char buf '[';
|
||||||
|
first := false
|
||||||
|
) else
|
||||||
|
Buffer.add_string buf ",\n";
|
||||||
|
Buffer.add_buffer buf json;
|
||||||
|
if jsonl then Buffer.add_char buf '\n'
|
||||||
|
in
|
||||||
|
{ flush = ignore; close; on_json }
|
||||||
1
src/tef/lwt/suscriber.ml
Normal file
1
src/tef/lwt/suscriber.ml
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
include Trace_tef.Subscriber
|
||||||
142
src/tef/lwt/trace_tef_lwt.ml
Normal file
142
src/tef/lwt/trace_tef_lwt.ml
Normal file
|
|
@ -0,0 +1,142 @@
|
||||||
|
open Trace_core
|
||||||
|
module Subscriber = Subscriber
|
||||||
|
module Exporter = Exporter
|
||||||
|
module Writer = Writer
|
||||||
|
|
||||||
|
let block_signals () =
|
||||||
|
try
|
||||||
|
ignore
|
||||||
|
(Unix.sigprocmask SIG_BLOCK
|
||||||
|
[
|
||||||
|
Sys.sigterm;
|
||||||
|
Sys.sigpipe;
|
||||||
|
Sys.sigint;
|
||||||
|
Sys.sigchld;
|
||||||
|
Sys.sigalrm;
|
||||||
|
Sys.sigusr1;
|
||||||
|
Sys.sigusr2;
|
||||||
|
]
|
||||||
|
: _ list)
|
||||||
|
with _ -> ()
|
||||||
|
|
||||||
|
(** Thread that simply regularly "ticks", sending events to the background
|
||||||
|
thread so it has a chance to write to the file *)
|
||||||
|
let tick_thread (sub : Subscriber.t) : unit =
|
||||||
|
block_signals ();
|
||||||
|
while Subscriber.active sub do
|
||||||
|
Thread.delay 0.5;
|
||||||
|
Subscriber.flush sub
|
||||||
|
done
|
||||||
|
|
||||||
|
type output =
|
||||||
|
[ `Stdout
|
||||||
|
| `Stderr
|
||||||
|
| `File of string
|
||||||
|
]
|
||||||
|
|
||||||
|
let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () :
|
||||||
|
Trace_subscriber.t =
|
||||||
|
let jsonl = mode = `Jsonl in
|
||||||
|
let oc, must_close =
|
||||||
|
match out with
|
||||||
|
| `Stdout -> stdout, false
|
||||||
|
| `Stderr -> stderr, false
|
||||||
|
| `File path -> open_out path, true
|
||||||
|
| `File_append path ->
|
||||||
|
open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true
|
||||||
|
| `Output oc -> oc, false
|
||||||
|
in
|
||||||
|
let pid =
|
||||||
|
if !Trace_subscriber.Private_.mock then
|
||||||
|
2
|
||||||
|
else
|
||||||
|
Unix.getpid ()
|
||||||
|
in
|
||||||
|
|
||||||
|
let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in
|
||||||
|
let exporter =
|
||||||
|
{
|
||||||
|
exporter with
|
||||||
|
close =
|
||||||
|
(fun () ->
|
||||||
|
exporter.close ();
|
||||||
|
finally ());
|
||||||
|
}
|
||||||
|
in
|
||||||
|
let sub = Subscriber.create ~pid ~exporter () in
|
||||||
|
let _t_tick : Thread.t = Thread.create tick_thread sub in
|
||||||
|
Subscriber.subscriber sub
|
||||||
|
|
||||||
|
let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out ()
|
||||||
|
: collector =
|
||||||
|
let sub = subscriber_ ~finally ~mode ~out () in
|
||||||
|
Trace_subscriber.collector sub
|
||||||
|
|
||||||
|
let[@inline] subscriber ~out () : Trace_subscriber.t =
|
||||||
|
subscriber_ ~finally:ignore ~mode:`Single ~out ()
|
||||||
|
|
||||||
|
let[@inline] collector ~out () : collector =
|
||||||
|
collector_ ~finally:ignore ~mode:`Single ~out ()
|
||||||
|
|
||||||
|
open struct
|
||||||
|
let register_atexit =
|
||||||
|
let has_registered = ref false in
|
||||||
|
fun () ->
|
||||||
|
if not !has_registered then (
|
||||||
|
has_registered := true;
|
||||||
|
at_exit Trace_core.shutdown
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
let setup ?(out = `Env) () =
|
||||||
|
register_atexit ();
|
||||||
|
match out with
|
||||||
|
| `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr ()
|
||||||
|
| `Stdout -> Trace_core.setup_collector @@ collector ~out:`Stdout ()
|
||||||
|
| `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) ()
|
||||||
|
| `Env ->
|
||||||
|
(match Sys.getenv_opt "TRACE" with
|
||||||
|
| Some ("1" | "true") ->
|
||||||
|
let path = "trace.json" in
|
||||||
|
let c = collector ~out:(`File path) () in
|
||||||
|
Trace_core.setup_collector c
|
||||||
|
| Some "stdout" -> Trace_core.setup_collector @@ collector ~out:`Stdout ()
|
||||||
|
| Some "stderr" -> Trace_core.setup_collector @@ collector ~out:`Stderr ()
|
||||||
|
| Some path ->
|
||||||
|
let c = collector ~out:(`File path) () in
|
||||||
|
Trace_core.setup_collector c
|
||||||
|
| None -> ())
|
||||||
|
|
||||||
|
let with_setup ?out () f =
|
||||||
|
setup ?out ();
|
||||||
|
Fun.protect ~finally:Trace_core.shutdown f
|
||||||
|
|
||||||
|
module Mock_ = struct
|
||||||
|
let now = ref 0
|
||||||
|
|
||||||
|
(* used to mock timing *)
|
||||||
|
let get_now_ns () : int64 =
|
||||||
|
let x = !now in
|
||||||
|
incr now;
|
||||||
|
Int64.(mul (of_int x) 1000L)
|
||||||
|
|
||||||
|
let get_tid_ () : int = 3
|
||||||
|
end
|
||||||
|
|
||||||
|
module Private_ = struct
|
||||||
|
let mock_all_ () =
|
||||||
|
Trace_subscriber.Private_.mock := true;
|
||||||
|
Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns;
|
||||||
|
Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_;
|
||||||
|
()
|
||||||
|
|
||||||
|
let on_tracing_error = Subscriber.on_tracing_error
|
||||||
|
|
||||||
|
let subscriber_jsonl ~finally ~out () =
|
||||||
|
subscriber_ ~finally ~mode:`Jsonl ~out ()
|
||||||
|
|
||||||
|
let collector_jsonl ~finally ~out () : collector =
|
||||||
|
collector_ ~finally ~mode:`Jsonl ~out ()
|
||||||
|
|
||||||
|
module Event = Event
|
||||||
|
end
|
||||||
66
src/tef/lwt/trace_tef_lwt.mli
Normal file
66
src/tef/lwt/trace_tef_lwt.mli
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
module Subscriber = Subscriber
|
||||||
|
module Exporter = Exporter
|
||||||
|
module Writer = Writer
|
||||||
|
|
||||||
|
type output =
|
||||||
|
[ `Stdout
|
||||||
|
| `Stderr
|
||||||
|
| `File of string
|
||||||
|
]
|
||||||
|
(** Output for tracing.
|
||||||
|
|
||||||
|
- [`Stdout] will enable tracing and print events on stdout
|
||||||
|
- [`Stderr] will enable tracing and print events on stderr
|
||||||
|
- [`File "foo"] will enable tracing and print events into file named "foo"
|
||||||
|
*)
|
||||||
|
|
||||||
|
val subscriber : out:[< output ] -> unit -> Trace_subscriber.t
|
||||||
|
(** A subscriber emitting TEF traces into [out].
|
||||||
|
@since 0.8 *)
|
||||||
|
|
||||||
|
val collector : out:[< output ] -> unit -> Trace_core.collector
|
||||||
|
(** Make a collector that writes into the given output. See {!setup} for more
|
||||||
|
details. *)
|
||||||
|
|
||||||
|
val setup : ?out:[ output | `Env ] -> unit -> unit
|
||||||
|
(** [setup ()] installs the collector depending on [out].
|
||||||
|
|
||||||
|
@param out
|
||||||
|
can take different values:
|
||||||
|
- regular {!output} value to specify where events go
|
||||||
|
- [`Env] will enable tracing if the environment variable "TRACE" is set.
|
||||||
|
|
||||||
|
- If it's set to "1", then the file is "trace.json".
|
||||||
|
- If it's set to "stdout", then logging happens on stdout (since 0.2)
|
||||||
|
- If it's set to "stderr", then logging happens on stdout (since 0.2)
|
||||||
|
- Otherwise, if it's set to a non empty string, the value is taken to be the
|
||||||
|
file path into which to write. *)
|
||||||
|
|
||||||
|
val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
|
||||||
|
(** [with_setup () f] (optionally) sets a collector up, calls [f()], and makes
|
||||||
|
sure to shutdown before exiting. since 0.2 a () argument was added. *)
|
||||||
|
|
||||||
|
(**/**)
|
||||||
|
|
||||||
|
module Private_ : sig
|
||||||
|
val mock_all_ : unit -> unit
|
||||||
|
(** use fake, deterministic timestamps, TID, PID *)
|
||||||
|
|
||||||
|
val on_tracing_error : (string -> unit) ref
|
||||||
|
|
||||||
|
val subscriber_jsonl :
|
||||||
|
finally:(unit -> unit) ->
|
||||||
|
out:[ `File_append of string | `Output of out_channel ] ->
|
||||||
|
unit ->
|
||||||
|
Trace_subscriber.t
|
||||||
|
|
||||||
|
val collector_jsonl :
|
||||||
|
finally:(unit -> unit) ->
|
||||||
|
out:[ `File_append of string | `Output of out_channel ] ->
|
||||||
|
unit ->
|
||||||
|
Trace_core.collector
|
||||||
|
|
||||||
|
module Event = Event
|
||||||
|
end
|
||||||
|
|
||||||
|
(**/**)
|
||||||
1
src/tef/lwt/writer.ml
Normal file
1
src/tef/lwt/writer.ml
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
include Trace_tef.Writer
|
||||||
Loading…
Add table
Reference in a new issue