diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml deleted file mode 100644 index 631a1db..0000000 --- a/src/fuchsia/bg_thread.ml +++ /dev/null @@ -1,62 +0,0 @@ -open Common_ - -type out = - [ `Stdout - | `Stderr - | `File of string - ] - -type event = - | E_write_buf of Buf.t - | E_tick - -type state = { - buf_pool: Buf_pool.t; - oc: out_channel; - events: event B_queue.t; -} - -let with_out_ (out : out) f = - let oc, must_close = - match out with - | `Stdout -> stdout, false - | `Stderr -> stderr, false - | `File path -> open_out path, true - in - - if must_close then ( - let finally () = close_out_noerr oc in - Fun.protect ~finally (fun () -> f oc) - ) else - f oc - -let handle_ev (self : state) (ev : event) : unit = - match ev with - | E_tick -> flush self.oc - | E_write_buf buf -> - output self.oc buf.buf 0 buf.offset; - Buf_pool.recycle self.buf_pool buf - -let bg_loop (self : state) : unit = - let continue = ref true in - - while !continue do - match B_queue.pop_all self.events with - | exception B_queue.Closed -> continue := false - | evs -> List.iter (handle_ev self) evs - done - -let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = - let@ oc = with_out_ out in - let st = { oc; buf_pool; events } in - bg_loop st - -(** Thread that simply regularly "ticks", sending events to the background - thread so it has a chance to write to the file, and call [f()] *) -let tick_thread events : unit = - try - while true do - Thread.delay 0.5; - B_queue.push events E_tick - done - with B_queue.Closed -> () diff --git a/src/fuchsia/write/buf.ml b/src/fuchsia/buf.ml similarity index 93% rename from src/fuchsia/write/buf.ml rename to src/fuchsia/buf.ml index b2aae8a..a1d46c5 100644 --- a/src/fuchsia/write/buf.ml +++ b/src/fuchsia/buf.ml @@ -8,12 +8,14 @@ type t = { let empty : t = { buf = Bytes.empty; offset = 0 } let create (n : int) : t = + (* multiple of 8-bytes size *) let buf = Bytes.create (round_to_word n) in { buf; offset = 0 } let[@inline] clear self = self.offset <- 0 let[@inline] available self = Bytes.length self.buf - self.offset let[@inline] size self = self.offset +let[@inline] is_empty self = self.offset = 0 (* see below: we assume little endian *) let () = assert (not Sys.big_endian) diff --git a/src/fuchsia/buf_chain.ml b/src/fuchsia/buf_chain.ml new file mode 100644 index 0000000..77616e5 --- /dev/null +++ b/src/fuchsia/buf_chain.ml @@ -0,0 +1,140 @@ +(** A set of buffers in use, and a set of ready buffers *) + +open Common_ + +(** Buffers in use *) +type buffers = + | B_one of { mutable buf: Buf.t } + | B_many of Buf.t Lock.t array + (** mask(thread id) -> buffer. This reduces contention *) + +type t = { + bufs: buffers; + has_ready: bool A.t; + ready: Buf.t Queue.t Lock.t; + (** Buffers that are full (enough) and must be written *) + buf_pool: Buf_pool.t; +} +(** A set of buffers, some of which are ready to be written *) + +open struct + let shard_log = 4 + let shard = 1 lsl shard_log + let shard_mask = shard - 1 +end + +let create ~(sharded : bool) ~(buf_pool : Buf_pool.t) () : t = + let bufs = + if sharded then ( + let bufs = + Array.init shard (fun _ -> Lock.create @@ Buf_pool.alloc buf_pool) + in + B_many bufs + ) else + B_one { buf = Buf_pool.alloc buf_pool } + in + { + bufs; + buf_pool; + has_ready = A.make false; + ready = Lock.create @@ Queue.create (); + } + +open struct + let put_in_ready (self : t) buf : unit = + if Buf.size buf > 0 then ( + let@ q = Lock.with_ self.ready in + Atomic.set self.has_ready true; + Queue.push buf q + ) + + let assert_available buf ~available = + if Buf.available buf < available then ( + let msg = + Printf.sprintf + "fuchsia: buffer is too small (available: %d bytes, needed: %d bytes)" + (Buf.available buf) available + in + failwith msg + ) +end + +(** Move all non-empty buffers to [ready] *) +let ready_all_non_empty (self : t) : unit = + let@ q = Lock.with_ self.ready in + match self.bufs with + | B_one r -> + if not (Buf.is_empty r.buf) then ( + Queue.push r.buf q; + A.set self.has_ready true; + r.buf <- Buf.empty + ) + | B_many bufs -> + Array.iter + (fun buf -> + Lock.update buf (fun buf -> + if Buf.size buf > 0 then ( + Queue.push buf q; + A.set self.has_ready true; + Buf.empty + ) else + buf)) + bufs + +let[@inline] has_ready self : bool = A.get self.has_ready + +(** Get access to ready buffers, then clean them up automatically *) +let pop_ready (self : t) ~(f : Buf.t Queue.t -> 'a) : 'a = + let@ q = Lock.with_ self.ready in + let res = f q in + + (* clear queue *) + Queue.iter (Buf_pool.recycle self.buf_pool) q; + Queue.clear q; + A.set self.has_ready false; + res + +(** Maximum size available, in words, for a single message *) +let[@inline] max_size_word (_self : t) : int = fuchsia_buf_size lsr 3 + +(** Obtain a buffer with at least [available_word] 64-bit words *) +let with_buf (self : t) ~(available_word : int) (f : Buf.t -> 'a) : 'a = + let available = available_word lsl 3 in + match self.bufs with + | B_one r -> + if Buf.available r.buf < available_word then ( + put_in_ready self r.buf; + r.buf <- Buf_pool.alloc self.buf_pool + ); + assert_available r.buf ~available; + f r.buf + | B_many bufs -> + let tid = Thread.(id (self ())) in + let masked_tid = tid land shard_mask in + let buf_lock = bufs.(masked_tid) in + let@ buf = Lock.with_ buf_lock in + let buf = + if Buf.available buf < available then ( + put_in_ready self buf; + let new_buf = Buf_pool.alloc self.buf_pool in + assert_available new_buf ~available; + Lock.set_while_locked buf_lock new_buf; + new_buf + ) else + buf + in + f buf + +(** Dispose of resources (here, recycle buffers) *) +let dispose (self : t) : unit = + match self.bufs with + | B_one r -> + Buf_pool.recycle self.buf_pool r.buf; + r.buf <- Buf.empty + | B_many bufs -> + Array.iter + (fun buf_lock -> + let@ buf = Lock.with_ buf_lock in + Buf_pool.recycle self.buf_pool buf; + Lock.set_while_locked buf_lock Buf.empty) + bufs diff --git a/src/fuchsia/buf_pool.ml b/src/fuchsia/buf_pool.ml new file mode 100644 index 0000000..6ea615d --- /dev/null +++ b/src/fuchsia/buf_pool.ml @@ -0,0 +1,23 @@ +open Common_ +open Trace_private_util + +type t = Buf.t Rpool.t + +let create ?(max_size = 64) () : t = + Rpool.create ~max_size ~clear:Buf.clear + ~create:(fun () -> Buf.create fuchsia_buf_size) + () + +let alloc = Rpool.alloc +let[@inline] recycle self buf = if buf != Buf.empty then Rpool.recycle self buf + +let with_ (self : t) f = + let x = alloc self in + try + let res = f x in + recycle self x; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + recycle self x; + Printexc.raise_with_backtrace e bt diff --git a/src/fuchsia/common_.ml b/src/fuchsia/common_.ml index 14b78bf..3a335a3 100644 --- a/src/fuchsia/common_.ml +++ b/src/fuchsia/common_.ml @@ -1,12 +1,22 @@ module A = Trace_core.Internal_.Atomic_ -module FWrite = Trace_fuchsia_write -module B_queue = Trace_private_util.B_queue -module Buf = FWrite.Buf -module Buf_pool = FWrite.Buf_pool -module Output = FWrite.Output +module Sub = Trace_subscriber let on_tracing_error = ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s) let ( let@ ) = ( @@ ) let spf = Printf.sprintf + +let with_lock lock f = + Mutex.lock lock; + try + let res = f () in + Mutex.unlock lock; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + Mutex.unlock lock; + Printexc.raise_with_backtrace e bt + +(** Buffer size we use. *) +let fuchsia_buf_size = 1 lsl 16 diff --git a/src/fuchsia/dune b/src/fuchsia/dune index 8e4f0f4..4ef6048 100644 --- a/src/fuchsia/dune +++ b/src/fuchsia/dune @@ -6,8 +6,8 @@ (libraries trace.core trace.private.util + trace.subscriber thread-local-storage - (re_export trace-fuchsia.write) bigarray mtime mtime.clock.os diff --git a/src/fuchsia/exporter.ml b/src/fuchsia/exporter.ml new file mode 100644 index 0000000..dc3b4e3 --- /dev/null +++ b/src/fuchsia/exporter.ml @@ -0,0 +1,61 @@ +(** An exporter, takes buffers with fuchsia events, and writes them somewhere *) + +open Common_ + +type t = { + write_bufs: Buf.t Queue.t -> unit; + (** Takes buffers and writes them somewhere. The buffers are only valid + during this call and must not be stored. The queue must not be + modified. *) + flush: unit -> unit; (** Force write *) + close: unit -> unit; (** Close underlying resources *) +} +(** An exporter, takes buffers and writes them somewhere. This should be + thread-safe if used in a threaded environment. *) + +open struct + let with_lock lock f = + Mutex.lock lock; + try + let res = f () in + Mutex.unlock lock; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + Mutex.unlock lock; + Printexc.raise_with_backtrace e bt +end + +(** Export to the channel + @param close_channel if true, closing the exporter will close the channel *) +let of_out_channel ~close_channel oc : t = + let lock = Mutex.create () in + let closed = ref false in + let flush () = + let@ () = with_lock lock in + flush oc + in + let close () = + let@ () = with_lock lock in + if not !closed then ( + closed := true; + if close_channel then close_out_noerr oc + ) + in + let write_bufs bufs = + if not (Queue.is_empty bufs) then + let@ () = with_lock lock in + Queue.iter (fun (buf : Buf.t) -> output oc buf.buf 0 buf.offset) bufs + in + { flush; close; write_bufs } + +let of_buffer (buffer : Buffer.t) : t = + let buffer = Lock.create buffer in + let write_bufs bufs = + if not (Queue.is_empty bufs) then + let@ buffer = Lock.with_ buffer in + Queue.iter + (fun (buf : Buf.t) -> Buffer.add_subbytes buffer buf.buf 0 buf.offset) + bufs + in + { flush = ignore; close = ignore; write_bufs } diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml deleted file mode 100644 index 35c7925..0000000 --- a/src/fuchsia/fcollector.ml +++ /dev/null @@ -1,430 +0,0 @@ -open Trace_core -open Common_ -module TLS = Thread_local_storage -module Int_map = Map.Make (Int) - -let pid = Unix.getpid () - -module Mock_ = struct - let enabled = ref false - let now = ref 0 - - (* used to mock timing *) - let get_now_ns () : float = - let x = !now in - incr now; - float_of_int x *. 1000. - - let get_tid_ () : int = 3 -end - -(** Thread-local stack of span info *) -module Span_info_stack : sig - type t - - val create : unit -> t - - val push : - t -> - span -> - name:string -> - start_time_ns:int64 -> - data:(string * user_data) list -> - unit - - val pop : t -> int64 * string * int64 * (string * user_data) list - val find_ : t -> span -> int option - val add_data : t -> int -> (string * user_data) list -> unit -end = struct - module BA = Bigarray - module BA1 = Bigarray.Array1 - - type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t - - type t = { - mutable len: int; - mutable span: int64arr; - mutable start_time_ns: int64arr; - mutable name: string array; - mutable data: (string * user_data) list array; - } - - let init_size_ = 1 - - let create () : t = - { - len = 0; - span = BA1.create BA.Int64 BA.C_layout init_size_; - start_time_ns = BA1.create BA.Int64 BA.C_layout init_size_; - name = Array.make init_size_ ""; - data = Array.make init_size_ []; - } - - let[@inline] cap self = Array.length self.name - - let grow_ (self : t) : unit = - let new_cap = 2 * cap self in - let new_span = BA1.create BA.Int64 BA.C_layout new_cap in - BA1.blit self.span (BA1.sub new_span 0 self.len); - let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in - BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len); - let new_name = Array.make new_cap "" in - Array.blit self.name 0 new_name 0 self.len; - let new_data = Array.make new_cap [] in - Array.blit self.data 0 new_data 0 self.len; - self.span <- new_span; - self.start_time_ns <- new_startime_ns; - self.name <- new_name; - self.data <- new_data - - let push (self : t) (span : int64) ~name ~start_time_ns ~data = - if cap self = self.len then grow_ self; - BA1.set self.span self.len span; - BA1.set self.start_time_ns self.len start_time_ns; - Array.set self.name self.len name; - Array.set self.data self.len data; - self.len <- self.len + 1 - - let pop (self : t) = - assert (self.len > 0); - self.len <- self.len - 1; - - let span = BA1.get self.span self.len in - let name = self.name.(self.len) in - let start_time_ns = BA1.get self.start_time_ns self.len in - let data = self.data.(self.len) in - - (* avoid holding onto old values *) - Array.set self.name self.len ""; - Array.set self.data self.len []; - - span, name, start_time_ns, data - - let[@inline] add_data self i d : unit = - assert (i < self.len); - self.data.(i) <- List.rev_append d self.data.(i) - - exception Found of int - - let[@inline] find_ (self : t) span : _ option = - try - for i = self.len - 1 downto 0 do - if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i) - done; - - None - with Found i -> Some i -end - -type async_span_info = { - flavor: [ `Sync | `Async ] option; - name: string; - mutable data: (string * user_data) list; -} - -let key_async_data : async_span_info Meta_map.key = Meta_map.Key.create () - -open struct - let state_id_ = A.make 0 - - (* re-raise exception with its backtrace *) - external reraise : exn -> 'a = "%reraise" -end - -type per_thread_state = { - tid: int; - state_id: int; (** ID of the current collector state *) - local_span_id_gen: int A.t; (** Used for thread-local spans *) - mutable thread_ref: FWrite.Thread_ref.t; - mutable out: Output.t option; - spans: Span_info_stack.t; (** In-flight spans *) -} - -type state = { - active: bool A.t; - events: Bg_thread.event B_queue.t; - span_id_gen: int A.t; (** Used for async spans *) - bg_thread: Thread.t; - buf_pool: Buf_pool.t; - next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) - per_thread: per_thread_state Int_map.t A.t array; - (** the state keeps tabs on thread-local state, so it can flush writers at - the end. This is a tid-sharded array of maps. *) -} - -let[@inline] mk_trace_id (self : state) : trace_id = - let n = A.fetch_and_add self.span_id_gen 1 in - let b = Bytes.create 8 in - Bytes.set_int64_le b 0 (Int64.of_int n); - Bytes.unsafe_to_string b - -let key_thread_local_st : per_thread_state TLS.t = TLS.create () - -let[@inline never] mk_thread_local_st () = - let tid = Thread.id @@ Thread.self () in - let st = - { - tid; - state_id = A.get state_id_; - thread_ref = FWrite.Thread_ref.inline ~pid ~tid; - local_span_id_gen = A.make 0; - out = None; - spans = Span_info_stack.create (); - } - in - TLS.set key_thread_local_st st; - st - -let[@inline] get_thread_local_st () = - match TLS.get_opt key_thread_local_st with - | Some k -> k - | None -> mk_thread_local_st () - -let out_of_st (st : state) : Output.t = - FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf -> - try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()) - -module C - (St : sig - val st : state - end) - () = -struct - open St - - let state_id = 1 + A.fetch_and_add state_id_ 1 - - (** prepare the thread's state *) - let[@inline never] update_or_init_local_state (self : per_thread_state) : unit - = - (* get an output *) - let out = out_of_st st in - self.out <- Some out; - - (* try to allocate a thread ref for current thread *) - let th_ref = A.fetch_and_add st.next_thread_ref 1 in - if th_ref <= 0xff then ( - self.thread_ref <- FWrite.Thread_ref.ref th_ref; - FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid () - ); - - (* add to [st]'s list of threads *) - let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in - while - let old = A.get shard_of_per_thread in - not - (A.compare_and_set shard_of_per_thread old - (Int_map.add self.tid self old)) - do - () - done; - - let on_exit _ = - while - let old = A.get shard_of_per_thread in - not - (A.compare_and_set shard_of_per_thread old - (Int_map.remove self.tid old)) - do - () - done; - Option.iter Output.flush self.out - in - - (* after thread exits, flush output and remove from global list *) - Gc.finalise on_exit (Thread.self ()); - () - - (** Obtain the output for the current thread *) - let[@inline] get_thread_output () : Output.t * per_thread_state = - let tls = get_thread_local_st () in - if tls.state_id != state_id || tls.out == None then - update_or_init_local_state tls; - let out = - match tls.out with - | None -> assert false - | Some o -> o - in - out, tls - - let close_per_thread (tls : per_thread_state) = - Option.iter Output.flush tls.out - - (** flush all outputs *) - let flush_all_outputs_ () = - Array.iter - (fun shard -> - let tls_l = A.get shard in - Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l) - st.per_thread - - let shutdown () = - if A.exchange st.active false then ( - flush_all_outputs_ (); - - B_queue.close st.events; - (* wait for writer thread to be done. The writer thread will exit - after processing remaining events because the queue is now closed *) - Thread.join st.bg_thread - ) - - let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span = - let tls = get_thread_local_st () in - let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in - let time_ns = Time.now_ns () in - Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns; - span - - let exit_span span : unit = - let out, tls = get_thread_output () in - let end_time_ns = Time.now_ns () in - - let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in - if span <> span' then - !on_tracing_error - (spf "span mismatch: top is %Ld, expected %Ld" span' span) - else - FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref - ~time_ns:start_time_ns ~end_time_ns ~args:data () - - let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f = - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in - Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name; - - let[@inline] exit () : unit = - let end_time_ns = Time.now_ns () in - - let _span', _, _, data = Span_info_stack.pop tls.spans in - assert (span = _span'); - FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns - ~t_ref:tls.thread_ref ~args:data () - in - - try - let x = f span in - exit (); - x - with exn -> - exit (); - reraise exn - - let add_data_to_span span data = - let tls = get_thread_local_st () in - match Span_info_stack.find_ tls.spans span with - | None -> !on_tracing_error (spf "unknown span %Ld" span) - | Some idx -> Span_info_stack.add_data tls.spans idx data - - let enter_manual_span ~(parent : explicit_span_ctx option) ~flavor - ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - - (* get the id, or make a new one *) - let trace_id = - match parent with - | Some m -> m.trace_id - | None -> mk_trace_id st - in - - FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref - ~time_ns ~async_id:trace_id (); - { - span = 0L; - trace_id; - meta = Meta_map.(empty |> add key_async_data { name; flavor; data = [] }); - } - - let exit_manual_span (es : explicit_span) : unit = - let { name; data; flavor = _ } = Meta_map.find_exn key_async_data es.meta in - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - - FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns - ~args:data ~async_id:es.trace_id () - - let add_data_to_manual_span (es : explicit_span) data = - let m = Meta_map.find_exn key_async_data es.meta in - m.data <- List.rev_append data m.data - - let message ?span:_ ~data msg : unit = - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref - ~args:data () - - let counter_float ~data name f = - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref - ~args:((name, `Float f) :: data) - () - - let counter_int ~data name i = - let out, tls = get_thread_output () in - let time_ns = Time.now_ns () in - FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref - ~args:((name, `Int i) :: data) - () - - let name_process name : unit = - let out, _tls = get_thread_output () in - FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ()) - - let name_thread name : unit = - let out, tls = get_thread_output () in - FWrite.Kernel_object.( - encode out ~name ~ty:ty_thread ~kid:tls.tid - ~args:[ "process", `Kid pid ] - ()) - - let extension_event _ = () -end - -let create ~out () : collector = - let buf_pool = Buf_pool.create () in - let events = B_queue.create () in - - let bg_thread = - Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) () - in - - let st = - { - active = A.make true; - buf_pool; - bg_thread; - events; - span_id_gen = A.make 0; - next_thread_ref = A.make 1; - per_thread = Array.init 16 (fun _ -> A.make Int_map.empty); - } - in - - let _tick_thread = Thread.create (fun () -> Bg_thread.tick_thread events) in - - (* write header *) - let out = out_of_st st in - FWrite.Metadata.Magic_record.encode out; - FWrite.Metadata.Initialization_record.( - encode out ~ticks_per_secs:default_ticks_per_sec ()); - FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" (); - Output.flush out; - Output.dispose out; - - let module Coll = - C - (struct - let st = st - end) - () - in - (module Coll) - -module Internal_ = struct - let mock_all_ () = - Mock_.enabled := true; - Sub.Private_.get_now_ns_ := Some Mock_.get_now_ns; - Sub.Private_.get_tid_ := Some Mock_.get_tid_ -end diff --git a/src/fuchsia/fcollector.mli b/src/fuchsia/fcollector.mli deleted file mode 100644 index 2ba03e9..0000000 --- a/src/fuchsia/fcollector.mli +++ /dev/null @@ -1,11 +0,0 @@ -open Trace_core - -val create : out:Bg_thread.out -> unit -> collector - -(**/**) - -module Internal_ : sig - val mock_all_ : unit -> unit -end - -(**/**) diff --git a/src/fuchsia/lock.ml b/src/fuchsia/lock.ml new file mode 100644 index 0000000..fe2c1f0 --- /dev/null +++ b/src/fuchsia/lock.ml @@ -0,0 +1,28 @@ +type 'a t = { + mutex: Mutex.t; + mutable content: 'a; +} + +let create content : _ t = { mutex = Mutex.create (); content } + +let with_ (self : _ t) f = + Mutex.lock self.mutex; + try + let x = f self.content in + Mutex.unlock self.mutex; + x + with e -> + let bt = Printexc.get_raw_backtrace () in + Mutex.unlock self.mutex; + Printexc.raise_with_backtrace e bt + +let[@inline] update self f = with_ self (fun x -> self.content <- f x) + +let[@inline] update_map l f = + with_ l (fun x -> + let x', y = f x in + l.content <- x'; + y) + +let[@inline] set_while_locked (self : 'a t) (x : 'a) = self.content <- x + diff --git a/src/fuchsia/lock.mli b/src/fuchsia/lock.mli new file mode 100644 index 0000000..7a4e77b --- /dev/null +++ b/src/fuchsia/lock.mli @@ -0,0 +1,10 @@ +type 'a t +(** A value protected by a mutex *) + +val create : 'a -> 'a t +val with_ : 'a t -> ('a -> 'b) -> 'b +val update : 'a t -> ('a -> 'a) -> unit +val update_map : 'a t -> ('a -> 'a * 'b) -> 'b + +val set_while_locked : 'a t -> 'a -> unit +(** Change the value while inside [with_] or similar. *) diff --git a/src/fuchsia/subscriber.ml b/src/fuchsia/subscriber.ml new file mode 100644 index 0000000..fac5178 --- /dev/null +++ b/src/fuchsia/subscriber.ml @@ -0,0 +1,173 @@ +open Common_ +open Trace_core +module Span_tbl = Trace_subscriber.Span_tbl + +let on_tracing_error = on_tracing_error + +type span_info = { + tid: int; + name: string; + start_ns: int64; + mutable data: (string * Sub.user_data) list; + (* NOTE: thread safety: this is supposed to only be modified by the thread + that's running this (synchronous, stack-abiding) span. *) +} +(** Information we store about a span begin event, to emit a complete event when + we meet the corresponding span end event *) + +type t = { + active: bool A.t; + pid: int; + spans: span_info Span_tbl.t; + buf_chain: Buf_chain.t; + exporter: Exporter.t; +} +(** Subscriber state *) + +open struct + (** Write the buffers that are ready *) + let[@inline] write_ready_ (self : t) = + if Buf_chain.has_ready self.buf_chain then + Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs + + let print_non_closed_spans_warning spans = + let module Str_set = Set.Make (String) in + let spans = Span_tbl.to_list spans in + if spans <> [] then ( + !on_tracing_error + @@ Printf.sprintf "trace-tef: warning: %d spans were not closed\n" + (List.length spans); + let names = + List.fold_left + (fun set (_, span) -> Str_set.add span.name set) + Str_set.empty spans + in + Str_set.iter + (fun name -> + !on_tracing_error @@ Printf.sprintf " span %S was not closed\n" name) + names; + flush stderr + ) +end + +let close (self : t) : unit = + if A.exchange self.active false then ( + Buf_chain.ready_all_non_empty self.buf_chain; + write_ready_ self; + self.exporter.close (); + + print_non_closed_spans_warning self.spans + ) + +let[@inline] active self = A.get self.active + +let flush (self : t) : unit = + Buf_chain.ready_all_non_empty self.buf_chain; + write_ready_ self; + self.exporter.flush () + +let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t = + let buf_chain = Buf_chain.create ~sharded:true ~buf_pool () in + { active = A.make true; buf_chain; exporter; pid; spans = Span_tbl.create () } + +module Callbacks = struct + type st = t + + let on_init (self : st) ~time_ns:_ = + Writer.Metadata.Magic_record.encode self.buf_chain; + Writer.Metadata.Initialization_record.( + encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ()); + Writer.Metadata.Provider_info.encode self.buf_chain ~id:0 + ~name:"ocaml-trace" (); + (* make sure we write these immediately so they're not out of order *) + Buf_chain.ready_all_non_empty self.buf_chain; + + write_ready_ self + + let on_shutdown (self : st) ~time_ns:_ = close self + + let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit = + Writer.Kernel_object.( + encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ()); + write_ready_ self + + let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit = + Writer.Kernel_object.( + encode self.buf_chain ~name ~ty:ty_thread ~kid:tid + ~args:[ "process", A_kid (Int64.of_int self.pid) ] + ()); + write_ready_ self + + (* add function name, if provided, to the metadata *) + let add_fun_name_ fun_name data : _ list = + match fun_name with + | None -> data + | Some f -> ("function", Sub.U_string f) :: data + + let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ + ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit = + let data = add_fun_name_ fun_name data in + let info = { tid; name; start_ns = time_ns; data } in + (* save the span so we find it at exit *) + Span_tbl.add self.spans span info + + let on_exit_span (self : st) ~time_ns ~tid:_ span : unit = + match Span_tbl.find_exn self.spans span with + | exception Not_found -> + !on_tracing_error (Printf.sprintf "cannot find span %Ld" span) + | { tid; name; start_ns; data } -> + Span_tbl.remove self.spans span; + Writer.( + Event.Duration_complete.encode self.buf_chain ~name + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data) + ()); + write_ready_ self + + let on_add_data (self : st) ~data span = + if data <> [] then ( + try + let info = Span_tbl.find_exn self.spans span in + info.data <- List.rev_append data info.data + with Not_found -> + !on_tracing_error (Printf.sprintf "cannot find span %Ld" span) + ) + + let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit = + Writer.( + Event.Instant.encode self.buf_chain + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~name:msg ~time_ns ~args:(args_of_user_data data) ()); + write_ready_ self + + let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit = + Writer.( + Event.Counter.encode self.buf_chain + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~name ~time_ns + ~args:((name, A_float n) :: args_of_user_data data) + ()); + write_ready_ self + + let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ + ~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit = + Writer.( + Event.Async_begin.encode self.buf_chain ~name + ~args:(args_of_user_data data) + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~time_ns ~async_id:trace_id ()); + write_ready_ self + + let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_ + ~trace_id (_ : span) : unit = + Writer.( + Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data) + ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid) + ~time_ns ~async_id:trace_id ()); + write_ready_ self + + let on_extension_event _ ~time_ns:_ ~tid:_ _ev = () +end + +let subscriber (self : t) : Sub.t = + Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) } diff --git a/src/fuchsia/subscriber.mli b/src/fuchsia/subscriber.mli new file mode 100644 index 0000000..66318f5 --- /dev/null +++ b/src/fuchsia/subscriber.mli @@ -0,0 +1,20 @@ +type t +(** Main subscriber state. *) + +val create : ?buf_pool:Buf_pool.t -> pid:int -> exporter:Exporter.t -> unit -> t +(** Create a subscriber state. *) + +val flush : t -> unit +val close : t -> unit +val active : t -> bool + +module Callbacks : Trace_subscriber.Callbacks.S with type st = t + +val subscriber : t -> Trace_subscriber.t +(** Subscriber that writes json into this writer *) + +(**/**) + +val on_tracing_error : (string -> unit) ref + +(**/**) diff --git a/src/fuchsia/trace_fuchsia.ml b/src/fuchsia/trace_fuchsia.ml index f6b2b14..6529192 100644 --- a/src/fuchsia/trace_fuchsia.ml +++ b/src/fuchsia/trace_fuchsia.ml @@ -1,31 +1,50 @@ open Common_ +module Buf = Buf +module Buf_chain = Buf_chain +module Buf_pool = Buf_pool +module Exporter = Exporter +module Subscriber = Subscriber +module Writer = Writer type output = - [ `Stdout - | `Stderr - | `File of string + [ `File of string + | `Exporter of Exporter.t ] -let collector = Fcollector.create +let get_out_ (out : [< output ]) : Exporter.t = + match out with + | `File path -> + let oc = open_out path in + Exporter.of_out_channel ~close_channel:true oc + | `Exporter e -> e + +let subscriber ~out () : Sub.t = + let exporter = get_out_ out in + let pid = + if !Trace_subscriber.Private_.mock then + 2 + else + Unix.getpid () + in + let sub = Subscriber.create ~pid ~exporter () in + Subscriber.subscriber sub + +let collector ~out () = Sub.collector @@ subscriber ~out () let setup ?(out = `Env) () = match out with - | `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () - | `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () - | `File path -> - Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) () + | `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) () + | `Exporter _ as out -> + let sub = subscriber ~out () in + Trace_core.setup_collector @@ Sub.collector sub | `Env -> (match Sys.getenv_opt "TRACE" with | Some ("1" | "true") -> let path = "trace.fxt" in - let c = Fcollector.create ~out:(`File path) () in + let c = collector ~out:(`File path) () in Trace_core.setup_collector c - | Some "stdout" -> - Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () - | Some "stderr" -> - Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () | Some path -> - let c = Fcollector.create ~out:(`File path) () in + let c = collector ~out:(`File path) () in Trace_core.setup_collector c | None -> ()) @@ -33,7 +52,24 @@ let with_setup ?out () f = setup ?out (); Fun.protect ~finally:Trace_core.shutdown f +module Mock_ = struct + let now = ref 0 + + (* used to mock timing *) + let get_now_ns () : int64 = + let x = !now in + incr now; + Int64.(mul (of_int x) 1000L) + + let get_tid_ () : int = 3 +end + module Internal_ = struct - let mock_all_ = Fcollector.Internal_.mock_all_ + let mock_all_ () = + Sub.Private_.mock := true; + Sub.Private_.get_now_ns_ := Mock_.get_now_ns; + Sub.Private_.get_tid_ := Mock_.get_tid_; + () + let on_tracing_error = on_tracing_error end diff --git a/src/fuchsia/trace_fuchsia.mli b/src/fuchsia/trace_fuchsia.mli index d28111a..74905dc 100644 --- a/src/fuchsia/trace_fuchsia.mli +++ b/src/fuchsia/trace_fuchsia.mli @@ -6,22 +6,23 @@ trace format}. This reduces the tracing overhead compared to [trace-tef], at the expense of simplicity. *) -val collector : - out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector -(** Make a collector that writes into the given output. See {!setup} for more - details. *) +module Buf = Buf +module Buf_chain = Buf_chain +module Buf_pool = Buf_pool +module Exporter = Exporter +module Subscriber = Subscriber +module Writer = Writer type output = - [ `Stdout - | `Stderr - | `File of string + [ `File of string + | `Exporter of Exporter.t ] -(** Output for tracing. - - [`Stdout] will enable tracing and print events on stdout - - [`Stderr] will enable tracing and print events on stderr - - [`File "foo"] will enable tracing and print events into file named "foo" -*) +val subscriber : out:[< output ] -> unit -> Trace_subscriber.t + +val collector : out:[< output ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. See {!setup} for more + details. *) val setup : ?out:[ output | `Env ] -> unit -> unit (** [setup ()] installs the collector depending on [out]. @@ -32,12 +33,10 @@ val setup : ?out:[ output | `Env ] -> unit -> unit - [`Env] will enable tracing if the environment variable "TRACE" is set. - If it's set to "1", then the file is "trace.fxt". - - If it's set to "stdout", then logging happens on stdout (since 0.2) - - If it's set to "stderr", then logging happens on stdout (since 0.2) - Otherwise, if it's set to a non empty string, the value is taken to be the file path into which to write. *) -val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a +val with_setup : ?out:[< output | `Env > `Env ] -> unit -> (unit -> 'a) -> 'a (** [with_setup () f] (optionally) sets a collector up, calls [f()], and makes sure to shutdown before exiting. *) diff --git a/src/fuchsia/write/util.ml b/src/fuchsia/util.ml similarity index 100% rename from src/fuchsia/write/util.ml rename to src/fuchsia/util.ml diff --git a/src/fuchsia/write/buf_pool.ml b/src/fuchsia/write/buf_pool.ml deleted file mode 100644 index 961a2d3..0000000 --- a/src/fuchsia/write/buf_pool.ml +++ /dev/null @@ -1,58 +0,0 @@ -open struct - module A = Trace_core.Internal_.Atomic_ - - exception Got_buf of Buf.t -end - -module List_with_len = struct - type +'a t = - | Nil - | Cons of int * 'a * 'a t - - let empty : _ t = Nil - - let[@inline] len = function - | Nil -> 0 - | Cons (i, _, _) -> i - - let[@inline] cons x self = Cons (len self + 1, x, self) -end - -type t = { - max_len: int; - buf_size: int; - bufs: Buf.t List_with_len.t A.t; -} - -let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t = - let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in - { max_len; buf_size; bufs = A.make List_with_len.empty } - -let alloc (self : t) : Buf.t = - try - while - match A.get self.bufs with - | Nil -> false - | Cons (_, buf, tl) as old -> - if A.compare_and_set self.bufs old tl then - raise (Got_buf buf) - else - false - do - () - done; - Buf.create self.buf_size - with Got_buf b -> b - -let recycle (self : t) (buf : Buf.t) : unit = - Buf.clear buf; - try - while - match A.get self.bufs with - | Cons (i, _, _) when i >= self.max_len -> raise Exit - | old -> - not (A.compare_and_set self.bufs old (List_with_len.cons buf old)) - do - () - done - with Exit -> () (* do not recycle *) diff --git a/src/fuchsia/write/dune b/src/fuchsia/write/dune deleted file mode 100644 index 88acba8..0000000 --- a/src/fuchsia/write/dune +++ /dev/null @@ -1,9 +0,0 @@ -(library - (name trace_fuchsia_write) - (public_name trace-fuchsia.write) - (synopsis "Serialization part of trace-fuchsia") - (ocamlopt_flags - :standard - ;-dlambda - ) - (libraries trace.core threads)) diff --git a/src/fuchsia/write/output.ml b/src/fuchsia/write/output.ml deleted file mode 100644 index a2f79d4..0000000 --- a/src/fuchsia/write/output.ml +++ /dev/null @@ -1,65 +0,0 @@ -type t = { - mutable buf: Buf.t; - mutable send_buf: Buf.t -> unit; - buf_pool: Buf_pool.t; -} - -let create ~(buf_pool : Buf_pool.t) ~send_buf () : t = - let buf_size = buf_pool.buf_size in - let buf = Buf.create buf_size in - { buf; send_buf; buf_pool } - -open struct - (* NOTE: there is a potential race condition if an output is - flushed from the main thread upon closing, while - the local thread is blissfully writing new records to it - as we're winding down the collector. This is trying to reduce - the likelyhood of a race happening. *) - let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t = - let old_buf = self.buf in - self.buf <- new_buf; - old_buf - - let flush_ (self : t) : unit = - let new_buf = Buf_pool.alloc self.buf_pool in - let old_buf = replace_buf_ self new_buf in - self.send_buf old_buf - - let[@inline never] cycle_buf (self : t) ~available : Buf.t = - flush_ self; - let buf = self.buf in - - if Buf.available buf < available then ( - let msg = - Printf.sprintf - "fuchsia: buffer is too small (available: %d bytes, needed: %d bytes)" - (Buf.available buf) available - in - failwith msg - ); - buf -end - -let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self - -(** Maximum size available, in words, for a single message *) -let[@inline] max_size_word (self : t) : int = self.buf_pool.buf_size lsr 3 - -(** Obtain a buffer with at least [available] bytes *) -let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t = - let available = available_word lsl 3 in - if Buf.available self.buf >= available then - self.buf - else - cycle_buf self ~available - -let into_buffer ~buf_pool (buffer : Buffer.t) : t = - let send_buf (buf : Buf.t) = - Buffer.add_subbytes buffer buf.buf 0 buf.offset - in - create ~buf_pool ~send_buf () - -let dispose (self : t) : unit = - flush_ self; - Buf_pool.recycle self.buf_pool self.buf; - self.buf <- Buf.empty diff --git a/src/fuchsia/write/trace_fuchsia_write.ml b/src/fuchsia/writer.ml similarity index 81% rename from src/fuchsia/write/trace_fuchsia_write.ml rename to src/fuchsia/writer.ml index 7cf9bf7..47d2646 100644 --- a/src/fuchsia/write/trace_fuchsia_write.ml +++ b/src/fuchsia/writer.ml @@ -2,14 +2,10 @@ Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *) +open Common_ module Util = Util -module Buf = Buf -module Output = Output -module Buf_pool = Buf_pool open struct - let spf = Printf.sprintf - let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 = if id == Trace_core.Collector.dummy_trace_id then 0L @@ -19,7 +15,27 @@ end open Util -type user_data = Trace_core.user_data +type user_data = Sub.user_data = + | U_bool of bool + | U_float of float + | U_int of int + | U_none + | U_string of string + +type arg = + | A_bool of bool + | A_float of float + | A_int of int + | A_none + | A_string of string + | A_kid of int64 + +(* NOTE: only works because [user_data] is a prefix of [arg] and is immutable *) +let arg_of_user_data : user_data -> arg = Obj.magic + +(* NOTE: only works because [user_data] is a prefix of [arg] and is immutable *) +let args_of_user_data : (string * user_data) list -> (string * arg) list = + Obj.magic module I64 = struct include Int64 @@ -111,8 +127,8 @@ module Metadata = struct let value = 0x0016547846040010L let size_word = 1 - let encode (out : Output.t) = - let buf = Output.get_buf out ~available_word:size_word in + let encode (bufs : Buf_chain.t) = + let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in Buf.add_i64 buf value end @@ -122,8 +138,8 @@ module Metadata = struct (** Default: 1 tick = 1 ns *) let default_ticks_per_sec = 1_000_000_000L - let encode (out : Output.t) ~ticks_per_secs () : unit = - let buf = Output.get_buf out ~available_word:size_word in + let encode (bufs : Buf_chain.t) ~ticks_per_secs () : unit = + let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in let hd = I64.(1L lor (of_int size_word lsl 4)) in Buf.add_i64 buf hd; Buf.add_i64 buf ticks_per_secs @@ -132,10 +148,10 @@ module Metadata = struct module Provider_info = struct let size_word ~name () = 1 + str_len_word name - let encode (out : Output.t) ~(id : int) ~name () : unit = + let encode (bufs : Buf_chain.t) ~(id : int) ~name () : unit = let name = truncate_string name in let size = size_word ~name () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( (of_int size lsl 4) @@ -152,29 +168,29 @@ module Metadata = struct end module Argument = struct - type 'a t = string * ([< user_data | `Kid of int ] as 'a) + type t = string * arg - let check_valid_ : _ t -> unit = function - | _, `String s -> assert (String.length s < max_str_len) + let check_valid_ : t -> unit = function + | _, A_string s -> assert (String.length s < max_str_len) | _ -> () let[@inline] is_i32_ (i : int) : bool = Int32.(to_int (of_int i) = i) - let size_word (self : _ t) = + let size_word (self : t) = let name, data = self in match data with - | `None | `Bool _ -> 1 + str_len_word name - | `Int i when is_i32_ i -> 1 + str_len_word name - | `Int _ -> (* int64 *) 2 + str_len_word name - | `Float _ -> 2 + str_len_word name - | `String s -> 1 + str_len_word_maybe_too_big s + str_len_word name - | `Kid _ -> 2 + str_len_word name + | A_none | A_bool _ -> 1 + str_len_word name + | A_int i when is_i32_ i -> 1 + str_len_word name + | A_int _ -> (* int64 *) 2 + str_len_word name + | A_float _ -> 2 + str_len_word name + | A_string s -> 1 + str_len_word_maybe_too_big s + str_len_word name + | A_kid _ -> 2 + str_len_word name open struct external int_of_bool : bool -> int = "%identity" end - let encode (buf : Buf.t) (self : _ t) : unit = + let encode (buf : Buf.t) (self : t) : unit = let name, data = self in let name = truncate_string name in let size = size_word self in @@ -187,26 +203,26 @@ module Argument = struct in match data with - | `None -> + | A_none -> let hd = hd_arg_size in Buf.add_i64 buf hd; Buf.add_string buf name - | `Int i when is_i32_ i -> + | A_int i when is_i32_ i -> let hd = I64.(1L lor hd_arg_size lor (of_int i lsl 32)) in Buf.add_i64 buf hd; Buf.add_string buf name - | `Int i -> + | A_int i -> (* int64 *) let hd = I64.(3L lor hd_arg_size) in Buf.add_i64 buf hd; Buf.add_string buf name; Buf.add_i64 buf (I64.of_int i) - | `Float f -> + | A_float f -> let hd = I64.(5L lor hd_arg_size) in Buf.add_i64 buf hd; Buf.add_string buf name; Buf.add_i64 buf (I64.bits_of_float f) - | `String s -> + | A_string s -> let s = truncate_string s in let hd = I64.( @@ -216,35 +232,35 @@ module Argument = struct Buf.add_i64 buf hd; Buf.add_string buf name; Buf.add_string buf s - | `Bool b -> + | A_bool b -> let hd = I64.(9L lor hd_arg_size lor (of_int (int_of_bool b) lsl 16)) in Buf.add_i64 buf hd; Buf.add_string buf name - | `Kid kid -> + | A_kid kid -> (* int64 *) let hd = I64.(8L lor hd_arg_size) in Buf.add_i64 buf hd; Buf.add_string buf name; - Buf.add_i64 buf (I64.of_int kid) + Buf.add_i64 buf kid end module Arguments = struct - type 'a t = 'a Argument.t list + type t = Argument.t list - let[@inline] len (self : _ t) : int = + let[@inline] len (self : t) : int = match self with | [] -> 0 | [ _ ] -> 1 | _ :: _ :: tl -> 2 + List.length tl - let check_valid (self : _ t) = + let check_valid (self : t) = let len = len self in if len > 15 then invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len); List.iter Argument.check_valid_ self; () - let[@inline] size_word (self : _ t) = + let[@inline] size_word (self : t) = match self with | [] -> 0 | [ a ] -> Argument.size_word a @@ -254,7 +270,7 @@ module Arguments = struct (Argument.size_word a + Argument.size_word b) tl - let[@inline] encode (buf : Buf.t) (self : _ t) = + let[@inline] encode (buf : Buf.t) (self : t) = let rec aux buf l = match l with | [] -> () @@ -276,11 +292,11 @@ module Thread_record = struct let size_word : int = 3 (** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *) - let encode (out : Output.t) ~as_ref ~pid ~tid () : unit = + let encode (bufs : Buf_chain.t) ~as_ref ~pid ~tid () : unit = if as_ref <= 0 || as_ref > 255 then invalid_arg "fuchsia: thread_record: invalid ref"; - let buf = Output.get_buf out ~available_word:size_word in + let@ buf = Buf_chain.with_buf bufs ~available_word:size_word in let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in Buf.add_i64 buf hd; @@ -296,11 +312,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () - : unit = + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args + () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in (* set category = 0 *) let hd = @@ -331,11 +347,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args + 1 (* counter id *) - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () - : unit = + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args + () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( @@ -368,11 +384,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () - : unit = + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args + () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( @@ -403,11 +419,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () - : unit = + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args + () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( @@ -438,11 +454,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args + 1 (* end timestamp *) - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~end_time_ns ~args () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in (* set category = 0 *) let hd = @@ -475,11 +491,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args + 1 (* async id *) - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~(async_id : Trace_core.trace_id) ~args () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( @@ -511,11 +527,11 @@ module Event = struct 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + str_len_word name + Arguments.size_word args + 1 (* async id *) - let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + let encode (bufs : Buf_chain.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~(async_id : Trace_core.trace_id) ~args () : unit = let name = truncate_string name in let size = size_word ~name ~t_ref ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.( @@ -556,10 +572,11 @@ module Kernel_object = struct let ty_process : ty = 1 let ty_thread : ty = 2 - let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit = + let encode (bufs : Buf_chain.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit + = let name = truncate_string name in let size = size_word ~name ~args () in - let buf = Output.get_buf out ~available_word:size in + let@ buf = Buf_chain.with_buf bufs ~available_word:size in let hd = I64.(