From c75ded014cfbad24c04c49dc37537b453b2069ad Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Aug 2024 15:45:49 -0400 Subject: [PATCH] feat: add `trace-tef.multiproc` produce a TEF trace in a single file, even in a context with multiple sub-processes. The file is produced as the end by merging multiple .jsonl files (one per subprocess) --- src/tef-multiproc/dune | 6 + src/tef-multiproc/trace_tef_multiproc.ml | 161 ++++++++++++++++++++++ src/tef-multiproc/trace_tef_multiproc.mli | 40 ++++++ 3 files changed, 207 insertions(+) create mode 100644 src/tef-multiproc/dune create mode 100644 src/tef-multiproc/trace_tef_multiproc.ml create mode 100644 src/tef-multiproc/trace_tef_multiproc.mli diff --git a/src/tef-multiproc/dune b/src/tef-multiproc/dune new file mode 100644 index 0000000..262ed69 --- /dev/null +++ b/src/tef-multiproc/dune @@ -0,0 +1,6 @@ + +(library + (name trace_tef_multiproc) + (public_name trace-tef.multiproc) + (synopsis "Tracing using TEF/Catapult format for multiple processes") + (libraries trace.core trace-tef unix threads)) diff --git a/src/tef-multiproc/trace_tef_multiproc.ml b/src/tef-multiproc/trace_tef_multiproc.ml new file mode 100644 index 0000000..a844f79 --- /dev/null +++ b/src/tef-multiproc/trace_tef_multiproc.ml @@ -0,0 +1,161 @@ +open Trace_core + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +type output = [ `File of string ] + +(** Env variable used to communicate to subprocesses, which directory into + which they must write trace files as *) +let env_var = "TRACE_TEF_MULTIPROC_DIR" + +(** Read all the ".jsonl" files in [dir] and write a single json into [final_file] *) +let aggregate_into ~dir ~final_file () : unit = + let files = Sys.readdir dir in + let oc = open_out final_file in + let@ () = + Fun.protect ~finally:(fun () -> + output_char oc ']'; + close_out_noerr oc) + in + output_char oc '['; + + let first = ref true in + let afternewline = ref false in + + (* buffer used to read lines *) + let buf = Bytes.create 4096 in + + let emit_chunk buf i len = + if !afternewline && !first then + first := false + else if !afternewline then ( + output_string oc ",\n"; + afternewline := false + ); + output oc buf i len + in + + (* dump content of jsonl file into [oc]. Insert "," before every object + except the very first one *) + let dump_file file = + let ic = open_in (Filename.concat dir file) in + let@ () = Fun.protect ~finally:(fun () -> close_in_noerr ic) in + let continue = ref true in + while !continue do + let n = input ic buf 0 (Bytes.length buf) in + if n = 0 then continue := false; + + let start = ref 0 in + while !start < n do + match Bytes.index_from_opt buf !start '\n' with + | None -> + emit_chunk buf !start (n - !start); + start := n + | Some i -> + (* found a new line, emit chunk before it if non empty *) + if i > !start then emit_chunk buf !start (i - !start); + afternewline := true; + start := i + 1 + done + done + in + Array.iter dump_file files + +(** Remove the temporary directory and the files it contains. Assumes + it doesn't contain sub-directories *) +let cleanup_dir_and_content (dir : string) : unit = + try + let entries = Sys.readdir dir in + Array.iter (fun f -> Sys.remove (Filename.concat dir f)) entries; + Sys.rmdir dir + with e -> + Printf.eprintf + "ocaml_trace: error while cleaning temporary directory: %s\n%!" + (Printexc.to_string e) + +type child = { write_jsonl: string } + +type parent = { + dir: string; + write_jsonl: string; + final_file: string; +} + +type role = + [ `Nop + | `Child of child + | `Parent of parent + ] + +(** Find what this particular process has to do wrt tracing *) +let find_role ~out () : role = + let file_of_dir dir = + let pid = Unix.getpid () in + Filename.concat dir (spf "%d.jsonl" pid) + in + let mk_tempdir () = + let dir = Filename.temp_dir "ocaml_trace_tef_multiproc" ".tmp" in + (* communicate the directory to child processes *) + Unix.putenv env_var dir; + dir + in + + let write_to_file_as_parent path = + let dir = mk_tempdir () in + `Parent { final_file = path; dir; write_jsonl = file_of_dir dir } + in + + match Sys.getenv_opt env_var with + | Some dir -> `Child { write_jsonl = file_of_dir dir } + | None -> + (match out with + | `File path -> write_to_file_as_parent path + | `Env -> + (match Sys.getenv_opt "TRACE" with + | Some ("1" | "true") -> write_to_file_as_parent "trace.json" + | Some path -> write_to_file_as_parent path + | None -> `Nop)) + +let collector ~out () : collector = + let role = find_role ~out () in + match role with + | `Nop -> assert false + | `Child { write_jsonl } -> + Trace_tef.Internal_.collector_jsonl ~finally:ignore + ~out:(`File_append write_jsonl) () + | `Parent { dir; write_jsonl; final_file } -> + (* what to do when the collector shuts down *) + let finally () = + aggregate_into ~dir ~final_file (); + cleanup_dir_and_content dir + in + Trace_tef.Internal_.collector_jsonl ~finally ~out:(`File_append write_jsonl) + () + +let setup ?(out = `Env) () = + let role = find_role ~out () in + match role with + | `Nop -> () + | `Child { write_jsonl } -> + Trace_core.setup_collector + @@ Trace_tef.Internal_.collector_jsonl ~finally:ignore + ~out:(`File_append write_jsonl) () + | `Parent { dir; write_jsonl; final_file } -> + (* what to do when the collector shuts down *) + let finally () = + aggregate_into ~dir ~final_file (); + cleanup_dir_and_content dir + in + + Trace_core.setup_collector + @@ Trace_tef.Internal_.collector_jsonl ~finally + ~out:(`File_append write_jsonl) () + +let with_setup ?out () f = + setup ?out (); + Fun.protect ~finally:Trace_core.shutdown f + +module Internal_ = struct + include Trace_tef.Internal_ +end diff --git a/src/tef-multiproc/trace_tef_multiproc.mli b/src/tef-multiproc/trace_tef_multiproc.mli new file mode 100644 index 0000000..dc67648 --- /dev/null +++ b/src/tef-multiproc/trace_tef_multiproc.mli @@ -0,0 +1,40 @@ +val collector : out:[ `File of string ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. + See {!setup} for more details. *) + +type output = [ `File of string ] +(** Output for tracing. + - [`File "foo"] will enable tracing and print events into file + named "foo". The file is only written at exit. +*) + +val setup : ?out:[ output | `Env ] -> unit -> unit +(** [setup ()] installs the collector depending on [out]. + + @param out can take different values: + - regular {!output} value to specify where events go + - [`Env] will enable tracing if the environment + variable "TRACE" is set. + + - If it's set to "1", then the file is "trace.json". + - If it's set to "stdout", then logging happens on stdout (since 0.2) + - If it's set to "stderr", then logging happens on stdout (since 0.2) + - Otherwise, if it's set to a non empty string, the value is taken + to be the file path into which to write. +*) + +val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] (optionally) sets a collector up, calls [f()], + and makes sure to shutdown before exiting. +*) + +(**/**) + +module Internal_ : sig + val mock_all_ : unit -> unit + (** use fake, deterministic timestamps, TID, PID *) + + val on_tracing_error : (string -> unit) ref +end + +(**/**)