From bebd037803625f61b4f43a1a6002920ecb9be321 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 9 Sep 2024 14:14:25 -0400 Subject: [PATCH] wip: `trace-subscriber` package --- dune-project | 88 ++++++++--- src/subscriber/callbacks.ml | 121 +++++++++++++++ src/subscriber/dune | 6 + src/subscriber/subscriber.ml | 49 ++++++ src/subscriber/trace_subscriber.ml | 157 +++++++++++++++++++ src/subscriber/trace_subscriber.mli | 29 ++++ src/tef/dune | 17 ++- src/tef/event.ml | 1 + src/tef/trace_tef.ml | 226 +++++++++++----------------- src/tef/trace_tef.mli | 15 +- trace-subscriber.opam | 34 +++++ trace-tef.opam | 1 + 12 files changed, 566 insertions(+), 178 deletions(-) create mode 100644 src/subscriber/callbacks.ml create mode 100644 src/subscriber/dune create mode 100644 src/subscriber/subscriber.ml create mode 100644 src/subscriber/trace_subscriber.ml create mode 100644 src/subscriber/trace_subscriber.mli create mode 100644 trace-subscriber.opam diff --git a/dune-project b/dune-project index e55a503..858ed66 100644 --- a/dune-project +++ b/dune-project @@ -1,26 +1,34 @@ (lang dune 2.9) (name trace) + (generate_opam_files true) + (version 0.7) (source (github c-cube/ocaml-trace)) + (authors "Simon Cruanes") + (maintainers "Simon Cruanes") + (license MIT) ;(documentation https://url/to/documentation) (package (name trace) - (synopsis "A stub for tracing/observability, agnostic in how data is collected") + (synopsis + "A stub for tracing/observability, agnostic in how data is collected") (depends - (ocaml (>= 4.08)) - dune) + (ocaml + (>= 4.08)) + dune) (depopts - hmap - (mtime (>= 2.0))) + hmap + (mtime + (>= 2.0))) (tags (trace tracing observability profiling))) @@ -28,37 +36,67 @@ (name ppx_trace) (synopsis "A ppx-based preprocessor for trace") (depends - (ocaml (>= 4.12)) ; we use __FUNCTION__ - (ppxlib (>= 0.28)) - (trace (= :version)) - (trace-tef (and (= :version) :with-test)) - dune) + (ocaml + (>= 4.12)) ; we use __FUNCTION__ + (ppxlib + (>= 0.28)) + (trace + (= :version)) + (trace-tef + (and + (= :version) + :with-test)) + dune) (tags (trace ppx))) (package - (name trace-tef) - (synopsis "A simple backend for trace, emitting Catapult/TEF JSON into a file") + (name trace-subscriber) + (synopsis "Generic subscriber system for `trace`") (depends - (ocaml (>= 4.08)) - (trace (= :version)) - (mtime (>= 2.0)) - base-unix - dune) + (ocaml + (>= 4.08)) + (trace + (= :version)) + (mtime + (>= 2.0))) + (tags + (trace subscriber))) + +(package + (name trace-tef) + (synopsis + "A simple backend for trace, emitting Catapult/TEF JSON into a file") + (depends + (ocaml + (>= 4.08)) + (trace + (= :version)) + (trace-subscriber + (= :version)) + (mtime + (>= 2.0)) + base-unix + dune) (tags (trace tracing catapult TEF chrome-format))) (package (name trace-fuchsia) - (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") + (synopsis + "A high-performance backend for trace, emitting a Fuchsia trace into a file") (depends - (ocaml (>= 4.08)) - (trace (= :version)) - (mtime (>= 2.0)) - (thread-local-storage (>= 0.2)) - base-bigarray - base-unix - dune) + (ocaml + (>= 4.08)) + (trace + (= :version)) + (mtime + (>= 2.0)) + (thread-local-storage + (>= 0.2)) + base-bigarray + base-unix + dune) (tags (trace tracing fuchsia))) diff --git a/src/subscriber/callbacks.ml b/src/subscriber/callbacks.ml new file mode 100644 index 0000000..db7df94 --- /dev/null +++ b/src/subscriber/callbacks.ml @@ -0,0 +1,121 @@ +open Trace_core + +module type S = sig + type st + + val on_init : st -> time_ns:float -> unit + (** Called when the subscriber is initialized in a collector *) + + val on_shutdown : st -> time_ns:float -> unit + (** Called when the collector is shutdown *) + + val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit + (** Current thread is being named *) + + val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit + (** Current process is being named *) + + val on_enter_span : + st -> + __FUNCTION__:string option -> + __FILE__:string -> + __LINE__:int -> + time_ns:float -> + tid:int -> + data:(string * user_data) list -> + name:string -> + span -> + unit + (** Enter a regular (sync) span *) + + val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit + (** Exit a span. This and [on_enter_span] must follow strict stack discipline *) + + val on_add_data : st -> data:(string * user_data) list -> span -> unit + (** Add data to a regular span (which must be active) *) + + val on_message : + st -> + time_ns:float -> + tid:int -> + span:span option -> + data:(string * user_data) list -> + string -> + unit + (** Emit a log message *) + + val on_counter : + st -> + time_ns:float -> + tid:int -> + data:(string * user_data) list -> + name:string -> + float -> + unit + (** Emit the current value of a counter *) + + val on_enter_manual_span : + st -> + __FUNCTION__:string option -> + __FILE__:string -> + __LINE__:int -> + time_ns:float -> + tid:int -> + parent:explicit_span option -> + data:(string * user_data) list -> + name:string -> + flavor:[ `Sync | `Async ] option -> + trace_id:int -> + span -> + unit + (** Enter a manual (possibly async) span *) + + val on_exit_manual_span : + st -> + time_ns:float -> + tid:int -> + name:string -> + data:(string * user_data) list -> + flavor:[ `Sync | `Async ] option -> + trace_id:int -> + span -> + unit + (** Exit a manual span *) +end + +type 'st t = (module S with type st = 'st) + +(** Callbacks for a subscriber *) + +(** Dummy callbacks *) +module Dummy = struct + let on_init _ ~time_ns:_ = () + let on_shutdown _ ~time_ns:_ = () + let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = () + let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = () + let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = () + let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = () + + let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ ~tid:_ + ~data:_ ~name:_ _sp = + () + + let on_exit_span _ ~time_ns:_ ~tid:_ _ = () + let on_add_data _ ~data:_ _sp = () + + let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ + ~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp = + () + + let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_ + ~trace_id:_ _ = + () +end + +let dummy (type st) () : st t = + let module M = struct + type nonrec st = st + + include Dummy + end in + (module M) diff --git a/src/subscriber/dune b/src/subscriber/dune new file mode 100644 index 0000000..a76b0fd --- /dev/null +++ b/src/subscriber/dune @@ -0,0 +1,6 @@ + +(library + (name trace_subscriber) + (public_name trace-subscriber) + (libraries (re_export trace.core) mtime mtime.clock.os threads)) + diff --git a/src/subscriber/subscriber.ml b/src/subscriber/subscriber.ml new file mode 100644 index 0000000..ef2dd77 --- /dev/null +++ b/src/subscriber/subscriber.ml @@ -0,0 +1,49 @@ +(** A trace subscriber *) +type t = + | Sub : { + st: 'st; + callbacks: 'st Callbacks.t; + } + -> t + +let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () } + +(* TODO: + let multiplex (l : t list) : t = + match l with + | [] -> dummy + | [ s ] -> s + | _ -> + let module M = struct + type st = t list + + let on_init l ~time_ns = + List.iter + (fun (Sub { st; callbacks = (module CB) }) -> CB.on_init st ~time_ns) + l + + let on_shutdown _ ~time_ns:_ = () + let on_tick _ = () + let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = () + let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = () + let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = () + let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = () + + let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ + ~tid:_ ~data:_ ~name:_ _sp = + () + + let on_exit_span _ ~time_ns:_ ~tid:_ _ = () + let on_add_data _ ~data:_ _sp = () + + let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ + ~time_ns:_ ~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp + = + () + + let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_ + ~trace_id:_ _ = + () + end in + Sub { st = l; callbacks = (module M) } +*) diff --git a/src/subscriber/trace_subscriber.ml b/src/subscriber/trace_subscriber.ml new file mode 100644 index 0000000..203ab8c --- /dev/null +++ b/src/subscriber/trace_subscriber.ml @@ -0,0 +1,157 @@ +open Trace_core +module Callbacks = Callbacks +module Subscriber = Subscriber + +type t = Subscriber.t + +module Private_ = struct + let get_now_ns_ = ref None + let get_tid_ = ref None + + (** Now, in nanoseconds *) + let[@inline] now_ns () : float = + match !get_now_ns_ with + | Some f -> f () + | None -> + let t = Mtime_clock.now () in + Int64.to_float (Mtime.to_uint64_ns t) + + let[@inline] tid_ () : int = + match !get_tid_ with + | Some f -> f () + | None -> Thread.id (Thread.self ()) +end + +open struct + module A = Trace_core.Internal_.Atomic_ + + type manual_span_info = { + name: string; + flavor: [ `Sync | `Async ] option; + mutable data: (string * user_data) list; + } + + (** Key used to carry some information between begin and end of + manual spans, by way of the meta map *) + let key_manual_info : manual_span_info Meta_map.key = Meta_map.Key.create () + + (** key used to carry a unique "id" for all spans in an async context *) + let key_async_trace_id : int Meta_map.key = Meta_map.Key.create () +end + +(** A collector that calls the callbacks of subscriber *) +let collector (Sub { st; callbacks = (module CB) } : Subscriber.t) : collector = + let open Private_ in + let module M = struct + let trace_id_gen_ = A.make 0 + + (** generator for span ids *) + let new_span_ : unit -> int = + let span_id_gen_ = A.make 0 in + fun [@inline] () -> A.fetch_and_add span_id_gen_ 1 + + let enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : span = + let span = Int64.of_int (new_span_ ()) in + let tid = tid_ () in + let time_ns = now_ns () in + CB.on_enter_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data + ~name span; + span + + let exit_span span : unit = + let time_ns = now_ns () in + let tid = tid_ () in + CB.on_exit_span st ~time_ns ~tid span + + let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name f = + let span = enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name in + try + let x = f span in + exit_span span; + x + with exn -> + let bt = Printexc.get_raw_backtrace () in + exit_span span; + Printexc.raise_with_backtrace exn bt + + let add_data_to_span span data = + if data <> [] then CB.on_add_data st ~data span + + let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__ + ~__FILE__ ~__LINE__ ~data name : explicit_span = + let span = Int64.of_int (new_span_ ()) in + let tid = tid_ () in + let time_ns = now_ns () in + + (* get the common trace id, or make a new one *) + let trace_id = + match parent with + | Some m -> Meta_map.find_exn key_async_trace_id m.meta + | None -> A.fetch_and_add trace_id_gen_ 1 + in + + CB.on_enter_manual_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~parent ~data + ~time_ns ~tid ~name ~flavor ~trace_id span; + let meta = + Meta_map.empty + |> Meta_map.add key_manual_info { name; flavor; data = [] } + |> Meta_map.add key_async_trace_id trace_id + in + { span; meta } + + let exit_manual_span (es : explicit_span) : unit = + let time_ns = now_ns () in + let tid = tid_ () in + let trace_id = + match Meta_map.find key_async_trace_id es.meta with + | None -> assert false + | Some id -> id + in + let minfo = + match Meta_map.find key_manual_info es.meta with + | None -> assert false + | Some m -> m + in + CB.on_exit_manual_span st ~tid ~time_ns ~data:minfo.data ~name:minfo.name + ~flavor:minfo.flavor ~trace_id es.span + + let add_data_to_manual_span (es : explicit_span) data = + if data <> [] then ( + match Meta_map.find key_manual_info es.meta with + | None -> assert false + | Some m -> m.data <- List.rev_append data m.data + ) + + let message ?span ~data msg : unit = + let time_ns = now_ns () in + let tid = tid_ () in + CB.on_message st ~time_ns ~tid ~span ~data msg + + let counter_float ~data name f : unit = + let time_ns = now_ns () in + let tid = tid_ () in + CB.on_counter st ~tid ~time_ns ~data ~name f + + let[@inline] counter_int ~data name i = + counter_float ~data name (float_of_int i) + + let name_process name : unit = + let tid = tid_ () in + let time_ns = now_ns () in + CB.on_name_process st ~time_ns ~tid ~name + + let name_thread name : unit = + let tid = tid_ () in + let time_ns = now_ns () in + CB.on_name_thread st ~time_ns ~tid ~name + + let shutdown () = + let time_ns = now_ns () in + CB.on_shutdown st ~time_ns + + let () = + (* init code *) + let time_ns = now_ns () in + CB.on_init st ~time_ns + end in + (module M) diff --git a/src/subscriber/trace_subscriber.mli b/src/subscriber/trace_subscriber.mli new file mode 100644 index 0000000..fcd4b73 --- /dev/null +++ b/src/subscriber/trace_subscriber.mli @@ -0,0 +1,29 @@ +(** Generic subscribers. + + This defines the notion of a {b subscriber}, + a set of callbacks for every trace event. + It also defines a collector that needs to be installed + for the subscriber(s) to be called. + + @since NEXT_RELEASE +*) + +module Callbacks = Callbacks +module Subscriber = Subscriber + +type t = Subscriber.t + +val collector : t -> Trace_core.collector + +(**/**) + +(**/*) +module Private_ : sig + val get_now_ns_ : (unit -> float) option ref + (** The callback used to get the current timestamp *) + + val get_tid_ : (unit -> int) option ref + (** The callback used to get the current thread's id *) + + val now_ns : unit -> float +end diff --git a/src/tef/dune b/src/tef/dune index 156eec1..b1bac7c 100644 --- a/src/tef/dune +++ b/src/tef/dune @@ -1,6 +1,13 @@ - (library - (name trace_tef) - (public_name trace-tef) - (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") - (libraries trace.core trace.private.util mtime mtime.clock.os unix threads)) + (name trace_tef) + (public_name trace-tef) + (synopsis + "Simple and lightweight tracing using TEF/Catapult format, in-process") + (libraries + trace.core + trace.private.util + trace-subscriber + mtime + mtime.clock.os + unix + threads)) diff --git a/src/tef/event.ml b/src/tef/event.ml index 17d8fff..d9d304d 100644 --- a/src/tef/event.ml +++ b/src/tef/event.ml @@ -1,5 +1,6 @@ open Trace_core +(** An event, specialized for TEF *) type t = | E_tick | E_message of { diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index 7fea2ae..2bfc00d 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -1,31 +1,24 @@ open Trace_core open Trace_private_util open Event +module Sub = Trace_subscriber module A = Trace_core.Internal_.Atomic_ -let ( let@ ) = ( @@ ) +let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) module Mock_ = struct let enabled = ref false let now = ref 0 - let[@inline never] now_us () : float = + (* used to mock timing *) + let get_now_ns () : float = let x = !now in incr now; - float_of_int x + float_of_int x *. 1000. + + let get_tid_ () : int = 3 end -(** Now, in microseconds *) -let[@inline] now_us () : float = - if !Mock_.enabled then - Mock_.now_us () - else ( - let t = Mtime_clock.now () in - Int64.to_float (Mtime.to_uint64_ns t) /. 1e3 - ) - -let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) - module Span_tbl = Hashtbl.Make (struct include Int64 @@ -39,15 +32,6 @@ type span_info = { mutable data: (string * user_data) list; } -(** key used to carry a unique "id" for all spans in an async context *) -let key_async_id : int Meta_map.key = Meta_map.Key.create () - -let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.key = - Meta_map.Key.create () - -let key_data : (string * user_data) list ref Meta_map.key = - Meta_map.Key.create () - (** Writer: knows how to write entries to a file in TEF format *) module Writer = struct type t = { @@ -280,7 +264,8 @@ let bg_thread ~mode ~out (events : Event.t B_queue.t) : unit = (* write a message about us closing *) Writer.emit_instant_event ~name:"tef-worker.exit" ~tid:(Thread.id @@ Thread.self ()) - ~ts:(now_us ()) ~args:[] writer; + ~ts:(Sub.Private_.now_ns () *. 1e-3) + ~args:[] writer; (* warn if app didn't close all spans *) if Span_tbl.length spans > 0 then @@ -304,138 +289,90 @@ type output = | `File of string ] -let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out () - : collector = - let module M = struct - let active = A.make true +module Internal_st = struct + type t = { + active: bool A.t; + events: Event.t B_queue.t; + t_write: Thread.t; + } +end - (** generator for span ids *) - let span_id_gen_ = A.make 0 +let subscriber_ ~finally ~out ~(mode : [ `Single | `Jsonl ]) () : Sub.t = + let module M : Sub.Callbacks.S with type st = Internal_st.t = struct + type st = Internal_st.t - (* queue of messages to write *) - let events : Event.t B_queue.t = B_queue.create () + let on_init _ ~time_ns:_ = () - (** writer thread. It receives events and writes them to [oc]. *) - let t_write : Thread.t = - Thread.create - (fun () -> - let@ () = Fun.protect ~finally in - bg_thread ~mode ~out events) - () - - (** ticker thread, regularly sends a message to the writer thread. - no need to join it. *) - let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () - - let shutdown () = - if A.exchange active false then ( - B_queue.close events; + let on_shutdown (self : st) ~time_ns:_ = + if A.exchange self.active false then ( + B_queue.close self.events; (* wait for writer thread to be done. The writer thread will exit after processing remaining events because the queue is now closed *) - Thread.join t_write + Thread.join self.t_write ) - let get_tid_ () : int = - if !Mock_.enabled then - 3 - else - Thread.id (Thread.self ()) + let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit = + B_queue.push self.events @@ E_name_process { name } - let[@inline] enter_span_ ~fun_name ~data name : span = - let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in - let tid = get_tid_ () in - let time_us = now_us () in - B_queue.push events - (E_define_span { tid; name; time_us; id = span; fun_name; data }); - span + let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit = + B_queue.push self.events @@ E_name_thread { tid; name } - let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : - span = - enter_span_ ~fun_name ~data name + let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ + ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events + @@ E_define_span { tid; name; time_us; id = span; fun_name; data } - let exit_span span : unit = - let time_us = now_us () in - B_queue.push events (E_exit_span { id = span; time_us }) + let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events @@ E_exit_span { id = span; time_us } - (* re-raise exception with its backtrace *) - external reraise : exn -> 'a = "%reraise" + let on_add_data (self : st) ~data span = + if data <> [] then + B_queue.push self.events @@ E_add_data { id = span; data } - let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f = - let span = enter_span_ ~fun_name ~data name in - try - let x = f span in - exit_span span; - x - with exn -> - exit_span span; - reraise exn + let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events @@ E_message { tid; time_us; msg; data } - let add_data_to_span span data = - if data <> [] then B_queue.push events (E_add_data { id = span; data }) + let on_counter (self : st) ~time_ns ~tid ~data:_ ~name f : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events @@ E_counter { name; n = f; time_us; tid } - let enter_manual_span ~(parent : explicit_span option) ~flavor - ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : - explicit_span = - (* get the id, or make a new one *) - let id = - match parent with - | Some m -> Meta_map.find_exn key_async_id m.meta - | None -> A.fetch_and_add span_id_gen_ 1 - in - let time_us = now_us () in - B_queue.push events - (E_enter_manual_span - { id; time_us; tid = get_tid_ (); data; name; fun_name; flavor }); - { - span = 0L; - meta = - Meta_map.( - empty |> add key_async_id id |> add key_async_data (name, flavor)); - } + let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ + ~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span + : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events + @@ E_enter_manual_span + { id = trace_id; time_us; tid; data; name; fun_name; flavor } - let exit_manual_span (es : explicit_span) : unit = - let id = Meta_map.find_exn key_async_id es.meta in - let name, flavor = Meta_map.find_exn key_async_data es.meta in - let data = - match Meta_map.find key_data es.meta with - | None -> [] - | Some r -> !r - in - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events - (E_exit_manual_span { tid; id; name; time_us; data; flavor }) - - let add_data_to_manual_span (es : explicit_span) data = - if data <> [] then ( - let data_ref, add = - match Meta_map.find key_data es.meta with - | Some r -> r, false - | None -> ref [], true - in - let new_data = List.rev_append data !data_ref in - data_ref := new_data; - if add then es.meta <- Meta_map.add key_data data_ref es.meta - ) - - let message ?span:_ ~data msg : unit = - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events (E_message { tid; time_us; msg; data }) - - let counter_float ~data:_ name f = - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events (E_counter { name; n = f; time_us; tid }) - - let counter_int ~data name i = counter_float ~data name (float_of_int i) - let name_process name : unit = B_queue.push events (E_name_process { name }) - - let name_thread name : unit = - let tid = get_tid_ () in - B_queue.push events (E_name_thread { tid; name }) + let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor + ~trace_id (_ : span) : unit = + let time_us = time_ns *. 1e-3 in + B_queue.push self.events + @@ E_exit_manual_span { tid; id = trace_id; name; time_us; data; flavor } end in - (module M) + let events = B_queue.create () in + let t_write = + Thread.create + (fun () -> Fun.protect ~finally @@ fun () -> bg_thread ~mode ~out events) + () + in + + (* ticker thread, regularly sends a message to the writer thread. + no need to join it. *) + let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () in + let st : Internal_st.t = { active = A.make true; events; t_write } in + Sub.Subscriber.Sub { st; callbacks = (module M) } + +let collector_ ~(finally : unit -> unit) ~(mode : [ `Single | `Jsonl ]) ~out () + : collector = + let sub = subscriber_ ~finally ~mode ~out () in + Sub.collector sub + +let[@inline] subscriber ~out () : Sub.t = + subscriber_ ~finally:ignore ~mode:`Single ~out () let[@inline] collector ~out () : collector = collector_ ~finally:ignore ~mode:`Single ~out () @@ -462,8 +399,13 @@ let with_setup ?out () f = setup ?out (); Fun.protect ~finally:Trace_core.shutdown f -module Internal_ = struct - let mock_all_ () = Mock_.enabled := true +module Private_ = struct + let mock_all_ () = + Mock_.enabled := true; + Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns; + Sub.Private_.get_tid_ := Some Mock_.get_tid_; + () + let on_tracing_error = on_tracing_error let collector_jsonl ~finally ~out () : collector = diff --git a/src/tef/trace_tef.mli b/src/tef/trace_tef.mli index 6c73f13..a1009e1 100644 --- a/src/tef/trace_tef.mli +++ b/src/tef/trace_tef.mli @@ -1,8 +1,3 @@ -val collector : - out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector -(** Make a collector that writes into the given output. - See {!setup} for more details. *) - type output = [ `Stdout | `Stderr @@ -16,6 +11,14 @@ type output = named "foo" *) +val subscriber : out:[< output ] -> unit -> Trace_subscriber.t +(** A subscriber emitting TEF traces into [out]. + @since NEXT_RELEASE *) + +val collector : out:[< output ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. + See {!setup} for more details. *) + val setup : ?out:[ output | `Env ] -> unit -> unit (** [setup ()] installs the collector depending on [out]. @@ -39,7 +42,7 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a (**/**) -module Internal_ : sig +module Private_ : sig val mock_all_ : unit -> unit (** use fake, deterministic timestamps, TID, PID *) diff --git a/trace-subscriber.opam b/trace-subscriber.opam new file mode 100644 index 0000000..254d443 --- /dev/null +++ b/trace-subscriber.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.7" +synopsis: "Generic subscriber system for `trace`" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["trace" "subscriber"] +homepage: "https://github.com/c-cube/ocaml-trace" +bug-reports: "https://github.com/c-cube/ocaml-trace/issues" +depends: [ + "dune" {>= "2.9"} + "ocaml" {>= "4.08"} + "trace" {= version} + "mtime" {>= "2.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/c-cube/ocaml-trace.git" diff --git a/trace-tef.opam b/trace-tef.opam index 2c255e7..1ea9f2b 100644 --- a/trace-tef.opam +++ b/trace-tef.opam @@ -12,6 +12,7 @@ bug-reports: "https://github.com/c-cube/ocaml-trace/issues" depends: [ "ocaml" {>= "4.08"} "trace" {= version} + "trace-subscriber" {= version} "mtime" {>= "2.0"} "base-unix" "dune" {>= "2.9"}