diff --git a/dune-project b/dune-project index 44135a7..ca2dd7d 100644 --- a/dune-project +++ b/dune-project @@ -72,6 +72,7 @@ (>= 2.0)) base-unix dune) + (depopts lwt) (tags (trace tracing catapult TEF chrome-format chrome-trace json))) diff --git a/src/tef/emit_tef.ml b/src/tef/emit_tef.ml deleted file mode 100644 index e69de29..0000000 diff --git a/src/tef/lwt/common_.ml b/src/tef/lwt/common_.ml new file mode 100644 index 0000000..a613fa1 --- /dev/null +++ b/src/tef/lwt/common_.ml @@ -0,0 +1,5 @@ +module Sub = Trace_subscriber +module A = Trace_core.Internal_.Atomic_ +include Lwt.Syntax + +let ( let@ ) = ( @@ ) diff --git a/src/tef/lwt/dune b/src/tef/lwt/dune new file mode 100644 index 0000000..37177ce --- /dev/null +++ b/src/tef/lwt/dune @@ -0,0 +1,15 @@ + +(library + (name trace_tef_lwt) + (public_name trace-tef.lwt) + (optional) ; lwt + (libraries + trace.core + trace.private.util + trace.subscriber + trace-tef + mtime + mtime.clock.os + unix + lwt + lwt.unix)) diff --git a/src/tef/lwt/exporter.ml b/src/tef/lwt/exporter.ml new file mode 100644 index 0000000..a324f3c --- /dev/null +++ b/src/tef/lwt/exporter.ml @@ -0,0 +1,88 @@ +(** An exporter, takes JSON objects and writes them somewhere *) + +open Common_ + +type t = { + on_json: Buffer.t -> unit; + (** Takes a buffer and writes it somewhere. The buffer is only valid + during this call and must not be stored. *) + flush: unit -> unit; (** Force write *) + close: unit -> unit; (** Close underlying resources *) +} +(** An exporter, takes JSON objects and writes them somewhere. + + This should be thread-safe if used in a threaded environment. *) + +open struct + let with_lock lock f = + Mutex.lock lock; + try + let res = f () in + Mutex.unlock lock; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + Mutex.unlock lock; + Printexc.raise_with_backtrace e bt +end + +(** Export to the lwt channel *) +let of_lwt_output_channel ~close_channel ~jsonl (oc: Lwt_io.output_channel) : t + +(** Export to the channel + @param jsonl + if true, export as a JSON object per line, otherwise export as a single + big JSON array. + @param close_channel if true, closing the exporter will close the channel *) +let of_out_channel ~close_channel ~jsonl oc : t = + let lock = Mutex.create () in + let first = ref true in + let closed = ref false in + let flush () = + let@ () = with_lock lock in + flush oc + in + let close () = + let@ () = with_lock lock in + if not !closed then ( + closed := true; + if not jsonl then output_char oc ']'; + if close_channel then close_out_noerr oc + ) + in + let on_json buf = + let@ () = with_lock lock in + if not jsonl then + if !first then ( + if not jsonl then output_char oc '['; + first := false + ) else + output_string oc ",\n"; + Buffer.output_buffer oc buf; + if jsonl then output_char oc '\n' + in + { flush; close; on_json } + +let of_buffer ~jsonl (buf : Buffer.t) : t = + let lock = Mutex.create () in + let first = ref true in + let closed = ref false in + let close () = + let@ () = with_lock lock in + if not !closed then ( + closed := true; + if not jsonl then Buffer.add_char buf ']' + ) + in + let on_json json = + let@ () = with_lock lock in + if not jsonl then + if !first then ( + if not jsonl then Buffer.add_char buf '['; + first := false + ) else + Buffer.add_string buf ",\n"; + Buffer.add_buffer buf json; + if jsonl then Buffer.add_char buf '\n' + in + { flush = ignore; close; on_json } diff --git a/src/tef/lwt/suscriber.ml b/src/tef/lwt/suscriber.ml new file mode 100644 index 0000000..df70340 --- /dev/null +++ b/src/tef/lwt/suscriber.ml @@ -0,0 +1 @@ +include Trace_tef.Subscriber diff --git a/src/tef/lwt/trace_tef_lwt.ml b/src/tef/lwt/trace_tef_lwt.ml new file mode 100644 index 0000000..39b9c06 --- /dev/null +++ b/src/tef/lwt/trace_tef_lwt.ml @@ -0,0 +1,142 @@ +open Trace_core +module Subscriber = Subscriber +module Exporter = Exporter +module Writer = Writer + +let block_signals () = + try + ignore + (Unix.sigprocmask SIG_BLOCK + [ + Sys.sigterm; + Sys.sigpipe; + Sys.sigint; + Sys.sigchld; + Sys.sigalrm; + Sys.sigusr1; + Sys.sigusr2; + ] + : _ list) + with _ -> () + +(** Thread that simply regularly "ticks", sending events to the background + thread so it has a chance to write to the file *) +let tick_thread (sub : Subscriber.t) : unit = + block_signals (); + while Subscriber.active sub do + Thread.delay 0.5; + Subscriber.flush sub + done + +type output = + [ `Stdout + | `Stderr + | `File of string + ] + +let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : + Trace_subscriber.t = + let jsonl = mode = `Jsonl in + let oc, must_close = + match out with + | `Stdout -> stdout, false + | `Stderr -> stderr, false + | `File path -> open_out path, true + | `File_append path -> + open_out_gen [ Open_creat; Open_wronly; Open_append ] 0o644 path, true + | `Output oc -> oc, false + in + let pid = + if !Trace_subscriber.Private_.mock then + 2 + else + Unix.getpid () + in + + let exporter = Exporter.of_out_channel oc ~jsonl ~close_channel:must_close in + let exporter = + { + exporter with + close = + (fun () -> + exporter.close (); + finally ()); + } + in + let sub = Subscriber.create ~pid ~exporter () in + let _t_tick : Thread.t = Thread.create tick_thread sub in + Subscriber.subscriber sub + +let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out () + : collector = + let sub = subscriber_ ~finally ~mode ~out () in + Trace_subscriber.collector sub + +let[@inline] subscriber ~out () : Trace_subscriber.t = + subscriber_ ~finally:ignore ~mode:`Single ~out () + +let[@inline] collector ~out () : collector = + collector_ ~finally:ignore ~mode:`Single ~out () + +open struct + let register_atexit = + let has_registered = ref false in + fun () -> + if not !has_registered then ( + has_registered := true; + at_exit Trace_core.shutdown + ) +end + +let setup ?(out = `Env) () = + register_atexit (); + match out with + | `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr () + | `Stdout -> Trace_core.setup_collector @@ collector ~out:`Stdout () + | `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) () + | `Env -> + (match Sys.getenv_opt "TRACE" with + | Some ("1" | "true") -> + let path = "trace.json" in + let c = collector ~out:(`File path) () in + Trace_core.setup_collector c + | Some "stdout" -> Trace_core.setup_collector @@ collector ~out:`Stdout () + | Some "stderr" -> Trace_core.setup_collector @@ collector ~out:`Stderr () + | Some path -> + let c = collector ~out:(`File path) () in + Trace_core.setup_collector c + | None -> ()) + +let with_setup ?out () f = + setup ?out (); + Fun.protect ~finally:Trace_core.shutdown f + +module Mock_ = struct + let now = ref 0 + + (* used to mock timing *) + let get_now_ns () : int64 = + let x = !now in + incr now; + Int64.(mul (of_int x) 1000L) + + let get_tid_ () : int = 3 +end + +module Private_ = struct + let mock_all_ () = + Trace_subscriber.Private_.mock := true; + Trace_subscriber.Private_.get_now_ns_ := Mock_.get_now_ns; + Trace_subscriber.Private_.get_tid_ := Mock_.get_tid_; + () + + let on_tracing_error = Subscriber.on_tracing_error + + let subscriber_jsonl ~finally ~out () = + subscriber_ ~finally ~mode:`Jsonl ~out () + + let collector_jsonl ~finally ~out () : collector = + collector_ ~finally ~mode:`Jsonl ~out () + + module Event = Event +end diff --git a/src/tef/lwt/trace_tef_lwt.mli b/src/tef/lwt/trace_tef_lwt.mli new file mode 100644 index 0000000..9cf8dd1 --- /dev/null +++ b/src/tef/lwt/trace_tef_lwt.mli @@ -0,0 +1,66 @@ +module Subscriber = Subscriber +module Exporter = Exporter +module Writer = Writer + +type output = + [ `Stdout + | `Stderr + | `File of string + ] +(** Output for tracing. + + - [`Stdout] will enable tracing and print events on stdout + - [`Stderr] will enable tracing and print events on stderr + - [`File "foo"] will enable tracing and print events into file named "foo" +*) + +val subscriber : out:[< output ] -> unit -> Trace_subscriber.t +(** A subscriber emitting TEF traces into [out]. + @since 0.8 *) + +val collector : out:[< output ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. See {!setup} for more + details. *) + +val setup : ?out:[ output | `Env ] -> unit -> unit +(** [setup ()] installs the collector depending on [out]. + + @param out + can take different values: + - regular {!output} value to specify where events go + - [`Env] will enable tracing if the environment variable "TRACE" is set. + + - If it's set to "1", then the file is "trace.json". + - If it's set to "stdout", then logging happens on stdout (since 0.2) + - If it's set to "stderr", then logging happens on stdout (since 0.2) + - Otherwise, if it's set to a non empty string, the value is taken to be the + file path into which to write. *) + +val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] (optionally) sets a collector up, calls [f()], and makes + sure to shutdown before exiting. since 0.2 a () argument was added. *) + +(**/**) + +module Private_ : sig + val mock_all_ : unit -> unit + (** use fake, deterministic timestamps, TID, PID *) + + val on_tracing_error : (string -> unit) ref + + val subscriber_jsonl : + finally:(unit -> unit) -> + out:[ `File_append of string | `Output of out_channel ] -> + unit -> + Trace_subscriber.t + + val collector_jsonl : + finally:(unit -> unit) -> + out:[ `File_append of string | `Output of out_channel ] -> + unit -> + Trace_core.collector + + module Event = Event +end + +(**/**) diff --git a/src/tef/lwt/writer.ml b/src/tef/lwt/writer.ml new file mode 100644 index 0000000..747a6b6 --- /dev/null +++ b/src/tef/lwt/writer.ml @@ -0,0 +1 @@ +include Trace_tef.Writer