From 68d3969cde1facce3b2dfbd4b2f4bc28e8bbd754 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 25 Dec 2023 22:52:50 -0500 Subject: [PATCH] good progress on fuchsia collector --- src/fuchsia/bg_thread.ml | 121 ++++--- src/fuchsia/common_.ml | 8 + src/fuchsia/dune | 2 + src/fuchsia/fcollector.ml | 265 +++++++++++++++ src/fuchsia/fcollector.mli | 3 + src/fuchsia/global_.ml.tmp | 4 + src/fuchsia/time.ml | 18 +- src/fuchsia/trace_fuchsia.ml | 408 +---------------------- src/fuchsia/trace_fuchsia.mli | 3 - src/fuchsia/write/buf.ml | 42 +++ src/fuchsia/write/buf_pool.ml | 58 ++++ src/fuchsia/write/dune | 3 + src/fuchsia/write/output.ml | 46 +++ src/fuchsia/write/trace_fuchsia_write.ml | 214 +++++++++--- src/fuchsia/write/util.ml | 5 + 15 files changed, 666 insertions(+), 534 deletions(-) create mode 100644 src/fuchsia/fcollector.ml create mode 100644 src/fuchsia/fcollector.mli create mode 100644 src/fuchsia/global_.ml.tmp create mode 100644 src/fuchsia/write/buf.ml create mode 100644 src/fuchsia/write/buf_pool.ml create mode 100644 src/fuchsia/write/output.ml create mode 100644 src/fuchsia/write/util.ml diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml index e12767b..1ac6aa0 100644 --- a/src/fuchsia/bg_thread.ml +++ b/src/fuchsia/bg_thread.ml @@ -1,74 +1,67 @@ open Common_ -(** Background thread, takes events from the queue, puts them - in context using local state, and writes fully resolved - TEF events to [out]. *) -let bg_thread ~out (events : event B_queue.t) : unit = - (* open a writer to [out] *) - Writer.with_ ~out @@ fun writer -> - (* local state, to keep track of span information and implicit stack context *) - let spans : span_info Span_tbl.t = Span_tbl.create 32 in +type out = + [ `Stdout + | `Stderr + | `File of string + ] - (* add function name, if provided, to the metadata *) - let add_fun_name_ fun_name data : _ list = - match fun_name with - | None -> data - | Some f -> ("function", `String f) :: data +type event = + | E_write_buf of Buf.t + | E_tick + +type state = { + buf_pool: Buf_pool.t; + oc: out_channel; + events: event B_queue.t; +} + +let with_out_ (out : out) f = + let oc, must_close = + match out with + | `Stdout -> stdout, false + | `Stderr -> stderr, false + | `File path -> open_out path, true in - (* how to deal with an event *) - let handle_ev (ev : event) : unit = - match ev with - | E_tick -> Writer.flush writer - | E_message { tid; msg; time_us; data } -> - Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer - | E_define_span { tid; name; id; time_us; fun_name; data } -> - let data = add_fun_name_ fun_name data in - let info = { tid; name; start_us = time_us; data } in - (* save the span so we find it at exit *) - Span_tbl.add spans id info - | E_exit_span { id; time_us = stop_us } -> - (match Span_tbl.find_opt spans id with - | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) - | Some { tid; name; start_us; data } -> - Span_tbl.remove spans id; - Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us - ~args:data writer) - | E_add_data { id; data } -> - (match Span_tbl.find_opt spans id with - | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) - | Some info -> info.data <- List.rev_append data info.data) - | E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } -> - let data = add_fun_name_ fun_name data in - Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor - writer - | E_exit_manual_span { tid; time_us; name; id; flavor; data } -> - Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data - writer - | E_counter { tid; name; time_us; n } -> - Writer.emit_counter ~name ~tid ~ts:time_us writer n - | E_name_process { name } -> Writer.emit_name_process ~name writer - | E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer - in + if must_close then ( + let finally () = close_out_noerr oc in + Fun.protect ~finally (fun () -> f oc) + ) else + f oc - try - while true do - (* get all the events in the incoming blocking queue, in - one single critical section. *) - let local = B_queue.pop_all events in - List.iter handle_ev local - done - with B_queue.Closed -> - (* write a message about us closing *) - Writer.emit_instant_event ~name:"tef-worker.exit" - ~tid:(Thread.id @@ Thread.self ()) - ~ts:(now_us ()) ~args:[] writer; +let handle_ev (self : state) (ev : event) : unit = + match ev with + | E_tick -> flush self.oc + | E_write_buf buf -> + output self.oc buf.buf 0 buf.offset; + Buf_pool.recycle self.buf_pool buf - (* warn if app didn't close all spans *) - if Span_tbl.length spans > 0 then - Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!" - (Span_tbl.length spans); - () +let bg_loop (self : state) : unit = + let continue = ref true in + + while !continue do + match B_queue.pop_all self.events with + | exception B_queue.Closed -> continue := false + | evs -> List.iter (handle_ev self) evs + done + +let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = + let@ oc = with_out_ out in + let st = { oc; buf_pool; events } in + bg_loop st + +(* TODO: + (* write a message about us closing *) + Writer.emit_instant_event ~name:"tef-worker.exit" + ~tid:(Thread.id @@ Thread.self ()) + ~ts:(now_us ()) ~args:[] writer; + + (* warn if app didn't close all spans *) + if Span_tbl.length spans > 0 then + Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!" + (Span_tbl.length spans); +*) (** Thread that simply regularly "ticks", sending events to the background thread so it has a chance to write to the file *) diff --git a/src/fuchsia/common_.ml b/src/fuchsia/common_.ml index 986880b..38ded80 100644 --- a/src/fuchsia/common_.ml +++ b/src/fuchsia/common_.ml @@ -1,4 +1,9 @@ module A = Trace_core.Internal_.Atomic_ +module FWrite = Trace_fuchsia_write +module B_queue = Trace_private_util.B_queue +module Buf = FWrite.Buf +module Buf_pool = FWrite.Buf_pool +module Output = FWrite.Output module Span_tbl = Hashtbl.Make (struct include Int64 @@ -8,3 +13,6 @@ end) let on_tracing_error = ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s) + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf diff --git a/src/fuchsia/dune b/src/fuchsia/dune index 32bd35e..f8fd6c6 100644 --- a/src/fuchsia/dune +++ b/src/fuchsia/dune @@ -4,5 +4,7 @@ (name trace_fuchsia) (public_name trace-fuchsia) (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") + (flags :standard -w -27) ; TODO: remove (libraries trace.core trace.private.util thread-local-storage + (re_export trace-fuchsia.write) mtime mtime.clock.os atomic unix threads)) diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml new file mode 100644 index 0000000..ecfb046 --- /dev/null +++ b/src/fuchsia/fcollector.ml @@ -0,0 +1,265 @@ +open Trace_core +open Common_ +module TLS = Thread_local_storage + +let pid = Unix.getpid () + +type state = { + active: bool A.t; + events: Bg_thread.event B_queue.t; + span_id_gen: int A.t; (** Used for async spans *) + bg_thread: Thread.t; + buf_pool: Buf_pool.t; + next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) +} + +type span_info = { + start_time_ns: int64; + name: string; + mutable data: (string * user_data) list; +} + +(* TODO: + (** key used to carry a unique "id" for all spans in an async context *) + let key_async_id : int Meta_map.Key.t = Meta_map.Key.create () + + let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = + Meta_map.Key.create () + + let key_data : (string * user_data) list ref Meta_map.Key.t = + Meta_map.Key.create () +*) + +open struct + let state_id_ = A.make 0 + + (* re-raise exception with its backtrace *) + external reraise : exn -> 'a = "%reraise" +end + +type per_thread_state = { + tid: int; + state_id: int; (** ID of the current collector state *) + local_span_id_gen: int A.t; (** Used for thread-local spans *) + mutable thread_ref: FWrite.Thread_ref.t; + mutable out: Output.t option; + spans: span_info Span_tbl.t; (** In-flight spans *) +} + +let key_thread_local_st : per_thread_state TLS.key = + TLS.new_key (fun () -> + let tid = Thread.id @@ Thread.self () in + { + tid; + state_id = A.get state_id_; + thread_ref = FWrite.Thread_ref.inline ~pid ~tid; + local_span_id_gen = A.make 0; + out = None; + spans = Span_tbl.create 32; + }) + +let out_of_st (st : state) : Output.t = + FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf -> + if A.get st.active then ( + try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> () + )) + +module C (St : sig + val st : state +end) +() = +struct + open St + + let state_id = 1 + A.fetch_and_add state_id_ 1 + + (** prepare the thread's state *) + let[@inline never] update_local_state (self : per_thread_state) : unit = + (* get an output *) + let out = out_of_st st in + self.out <- Some out; + + (* try to allocate a thread ref for current thread *) + let th_ref = A.fetch_and_add st.next_thread_ref 1 in + if th_ref <= 0xff then ( + self.thread_ref <- FWrite.Thread_ref.ref th_ref; + FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid () + ); + () + + (** Obtain the output for the current thread *) + let[@inline] get_thread_output () : Output.t * per_thread_state = + let st = TLS.get key_thread_local_st in + if st.state_id != state_id || st.out == None then update_local_state st; + Option.get st.out, st + + let shutdown () = + if A.exchange st.active false then ( + B_queue.close st.events; + (* wait for writer thread to be done. The writer thread will exit + after processing remaining events because the queue is now closed *) + Thread.join st.bg_thread + ) + + let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span = + let tls = TLS.get key_thread_local_st in + let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in + let time_ns = Time.now_ns () in + Span_tbl.add tls.spans span { name; data; start_time_ns = time_ns }; + span + + let exit_span span : unit = + let out, tls = get_thread_output () in + let end_time_ns = Time.now_ns () in + match Span_tbl.find_opt tls.spans span with + | None -> !on_tracing_error (spf "unknown span %Ld" span) + | Some info -> + Span_tbl.remove tls.spans span; + FWrite.Event.Duration_complete.encode out ~name:info.name + ~t_ref:tls.thread_ref ~time_ns:info.start_time_ns ~end_time_ns + ~args:info.data () + + let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in + let info = { start_time_ns = time_ns; data; name } in + Span_tbl.add tls.spans span info; + + let[@inline] exit () : unit = + let end_time_ns = Time.now_ns () in + Span_tbl.remove tls.spans span; + FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns + ~t_ref:tls.thread_ref ~args:info.data () + in + + try + let x = f span in + exit (); + x + with exn -> + exit (); + reraise exn + + let add_data_to_span span data = + let tls = TLS.get key_thread_local_st in + match Span_tbl.find_opt tls.spans span with + | None -> !on_tracing_error (spf "unknown span %Ld" span) + | Some info -> info.data <- List.rev_append data info.data + + let enter_manual_span ~(parent : explicit_span option) ~flavor + ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span + = + assert false + (* TODO: + (* get the id, or make a new one *) + let id = + match parent with + | Some m -> Meta_map.find_exn key_async_id m.meta + | None -> A.fetch_and_add span_id_gen_ 1 + in + let time_us = now_us () in + B_queue.push events + (E_enter_manual_span + { id; time_us; tid = get_tid_ (); data; name; fun_name; flavor }); + { + span = 0L; + meta = + Meta_map.( + empty |> add key_async_id id |> add key_async_data (name, flavor)); + } + *) + + let exit_manual_span (es : explicit_span) : unit = assert false + (* TODO: + let id = Meta_map.find_exn key_async_id es.meta in + let name, flavor = Meta_map.find_exn key_async_data es.meta in + let data = + try !(Meta_map.find_exn key_data es.meta) with Not_found -> [] + in + let time_us = now_us () in + let tid = get_tid_ () in + B_queue.push events + (E_exit_manual_span { tid; id; name; time_us; data; flavor }) + *) + + let add_data_to_manual_span (es : explicit_span) data = assert false + (* TODO: + if data <> [] then ( + let data_ref, add = + try Meta_map.find_exn key_data es.meta, false + with Not_found -> ref [], true + in + let new_data = List.rev_append data !data_ref in + data_ref := new_data; + if add then es.meta <- Meta_map.add key_data data_ref es.meta + ) + *) + + let message ?span:_ ~data msg : unit = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref + ~args:data () + + let counter_float ~data name f = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref + ~args:((name, `Float f) :: data) + () + + let counter_int ~data name i = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref + ~args:((name, `Int i) :: data) + () + + let name_process name : unit = () + (* TODO: B_queue.push events (E_name_process { name }) *) + + let name_thread name : unit = () + (* TODO: + let tid = get_tid_ () in + B_queue.push events (E_name_thread { tid; name }) + *) +end + +let create ~out () : collector = + let buf_pool = Buf_pool.create () in + let events = B_queue.create () in + + let bg_thread = + Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) () + in + let _tick_thread = Thread.create Bg_thread.tick_thread events in + + let st = + { + active = A.make true; + buf_pool; + bg_thread; + events; + span_id_gen = A.make 0; + next_thread_ref = A.make 1; + } + in + + (* write header *) + let out = out_of_st st in + FWrite.Metadata.Magic_record.encode out; + FWrite.Metadata.Initialization_record.( + encode out ~ticks_per_secs:default_ticks_per_sec ()); + FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" (); + Output.flush out; + Output.dispose out; + + let module Coll = + C + (struct + let st = st + end) + () + in + (module Coll) diff --git a/src/fuchsia/fcollector.mli b/src/fuchsia/fcollector.mli new file mode 100644 index 0000000..780b3f1 --- /dev/null +++ b/src/fuchsia/fcollector.mli @@ -0,0 +1,3 @@ +open Trace_core + +val create : out:Bg_thread.out -> unit -> collector diff --git a/src/fuchsia/global_.ml.tmp b/src/fuchsia/global_.ml.tmp new file mode 100644 index 0000000..49df805 --- /dev/null +++ b/src/fuchsia/global_.ml.tmp @@ -0,0 +1,4 @@ +(** A bit of global state that can be reached + from each thread without too much overhead *) + +open Common_ diff --git a/src/fuchsia/time.ml b/src/fuchsia/time.ml index dd11ae5..d988369 100644 --- a/src/fuchsia/time.ml +++ b/src/fuchsia/time.ml @@ -1,20 +1,6 @@ -module Mock_ = struct - let enabled = ref false - let now = ref 0 - - let[@inline never] now_us () : int64 = - let x = !now in - incr now; - Int64.of_int x -end - let counter = Mtime_clock.counter () (** Now, in nanoseconds *) let[@inline] now_ns () : int64 = - if !Mock_.enabled then - Mock_.now_us () - else ( - let t = Mtime_clock.count counter in - Mtime.Span.to_uint64_ns t - ) + let t = Mtime_clock.count counter in + Mtime.Span.to_uint64_ns t diff --git a/src/fuchsia/trace_fuchsia.ml b/src/fuchsia/trace_fuchsia.ml index 68e7c42..102a744 100644 --- a/src/fuchsia/trace_fuchsia.ml +++ b/src/fuchsia/trace_fuchsia.ml @@ -1,416 +1,31 @@ -open Trace_core -open Trace_private_util open Common_ -(* -type span_info = { - tid: int; - name: string; - start_ns: float; - mutable data: (string * user_data) list; -} - -(** key used to carry a unique "id" for all spans in an async context *) -let key_async_id : int Meta_map.Key.t = Meta_map.Key.create () - -let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = - Meta_map.Key.create () - -let key_data : (string * user_data) list ref Meta_map.Key.t = - Meta_map.Key.create () - *) - -(* TODO: - (** Writer: knows how to write entries to a file in TEF format *) - module Writer = struct - type t = { - oc: out_channel; - mutable first: bool; (** first event? *) - buf: Buffer.t; (** Buffer to write into *) - must_close: bool; (** Do we have to close the underlying channel [oc]? *) - pid: int; - } - (** A writer to a [out_channel]. It writes JSON entries in an array - and closes the array at the end. *) - - let create ~out () : t = - let oc, must_close = - match out with - | `Stdout -> stdout, false - | `Stderr -> stderr, false - | `File path -> open_out path, true - in - let pid = - if !Mock_.enabled then - 2 - else - Unix.getpid () - in - output_char oc '['; - { oc; first = true; pid; must_close; buf = Buffer.create 2_048 } - - let close (self : t) : unit = - output_char self.oc ']'; - flush self.oc; - if self.must_close then close_out self.oc - - let with_ ~out f = - let writer = create ~out () in - Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) - - let[@inline] flush (self : t) : unit = flush self.oc - - (** Emit "," if we need, and get the buffer ready *) - let emit_sep_and_start_ (self : t) = - Buffer.reset self.buf; - if self.first then - self.first <- false - else - Buffer.add_string self.buf ",\n" - - let char = Buffer.add_char - let raw_string = Buffer.add_string - - let str_val (buf : Buffer.t) (s : string) = - char buf '"'; - let encode_char c = - match c with - | '"' -> raw_string buf {|\"|} - | '\\' -> raw_string buf {|\\|} - | '\n' -> raw_string buf {|\n|} - | '\b' -> raw_string buf {|\b|} - | '\r' -> raw_string buf {|\r|} - | '\t' -> raw_string buf {|\t|} - | _ when Char.code c <= 0x1f -> - raw_string buf {|\u00|}; - Printf.bprintf buf "%02x" (Char.code c) - | c -> char buf c - in - String.iter encode_char s; - char buf '"' - - let pp_user_data_ (out : Buffer.t) : [< user_data ] -> unit = function - | `None -> raw_string out "null" - | `Int i -> Printf.bprintf out "%d" i - | `Bool b -> Printf.bprintf out "%b" b - | `String s -> str_val out s - | `Float f -> Printf.bprintf out "%g" f - - (* emit args, if not empty. [ppv] is used to print values. *) - let emit_args_o_ ppv (out : Buffer.t) args : unit = - if args <> [] then ( - Printf.bprintf out {json|,"args": {|json}; - List.iteri - (fun i (n, value) -> - if i > 0 then raw_string out ","; - Printf.bprintf out {json|"%s":%a|json} n ppv value) - args; - char out '}' - ) - - let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit = - let dur = end_ -. start in - let ts = start in - - emit_sep_and_start_ self; - - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} - self.pid tid dur ts str_val name - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_manual_begin ~tid ~name ~id ~ts ~args ~flavor (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - self.pid id tid ts str_val name - (match flavor with - | None | Some `Async -> 'b' - | Some `Sync -> 'B') - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_manual_end ~tid ~name ~id ~ts ~flavor ~args (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} - self.pid id tid ts str_val name - (match flavor with - | None | Some `Async -> 'e' - | Some `Sync -> 'E') - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} - self.pid tid ts str_val name - (emit_args_o_ pp_user_data_) - args; - Buffer.output_buffer self.oc self.buf - - let emit_name_thread ~tid ~name (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid - tid - (emit_args_o_ pp_user_data_) - [ "name", `String name ]; - Buffer.output_buffer self.oc self.buf - - let emit_name_process ~name (self : t) : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid - (emit_args_o_ pp_user_data_) - [ "name", `String name ]; - Buffer.output_buffer self.oc self.buf - - let emit_counter ~name ~tid ~ts (self : t) f : unit = - emit_sep_and_start_ self; - Printf.bprintf self.buf - {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid - tid ts - (emit_args_o_ pp_user_data_) - [ name, `Float f ]; - Buffer.output_buffer self.oc self.buf - end -*) - -(* TODO: - (** Background thread, takes events from the queue, puts them - in context using local state, and writes fully resolved - TEF events to [out]. *) - let bg_thread ~out (events : event B_queue.t) : unit = - (* open a writer to [out] *) - Writer.with_ ~out @@ fun writer -> - (* local state, to keep track of span information and implicit stack context *) - let spans : span_info Span_tbl.t = Span_tbl.create 32 in - - (* add function name, if provided, to the metadata *) - let add_fun_name_ fun_name data : _ list = - match fun_name with - | None -> data - | Some f -> ("function", `String f) :: data - in - - (* how to deal with an event *) - let handle_ev (ev : event) : unit = - match ev with - | E_tick -> Writer.flush writer - | E_message { tid; msg; time_us; data } -> - Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer - | E_define_span { tid; name; id; time_us; fun_name; data } -> - let data = add_fun_name_ fun_name data in - let info = { tid; name; start_us = time_us; data } in - (* save the span so we find it at exit *) - Span_tbl.add spans id info - | E_exit_span { id; time_us = stop_us } -> - (match Span_tbl.find_opt spans id with - | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) - | Some { tid; name; start_us; data } -> - Span_tbl.remove spans id; - Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us - ~args:data writer) - | E_add_data { id; data } -> - (match Span_tbl.find_opt spans id with - | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) - | Some info -> info.data <- List.rev_append data info.data) - | E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } -> - let data = add_fun_name_ fun_name data in - Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor - writer - | E_exit_manual_span { tid; time_us; name; id; flavor; data } -> - Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data - writer - | E_counter { tid; name; time_us; n } -> - Writer.emit_counter ~name ~tid ~ts:time_us writer n - | E_name_process { name } -> Writer.emit_name_process ~name writer - | E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer - in - - try - while true do - (* get all the events in the incoming blocking queue, in - one single critical section. *) - let local = B_queue.pop_all events in - List.iter handle_ev local - done - with B_queue.Closed -> - (* write a message about us closing *) - Writer.emit_instant_event ~name:"tef-worker.exit" - ~tid:(Thread.id @@ Thread.self ()) - ~ts:(now_us ()) ~args:[] writer; - - (* warn if app didn't close all spans *) - if Span_tbl.length spans > 0 then - Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!" - (Span_tbl.length spans); - () - - (** Thread that simply regularly "ticks", sending events to - the background thread so it has a chance to write to the file *) - let tick_thread events : unit = - try - while true do - Thread.delay 0.5; - B_queue.push events E_tick - done - with B_queue.Closed -> () -*) - type output = [ `Stdout | `Stderr | `File of string ] -let collector ~out () : collector = assert false -(* TODO: - let module M = struct - let active = A.make true - - (** generator for span ids *) - let span_id_gen_ = A.make 0 - - (* queue of messages to write *) - let events : event B_queue.t = B_queue.create () - - (** writer thread. It receives events and writes them to [oc]. *) - let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) () - - (** ticker thread, regularly sends a message to the writer thread. - no need to join it. *) - let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () - - let shutdown () = - if A.exchange active false then ( - B_queue.close events; - (* wait for writer thread to be done. The writer thread will exit - after processing remaining events because the queue is now closed *) - Thread.join t_write - ) - - let get_tid_ () : int = - if !Mock_.enabled then - 3 - else - Thread.id (Thread.self ()) - - let[@inline] enter_span_ ~fun_name ~data name : span = - let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in - let tid = get_tid_ () in - let time_us = now_us () in - B_queue.push events - (E_define_span { tid; name; time_us; id = span; fun_name; data }); - span - - let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : - span = - enter_span_ ~fun_name ~data name - - let exit_span span : unit = - let time_us = now_us () in - B_queue.push events (E_exit_span { id = span; time_us }) - - (* re-raise exception with its backtrace *) - external reraise : exn -> 'a = "%reraise" - - let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f = - let span = enter_span_ ~fun_name ~data name in - try - let x = f span in - exit_span span; - x - with exn -> - exit_span span; - reraise exn - - let add_data_to_span span data = - if data <> [] then B_queue.push events (E_add_data { id = span; data }) - - let enter_manual_span ~(parent : explicit_span option) ~flavor - ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : - explicit_span = - (* get the id, or make a new one *) - let id = - match parent with - | Some m -> Meta_map.find_exn key_async_id m.meta - | None -> A.fetch_and_add span_id_gen_ 1 - in - let time_us = now_us () in - B_queue.push events - (E_enter_manual_span - { id; time_us; tid = get_tid_ (); data; name; fun_name; flavor }); - { - span = 0L; - meta = - Meta_map.( - empty |> add key_async_id id |> add key_async_data (name, flavor)); - } - - let exit_manual_span (es : explicit_span) : unit = - let id = Meta_map.find_exn key_async_id es.meta in - let name, flavor = Meta_map.find_exn key_async_data es.meta in - let data = - try !(Meta_map.find_exn key_data es.meta) with Not_found -> [] - in - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events - (E_exit_manual_span { tid; id; name; time_us; data; flavor }) - - let add_data_to_manual_span (es : explicit_span) data = - if data <> [] then ( - let data_ref, add = - try Meta_map.find_exn key_data es.meta, false - with Not_found -> ref [], true - in - let new_data = List.rev_append data !data_ref in - data_ref := new_data; - if add then es.meta <- Meta_map.add key_data data_ref es.meta - ) - - let message ?span:_ ~data msg : unit = - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events (E_message { tid; time_us; msg; data }) - - let counter_float ~data:_ name f = - let time_us = now_us () in - let tid = get_tid_ () in - B_queue.push events (E_counter { name; n = f; time_us; tid }) - - let counter_int ~data name i = counter_float ~data name (float_of_int i) - let name_process name : unit = B_queue.push events (E_name_process { name }) - - let name_thread name : unit = - let tid = get_tid_ () in - B_queue.push events (E_name_thread { tid; name }) - end in - (module M) -*) +let collector = Fcollector.create let setup ?(out = `Env) () = match out with - | `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr () - | `Stdout -> Trace_core.setup_collector @@ collector ~out:`Stdout () - | `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) () + | `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () + | `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () + | `File path -> + Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) () | `Env -> (match Sys.getenv_opt "TRACE" with | Some ("1" | "true") -> let path = "trace.fxt" in - let c = collector ~out:(`File path) () in + let c = Fcollector.create ~out:(`File path) () in Trace_core.setup_collector c - | Some "stdout" -> Trace_core.setup_collector @@ collector ~out:`Stdout () - | Some "stderr" -> Trace_core.setup_collector @@ collector ~out:`Stderr () + | Some "stdout" -> + Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () + | Some "stderr" -> + Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () | Some path -> - let c = collector ~out:(`File path) () in + let c = Fcollector.create ~out:(`File path) () in Trace_core.setup_collector c | None -> ()) @@ -419,6 +34,5 @@ let with_setup ?out () f = Fun.protect ~finally:Trace_core.shutdown f module Internal_ = struct - let mock_all_ () = Mock_.enabled := true let on_tracing_error = on_tracing_error end diff --git a/src/fuchsia/trace_fuchsia.mli b/src/fuchsia/trace_fuchsia.mli index f6fa66e..b08620a 100644 --- a/src/fuchsia/trace_fuchsia.mli +++ b/src/fuchsia/trace_fuchsia.mli @@ -40,9 +40,6 @@ val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a (**/**) module Internal_ : sig - val mock_all_ : unit -> unit - (** use fake, deterministic timestamps, TID, PID *) - val on_tracing_error : (string -> unit) ref end diff --git a/src/fuchsia/write/buf.ml b/src/fuchsia/write/buf.ml new file mode 100644 index 0000000..2b13b34 --- /dev/null +++ b/src/fuchsia/write/buf.ml @@ -0,0 +1,42 @@ +open Util + +type t = { + buf: bytes; + mutable offset: int; +} + +let empty : t = { buf = Bytes.empty; offset = 0 } + +let create (n : int) : t = + let buf = Bytes.create (round_to_word n) in + { buf; offset = 0 } + +let[@inline] clear self = self.offset <- 0 +let[@inline] available self = Bytes.length self.buf - self.offset +let[@inline] size self = self.offset + +(* see below: we assume little endian *) +let () = assert (not Sys.big_endian) + +let[@inline] add_i64 (self : t) (i : int64) : unit = + (* NOTE: we use LE, most systems are this way, even though fuchsia + says we should use the system's native endianess *) + Bytes.set_int64_le self.buf self.offset i; + self.offset <- self.offset + 8 + +let[@inline] add_string (self : t) (s : string) : unit = + let len = String.length s in + let missing = missing_to_round len in + + (* bound check *) + assert (len + missing + self.offset < Bytes.length self.buf); + Bytes.unsafe_blit_string s 0 self.buf self.offset len; + self.offset <- self.offset + len; + + (* add 0-padding *) + if missing != 0 then ( + Bytes.unsafe_fill self.buf self.offset missing '\x00'; + self.offset <- self.offset + missing + ) + +let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset diff --git a/src/fuchsia/write/buf_pool.ml b/src/fuchsia/write/buf_pool.ml new file mode 100644 index 0000000..fc9cf45 --- /dev/null +++ b/src/fuchsia/write/buf_pool.ml @@ -0,0 +1,58 @@ +open struct + module A = Atomic + + exception Got_buf of Buf.t +end + +module List_with_len = struct + type +'a t = + | Nil + | Cons of int * 'a * 'a t + + let empty : _ t = Nil + + let[@inline] len = function + | Nil -> 0 + | Cons (i, _, _) -> i + + let[@inline] cons x self = Cons (len self + 1, x, self) +end + +type t = { + max_len: int; + buf_size: int; + bufs: Buf.t List_with_len.t A.t; +} + +let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t = + let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in + { max_len; buf_size; bufs = A.make List_with_len.empty } + +let alloc (self : t) : Buf.t = + try + while + match A.get self.bufs with + | Nil -> false + | Cons (_, buf, tl) as old -> + if A.compare_and_set self.bufs old tl then + raise (Got_buf buf) + else + false + do + () + done; + Buf.create self.buf_size + with Got_buf b -> b + +let recycle (self : t) (buf : Buf.t) : unit = + Buf.clear buf; + try + while + match A.get self.bufs with + | Cons (i, _, _) when i >= self.max_len -> raise Exit + | old -> + not (A.compare_and_set self.bufs old (List_with_len.cons buf old)) + do + () + done + with Exit -> () (* do not recycle *) diff --git a/src/fuchsia/write/dune b/src/fuchsia/write/dune index c4d7ad1..9b8634d 100644 --- a/src/fuchsia/write/dune +++ b/src/fuchsia/write/dune @@ -3,4 +3,7 @@ (name trace_fuchsia_write) (public_name trace-fuchsia.write) (synopsis "Serialization part of trace-fuchsia") + (ocamlopt_flags :standard -S + ;-dlambda + ) (libraries trace.core atomic threads)) diff --git a/src/fuchsia/write/output.ml b/src/fuchsia/write/output.ml new file mode 100644 index 0000000..3911192 --- /dev/null +++ b/src/fuchsia/write/output.ml @@ -0,0 +1,46 @@ +type t = { + mutable buf: Buf.t; + mutable send_buf: Buf.t -> unit; + buf_pool: Buf_pool.t; +} + +let create ~(buf_pool : Buf_pool.t) ~send_buf () : t = + let buf_size = buf_pool.buf_size in + let buf = Buf.create buf_size in + { buf; send_buf; buf_pool } + +open struct + let flush_ (self : t) : unit = + self.send_buf self.buf; + let buf = Buf_pool.alloc self.buf_pool in + self.buf <- buf + + let[@inline never] cycle_buf (self : t) ~available : Buf.t = + flush_ self; + let buf = self.buf in + + if Buf.available buf < available then + failwith "fuchsia: buffer is too small"; + buf +end + +let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self + +(** Obtain a buffer with at least [available] bytes *) +let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t = + let available = available_word lsl 3 in + if Buf.available self.buf >= available then + self.buf + else + cycle_buf self ~available + +let into_buffer ~buf_pool (buffer : Buffer.t) : t = + let send_buf (buf : Buf.t) = + Buffer.add_subbytes buffer buf.buf 0 buf.offset + in + create ~buf_pool ~send_buf () + +let dispose (self : t) : unit = + flush_ self; + Buf_pool.recycle self.buf_pool self.buf; + self.buf <- Buf.empty diff --git a/src/fuchsia/write/trace_fuchsia_write.ml b/src/fuchsia/write/trace_fuchsia_write.ml index 4bed4cb..8791f9a 100644 --- a/src/fuchsia/write/trace_fuchsia_write.ml +++ b/src/fuchsia/write/trace_fuchsia_write.ml @@ -2,53 +2,17 @@ Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *) -module B = Bytes +module Util = Util +module Buf = Buf +module Output = Output +module Buf_pool = Buf_pool open struct let spf = Printf.sprintf end -module Util = struct - (** How many bytes are missing for [n] to be a multiple of 8 *) - let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111 - - (** Round up to a multiple of 8 *) - let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111) -end - open Util -module Buf = struct - type t = { - buf: bytes; - mutable offset: int; - } - - let create (n : int) : t = - let buf = Bytes.create (round_to_word n) in - { buf; offset = 0 } - - let[@inline] clear self = self.offset <- 0 - - let[@inline] add_i64 (self : t) (i : int64) : unit = - (* NOTE: we use LE, most systems are this way, even though fuchsia - says we should use the system's native endianess *) - Bytes.set_int64_le self.buf self.offset i; - self.offset <- self.offset + 8 - - let add_string (self : t) (s : string) : unit = - let len = String.length s in - Bytes.blit_string s 0 self.buf self.offset len; - self.offset <- self.offset + len; - - (* add 0-padding *) - let missing = missing_to_round len in - Bytes.fill self.buf self.offset missing '\x00'; - self.offset <- self.offset + missing - - let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset -end - type user_data = Trace_core.user_data module I64 = struct @@ -85,6 +49,8 @@ module Thread_ref = struct tid: int; } + let inline ~pid ~tid : t = Inline { pid; tid } + let ref x : t = if x = 0 || x > 255 then invalid_arg "fuchsia: thread inline ref must be >0 < 256"; @@ -108,7 +74,10 @@ module Metadata = struct module Magic_record = struct let value = 0x0016547846040010L let size_word = 1 - let encode (buf : Buf.t) = Buf.add_i64 buf value + + let encode (out : Output.t) = + let buf = Output.get_buf out ~available_word:size_word in + Buf.add_i64 buf value end module Initialization_record = struct @@ -117,7 +86,8 @@ module Metadata = struct (** Default: 1 tick = 1 ns *) let default_ticks_per_sec = 1_000_000_000L - let encode (buf : Buf.t) ~ticks_per_secs () : unit = + let encode (out : Output.t) ~ticks_per_secs () : unit = + let buf = Output.get_buf out ~available_word:size_word in let hd = I64.(1L lor (of_int size_word lsl 4)) in Buf.add_i64 buf hd; Buf.add_i64 buf ticks_per_secs @@ -126,8 +96,9 @@ module Metadata = struct module Provider_info = struct let size_word ~name () = 1 + (round_to_word (String.length name) lsr 3) - let encode buf ~(id : int) ~name () : unit = + let encode (out : Output.t) ~(id : int) ~name () : unit = let size = size_word ~name () in + let buf = Output.get_buf out ~available_word:size in let hd = I64.( (of_int size lsl 4) @@ -216,17 +187,30 @@ end module Arguments = struct type t = Argument.t list + let[@inline] len (self : t) : int = + match self with + | [] -> 0 + | [ _ ] -> 1 + | _ :: _ :: tl -> 2 + List.length tl + let check_valid (self : t) = - let len = List.length self in + let len = len self in if len > 15 then invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len); List.iter Argument.check_valid self; () let[@inline] size_word (self : t) = - List.fold_left (fun n arg -> n + Argument.size_word arg) 0 self + match self with + | [] -> 0 + | [ a ] -> Argument.size_word a + | a :: b :: tl -> + List.fold_left + (fun n arg -> n + Argument.size_word arg) + (Argument.size_word a + Argument.size_word b) + tl - let encode (buf : Buf.t) (self : t) = + let[@inline] encode (buf : Buf.t) (self : t) = let rec aux buf l = match l with | [] -> () @@ -234,7 +218,13 @@ module Arguments = struct Argument.encode buf x; aux buf tl in - aux buf self + + match self with + | [] -> () + | [ x ] -> Argument.encode buf x + | x :: tl -> + Argument.encode buf x; + aux buf tl end (** record type = 3 *) @@ -242,9 +232,12 @@ module Thread_record = struct let size_word : int = 3 (** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *) - let encode (buf : Buf.t) ~as_ref ~pid ~tid () : unit = + let encode (out : Output.t) ~as_ref ~pid ~tid () : unit = if as_ref <= 0 || as_ref > 255 then invalid_arg "fuchsia: thread_record: invalid ref"; + + let buf = Output.get_buf out ~available_word:size_word in + let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in Buf.add_i64 buf hd; Buf.add_i64 buf (I64.of_int pid); @@ -253,21 +246,24 @@ end (** record type = 4 *) module Event = struct + (** type=0 *) module Instant = struct let size_word ~name ~t_ref ~args () : int = 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + (round_to_word (String.length name) / 8) + Arguments.size_word args - let encode (buf : Buf.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () : - unit = + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + (* set category = 0 *) let hd = I64.( 4L lor (of_int size lsl 4) - lor (of_int (List.length args) lsl 20) + lor (of_int (Arguments.len args) lsl 20) lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) lor (of_int (Str_ref.inline (String.length name)) lsl 48)) in @@ -285,22 +281,132 @@ module Event = struct () end + (** type=1 *) + module Counter = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* counter id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (1L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + (* just use 0 as counter id *) + Buf.add_i64 buf 0L; + () + end + + (** type=2 *) + module Duration_begin = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (2L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + () + end + + (** type=3 *) + module Duration_end = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (3L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + () + end + + (** type=4 *) module Duration_complete = struct let size_word ~name ~t_ref ~args () : int = 1 + Thread_ref.size_word t_ref + 1 (* timestamp *) + (round_to_word (String.length name) lsr 3) + Arguments.size_word args + 1 (* end timestamp *) - let encode (buf : Buf.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~end_time_ns - ~args () : unit = + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~end_time_ns ~args () : unit = let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + (* set category = 0 *) let hd = I64.( 4L lor (of_int size lsl 4) lor (4L lsl 16) - lor (of_int (List.length args) lsl 20) + lor (of_int (Arguments.len args) lsl 20) lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) lor (of_int (Str_ref.inline (String.length name)) lsl 48)) in diff --git a/src/fuchsia/write/util.ml b/src/fuchsia/write/util.ml new file mode 100644 index 0000000..7af7dec --- /dev/null +++ b/src/fuchsia/write/util.ml @@ -0,0 +1,5 @@ +(** How many bytes are missing for [n] to be a multiple of 8 *) +let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111 + +(** Round up to a multiple of 8 *) +let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111)