diff --git a/dune-project b/dune-project index 78001d0..75eb850 100644 --- a/dune-project +++ b/dune-project @@ -18,6 +18,18 @@ (ocaml (>= 4.05)) dune) (tags - (tracing observability profiling))) + (trace tracing observability profiling))) + +(package + (name trace-tef) + (synopsis "A simple backend for trace") + (depends + (ocaml (>= 4.05)) + (trace (= :version)) + mtime + base-unix + dune) + (tags + (trace tracing catapult))) ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project diff --git a/src/tef/b_queue.ml b/src/tef/b_queue.ml new file mode 100644 index 0000000..ce9add5 --- /dev/null +++ b/src/tef/b_queue.ml @@ -0,0 +1,53 @@ +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + mutable closed: bool; +} + +exception Closed + +let create () : _ t = + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ); + Mutex.unlock self.mutex + +let push (self : _ t) x : unit = + Mutex.lock self.mutex; + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else ( + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + ) + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () diff --git a/src/tef/b_queue.mli b/src/tef/b_queue.mli new file mode 100644 index 0000000..e833c92 --- /dev/null +++ b/src/tef/b_queue.mli @@ -0,0 +1,18 @@ +(** Basic Blocking Queue *) + +type 'a t + +val create : unit -> _ t + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + +val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/tef/dune b/src/tef/dune new file mode 100644 index 0000000..4dcd4df --- /dev/null +++ b/src/tef/dune @@ -0,0 +1,6 @@ + +(library + (name trace_tef) + (public_name trace-tef) + (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") + (libraries trace mtime mtime.clock.os unix threads)) diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml new file mode 100644 index 0000000..495d772 --- /dev/null +++ b/src/tef/trace_tef.ml @@ -0,0 +1,272 @@ +open Trace +module A = Trace.Internal_.Atomic_ + +module Mock_mtime_ = struct + let enabled = ref false + let now = ref 0 + + let[@inline never] now_us () : float = + let x = !now in + incr now; + float_of_int x +end + +let counter = Mtime_clock.counter () + +(** Now, in microseconds *) +let[@inline] now_us () : float = + if !Mock_mtime_.enabled then + Mock_mtime_.now_us () + else ( + let t = Mtime_clock.count counter in + Mtime.Span.to_float_ns t /. 1e3 + ) + +let protect ~finally f = + try + let x = f () in + finally (); + x + with exn -> + let bt = Printexc.get_raw_backtrace () in + finally (); + Printexc.raise_with_backtrace exn bt + +type event = + | E_message of { + (* + __FUNCTION__: string; + __FILE__: string; + __LINE__: int; + *) + tid: int; + msg: string; + time_us: float; + } + | E_define_span of { + (* + __FUNCTION__: string; + __FILE__: string; + __LINE__: int; + *) + tid: int; + name: string; + time_us: float; + id: span; + } + | E_exit_span of { + id: span; + time_us: float; + } + +module Span_tbl = Hashtbl.Make (struct + include Int64 + + let hash : t -> int = Hashtbl.hash +end) + +type span_info = { + (* + __FUNCTION__: string; + __FILE__: string; + __LINE__: int; + *) + tid: int; + name: string; + start_us: float; +} + +module Writer = struct + type t = { + oc: out_channel; + mutable first: bool; (** first event? *) + must_close: bool; + pid: int; + } + + let create ~out () : t = + let oc, must_close = + match out with + | `Stdout -> stdout, false + | `Stderr -> stderr, false + | `File path -> open_out path, true + in + let pid = Unix.getpid () in + output_char oc '['; + { oc; first = true; pid; must_close } + + let close (self : t) : unit = + output_char self.oc ']'; + flush self.oc; + if self.must_close then close_out self.oc + + let emit_sep_ (self : t) = + if self.first then + self.first <- false + else + output_string self.oc ",\n" + + let char = output_char + let raw_string = output_string + + let str_val oc (s : string) = + char oc '"'; + let encode_char c = + match c with + | '"' -> raw_string oc {|\"|} + | '\\' -> raw_string oc {|\\|} + | '\n' -> raw_string oc {|\n|} + | '\b' -> raw_string oc {|\b|} + | '\r' -> raw_string oc {|\r|} + | '\t' -> raw_string oc {|\t|} + | _ when Char.code c <= 0x1f -> + raw_string oc {|\u00|}; + Printf.fprintf oc "%02x" (Char.code c) + | c -> char oc c + in + String.iter encode_char s; + char oc '"' + + (* emit args, if not empty. [ppv] is used to print values. *) + let emit_args_o_ ppv oc args : unit = + if args <> [] then ( + Printf.fprintf oc {json|,"args": {|json}; + List.iteri + (fun i (n, value) -> + if i > 0 then Printf.fprintf oc ","; + Printf.fprintf oc {json|"%s":%a|json} n ppv value) + args; + char oc '}' + ) + + let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit = + let dur = end_ -. start in + let ts = start in + emit_sep_ self; + Printf.fprintf self.oc + {json|{"pid": %d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} + self.pid tid dur ts str_val name (emit_args_o_ str_val) args; + () + + let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = + emit_sep_ self; + Printf.fprintf self.oc + {json|{"pid": %d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} + self.pid tid ts str_val name (emit_args_o_ str_val) args; + () +end + +let bg_thread ~out (events : event B_queue.t) : unit = + let writer = Writer.create ~out () in + protect ~finally:(fun () -> Writer.close writer) @@ fun () -> + let spans : span_info Span_tbl.t = Span_tbl.create 32 in + + (* how to deal with an event *) + let handle_ev (ev : event) : unit = + match ev with + | E_message { (* __FUNCTION__; __FILE__; __LINE__; *) tid; msg; time_us } -> + Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:[] writer + | E_define_span + { (* __FUNCTION__; __FILE__; __LINE__; *) tid; name; id; time_us } -> + (* save the span so we find it at exit *) + Span_tbl.add spans id + { + (* __FUNCTION__; __FILE__; __LINE__; *) tid; + name; + start_us = time_us; + } + | E_exit_span { id; time_us = stop_us } -> + (match Span_tbl.find_opt spans id with + | None -> (* bug! *) () + | Some { (* __FUNCTION__; __FILE__; __LINE__; *) tid; name; start_us } -> + Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us + ~args:[] writer) + in + + try + while true do + let ev = B_queue.pop events in + handle_ev ev + done + with B_queue.Closed -> () + +type output = + [ `Stdout + | `Stderr + | `File of string + ] + +let collector ~out () : collector = + let module M = struct + let active = A.make true + + (** generator for span ids *) + let span_id_gen_ = A.make 0 + + let enabled () = true + + (* queue of messages to write *) + let events : event B_queue.t = B_queue.create () + + (* writer thread. It receives events and writes them to [oc]. *) + let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) () + + let shutdown () = + if A.exchange active false then ( + Printf.eprintf "shutdown\n%!"; + B_queue.close events; + Printf.eprintf "wait\n%!"; + Thread.join t_write + ) + + let[@inline] get_tid_ () : int = Thread.id (Thread.self ()) + + let create_span ?__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ name : span = + let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in + let tid = get_tid_ () in + let time_us = now_us () in + B_queue.push events + (E_define_span + { + (* __FUNCTION__; __FILE__; __LINE__; *) tid; + name; + time_us; + id = span; + }); + span + + let exit_span span : unit = + let time_us = now_us () in + B_queue.push events (E_exit_span { id = span; time_us }) + + let message ?__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ msg : unit = + let time_us = now_us () in + let tid = get_tid_ () in + B_queue.push events + (E_message { (* __FUNCTION__; __FILE__; __LINE__; *) tid; time_us; msg }) + end in + (module M) + +let setup ?(out = `Env) () = + match out with + | `Stderr -> Trace.setup_collector @@ collector ~out:`Stderr () + | `Stdout -> Trace.setup_collector @@ collector ~out:`Stdout () + | `File path -> Trace.setup_collector @@ collector ~out:(`File path) () + | `Env -> + (match Sys.getenv_opt "TRACE" with + | Some "1" -> + let path = "trace.json" in + let c = collector ~out:(`File path) () in + Trace.setup_collector c + | Some path -> + let c = collector ~out:(`File path) () in + Trace.setup_collector c + | None -> ()) + +let with_setup ?out f = + setup ?out (); + protect ~finally:Trace.shutdown f + +module Internal_ = struct + let use_mock_mtime_ () = Mock_mtime_.enabled := true +end diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli new file mode 100644 index 0000000..f9d3de1 --- /dev/null +++ b/src/tef/trace_tef.mli @@ -0,0 +1,44 @@ +open Trace + +val collector : + out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace.collector +(** Make a collector that writes into the given output. + See {!setup} for more details. *) + +type output = + [ `Stdout + | `Stderr + | `File of string + ] +(** Output for tracing. + + - [`Stdout] will enable tracing and print events on stdout + - [`Stderr] will enable tracing and print events on stderr + - [`File "foo"] will enable tracing and print events into file + named "foo" +*) + +val setup : ?out:[ output | `Env ] -> unit -> unit +(** [setup ()] installs the collector depending on [out]. + + @param out can take different values: + - regular {!output} value to specify where events go + - [`Env] will enable tracing if the environment + variable "TRACE" is set. + + If it's set to anything but "1", the value is taken + to be the file path into which to write. + If it's set to "1", then the file is "trace.json". +*) + +val with_setup : ?out:[ output | `Env ] -> (unit -> 'a) -> 'a +(** Setup, and make sure to shutdown before exiting *) + +(**/**) + +module Internal_ : sig + val use_mock_mtime_ : unit -> unit + (* use fake, deterministic timestamps *) +end + +(**/**) diff --git a/src/trace.ml b/src/trace.ml index 809fc30..08d9548 100644 --- a/src/trace.ml +++ b/src/trace.ml @@ -64,3 +64,7 @@ let shutdown () = match A.exchange collector None with | None -> () | Some (module C) -> C.shutdown () + +module Internal_ = struct + module Atomic_ = Atomic_ +end diff --git a/src/trace.mli b/src/trace.mli index f7d714f..e437db6 100644 --- a/src/trace.mli +++ b/src/trace.mli @@ -23,6 +23,8 @@ val with_ : val message : ?__FUNCTION__:string -> __FILE__:string -> __LINE__:int -> string -> unit +(* TODO: counter/plot/metric *) + val messagef : ?__FUNCTION__:string -> __FILE__:string -> @@ -40,3 +42,12 @@ val setup_collector : collector -> unit collector. *) val shutdown : unit -> unit + +(**/**) + +(* no guarantee of stability *) +module Internal_ : sig + module Atomic_ = Atomic_ +end + +(**/**) diff --git a/trace-tef.opam b/trace-tef.opam new file mode 100644 index 0000000..026ee06 --- /dev/null +++ b/trace-tef.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "A simple backend for trace" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["trace" "tracing" "catapult"] +homepage: "https://github.com/c-cube/trace" +bug-reports: "https://github.com/c-cube/trace/issues" +depends: [ + "ocaml" {>= "4.05"} + "trace" {= version} + "mtime" + "base-unix" + "dune" {>= "2.9"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/c-cube/trace.git" diff --git a/trace.opam b/trace.opam index 162e697..ea1e948 100644 --- a/trace.opam +++ b/trace.opam @@ -4,7 +4,7 @@ synopsis: "A stub for tracing/observability" maintainer: ["Simon Cruanes"] authors: ["Simon Cruanes"] license: "MIT" -tags: ["tracing" "observability" "profiling"] +tags: ["trace" "tracing" "observability" "profiling"] homepage: "https://github.com/c-cube/trace" bug-reports: "https://github.com/c-cube/trace/issues" depends: [