first implem of trace-spall

This commit is contained in:
Simon Cruanes 2026-04-26 21:14:29 -04:00
parent 8005926bfc
commit 73c4562551
9 changed files with 748 additions and 0 deletions

View file

@ -109,4 +109,22 @@
(tags
(trace tracing trace runtime-events)))
(package
(name trace-spall)
(synopsis
"A fast binary backend for trace, emitting Spall profiler format (.spall)")
(depends
(ocaml
(>= 4.08))
(trace
(= :version))
(mtime
(>= 2.0))
(thread-local-storage
(>= 0.2))
base-unix
dune)
(tags
(trace tracing spall profiling)))
; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project

View file

@ -0,0 +1,176 @@
open struct
  module TLS = Thread_local_storage
  module A = Trace_core.Internal_.Atomic_

  (** [with_lock mu f] calls [f ()] while holding [mu] and returns its
      result. The mutex is released on both the normal and the exceptional
      path; an exception raised by [f] is re-raised with its original
      backtrace. *)
  let with_lock mu f =
    Mutex.lock mu;
    let result =
      try f ()
      with e ->
        let bt = Printexc.get_raw_backtrace () in
        Mutex.unlock mu;
        Printexc.raise_with_backtrace e bt
    in
    Mutex.unlock mu;
    result

  (** Binding operator for scoped helpers: [let@ x = f in body] is
      [f (fun x -> body)]. *)
  let ( let@ ) f k = f k
end
open Trace_core
(** Span value handed out by this collector. It carries no payload:
    [enter_span] always returns it and [exit_span] only reacts to it. *)
type span += Span_spall
(** Per-thread buffering state. States are linked together into a circular
    doubly linked list (see [prev]/[next]); a freshly created state points to
    itself in both directions. *)
type thread_state = {
tid: int;  (** thread id, written into each flushed buffer header *)
pid: int;  (** process id, copied from the owning collector *)
mutable first_ts: int64;
(** timestamp written into the next buffer header; reset to [0L] after each
    flush *)
buf: Buffer.t;  (** encoded events not yet written to the output channel *)
mutable prev: thread_state;  (** previous state in the circular list *)
mutable next: thread_state;  (** next state in the circular list *)
}
(** State of one Spall collector. *)
type t = {
active: bool A.t;
(** [true] until {!close} runs; [close] swaps it to [false] so that the
    shutdown sequence executes at most once *)
pid: int;  (** process id copied into every thread state *)
mu: Mutex.t; (** guards: [oc] writes, [all_states] updates *)
all_states: thread_state Lazy.t;
(** double linked list. first item is always the main thread. *)
oc: out_channel;  (** destination of the binary trace *)
close_oc: bool;  (** whether {!close} also closes [oc] *)
high_water_mark: int;
(** once a thread buffer reaches this many bytes after writing a span event,
    it is flushed to [oc] *)
}
(* One module-level TLS slot shared across all collector instances. It stores
   a (collector, thread_state) pair; [get_or_create_thread_] compares the
   stored collector physically ([==]) with the current one so that a stale
   entry left by a previous collector is detected and replaced. *)
let tls_key : (t * thread_state) TLS.t = TLS.create ()
(** [flush_thread_ self ts] writes the pending content of [ts.buf] to
    [self.oc] as one Spall chunk (buffer header followed by the raw events),
    then empties the buffer and resets [ts.first_ts] to [0L]. Does nothing
    when the buffer is empty. Callers in this file hold [self.mu] around this
    call, since it writes to the shared channel. *)
let flush_thread_ (self : t) (ts : thread_state) : unit =
  match Buffer.length ts.buf with
  | 0 -> ()
  | size ->
    let oc = self.oc in
    output_string oc
      (Writer.write_header ~size ~tid:ts.tid ~pid:ts.pid ~first_ts:ts.first_ts);
    Buffer.output_buffer oc ts.buf;
    (* [reset] (rather than [clear]) also gives back memory if the buffer grew
       well beyond its initial capacity *)
    Buffer.reset ts.buf;
    ts.first_ts <- 0L
open struct
(* Initial capacity, in bytes, given to each per-thread [Buffer.t]. *)
let buf_size = 4_096
(* Default flush threshold used by [create] when [?high_water_mark] is not
   provided; equal to the initial buffer capacity. *)
let default_high_water_mark = buf_size
end
(** [create_thread_state_ self] allocates the state of the calling thread for
    collector [self] and records the [(self, state)] pair in thread-local
    storage. The returned node is a one-element circular list (it is its own
    [prev] and [next]); linking it into [self.all_states] is left to the
    caller. *)
let create_thread_state_ (self : t) : thread_state =
  let buf = Buffer.create buf_size in
  let tid = Trace_util.Mock_.get_tid () in
  let rec node : thread_state =
    { tid; pid = self.pid; first_ts = 0L; buf; prev = node; next = node }
  in
  TLS.set tls_key (self, node);
  node
(* Returns the state of the calling thread for collector [self].

   Fast path: the TLS slot already holds a pair whose collector is physically
   [self]; no lock is taken.

   Slow path (first call on this thread, or the slot belongs to another
   collector): a fresh state is created, stored in TLS, and appended at the
   tail of the circular list (i.e. just before the head) under [self.mu]. *)
let get_or_create_thread_ (self : t) : thread_state =
  let register () =
    let fresh = create_thread_state_ self in
    let@ () = with_lock self.mu in
    let head = Lazy.force self.all_states in
    let tail = head.prev in
    fresh.prev <- tail;
    fresh.next <- head;
    tail.next <- fresh;
    head.prev <- fresh;
    fresh
  in
  match TLS.get_opt tls_key with
  | Some (owner, ts) -> if owner == self then ts else register ()
  | None -> register ()
(** [flush_all_ self] flushes the buffer of every registered thread state,
    walking the circular list once starting from its head. The caller must
    hold [self.mu]. *)
let flush_all_ self =
  let head = Lazy.force self.all_states in
  let rec walk node =
    flush_thread_ self node;
    let following = node.next in
    (* back at the head: every node has been visited exactly once *)
    if following != head then walk following
  in
  walk head
(** [close self] shuts the collector down: under [self.mu], every thread
    buffer is flushed to the channel, the channel itself is flushed and, if
    [self.close_oc] is set, closed.

    Only the first call does anything; [self.active] is atomically swapped to
    [false] and later calls return immediately.

    The channel is closed even if flushing raises (e.g. [Sys_error] on a full
    disk): since [active] is already [false] at that point, there would be no
    second chance to release it. The exception from flushing is still
    propagated to the caller. *)
let close (self : t) : unit =
  if A.exchange self.active false then (
    let@ () = with_lock self.mu in
    Fun.protect
      ~finally:(fun () -> if self.close_oc then close_out_noerr self.oc)
      (fun () ->
        flush_all_ self;
        flush self.oc)
  )
(** [create ~pid ~oc ()] builds a collector writing to [oc].

    The state of the calling thread is created eagerly and becomes the head of
    the list of thread states. The Spall file header (timestamp unit [1e-3])
    is written to [oc] and flushed before returning.

    @param pid process id recorded in every buffer header
    @param close_oc whether {!close} closes [oc] (default [true])
    @param high_water_mark
      buffer size, in bytes, that triggers a flush (default
      [default_high_water_mark]) *)
let create ~pid ~oc ?(close_oc = true)
    ?(high_water_mark = default_high_water_mark) () : t =
  let rec st =
    {
      active = A.make true;
      pid;
      mu = Mutex.create ();
      all_states = lazy (create_thread_state_ st);
      oc;
      close_oc;
      high_water_mark;
    }
  in
  (* force now so that the creating thread owns the head of the list *)
  let (_ : thread_state) = Lazy.force st.all_states in
  let file_header = Buffer.create 32 in
  Writer.write_file_header file_header ~timestamp_unit:1e-3;
  output_string oc (Buffer.contents file_header);
  flush oc;
  st
open struct
(* Collector state as seen by the callbacks below. *)
type st = t
(* Nothing to initialise: [create] already wrote the file header. *)
let init _ = ()
(* Shutting down the collector is the same as closing it. *)
let shutdown (self : st) = close self
let enter_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~level:_
~params:_ ~data:_ ~parent:_ name : span =
let ts = Trace_util.Mock_.now_ns () in
let tst = get_or_create_thread_ self in
if Buffer.length tst.buf = 0 then tst.first_ts <- ts;
Writer.write_begin tst.buf ~ts ~name;
(if Buffer.length tst.buf >= self.high_water_mark then
let@ () = with_lock self.mu in
flush_thread_ self tst);
Span_spall
let exit_span (self : st) sp =
match sp with
| Span_spall ->
let ts = Trace_util.Mock_.now_ns () in
let tst = get_or_create_thread_ self in
Writer.write_end tst.buf ~ts;
if Buffer.length tst.buf >= self.high_water_mark then
let@ () = with_lock self.mu in
flush_thread_ self tst
| _ -> ()
(* The three callbacks below are no-ops: this collector only records span
   begin/end events and thread/process names, so span data, messages and
   metrics are dropped. *)
let add_data_to_span _st _sp _data = ()
let message _self ~level:_ ~params:_ ~data:_ ~span:_ _msg = ()
let metric _self ~level:_ ~params:_ ~data:_ _name _m = ()
let extension (self : st) ~level:_ ev =
match ev with
| Core_ext.Extension_set_thread_name name ->
let tst = get_or_create_thread_ self in
let@ () = with_lock self.mu in
Writer.write_name tst.buf ~kind:`Thread ~name
| Core_ext.Extension_set_process_name name ->
let tst = get_or_create_thread_ self in
let@ () = with_lock self.mu in
Writer.write_name tst.buf ~kind:`Process ~name
| _ -> ()
end
(** Callback table shared by every collector instance; the per-instance state
    is passed to each callback as its first argument. *)
let callbacks : _ Collector.Callbacks.t =
Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span
~add_data_to_span ~message ~metric ~extension ()
(** [collector self] packages the state [self] together with {!callbacks}
    into a [Trace_core] collector. *)
let collector (self : t) : Collector.t = Collector.C_some (self, callbacks)

View file

@ -0,0 +1,12 @@
type t
(** State of a Spall collector: an output channel plus one event buffer per
    thread that has emitted events. *)
val create :
pid:int ->
oc:out_channel ->
?close_oc:bool ->
?high_water_mark:int ->
unit ->
t
(** [create ~pid ~oc ()] makes a collector that writes Spall data to [oc].
    The Spall file header is written to [oc] and flushed before [create]
    returns.
    @param pid process id recorded in the header of every flushed buffer
    @param close_oc if [true] (the default), {!close} also closes [oc]
    @param high_water_mark
      size in bytes at which a thread's buffer is flushed to [oc] after a span
      event is recorded (default: 4096) *)
val collector : t -> Trace_core.Collector.t
(** Wrap the state into a collector usable with [Trace_core]. *)
val close : t -> unit
(** [close st] flushes every thread buffer and the channel, then closes the
    channel if [close_oc] was set. Only the first call has an effect. *)

6
src/spall/dune Normal file
View file

@ -0,0 +1,6 @@
(library
(name trace_spall)
(public_name trace-spall)
(synopsis
"A fast binary backend for trace, emitting Spall profiler format (.spall)")
(libraries trace.core trace.util unix threads thread-local-storage))

361
src/spall/spall.h Normal file
View file

@ -0,0 +1,361 @@
// SPDX-FileCopyrightText: © 2023 Phillip Trudeau-Tavara <pmttavara@protonmail.com>
// SPDX-License-Identifier: MIT
/*
TODO: Optional Helper APIs:
- Compression API: would require a mutexed lockable context (yuck...)
- Either using a ZIP library, a name cache + TIDPID cache, or both (but ZIP is likely more than enough!!!)
- begin()/end() writes compressed chunks to a caller-determined destination
- The destination can be the buffered-writing API or a custom user destination
- Ultimately need to take a lock with some granularity... can that be the caller's responsibility?
- Counter Event: should allow tracking arbitrary named values with a single event, for memory and frame profiling
- Ring-buffer API
spall_ring_init
spall_ring_emit_begin
spall_ring_emit_end
spall_ring_flush
*/
#ifndef SPALL_H
#define SPALL_H
#if !defined(_MSC_VER) || defined(__clang__)
#define SPALL_NOINSTRUMENT __attribute__((no_instrument_function))
#define SPALL_FORCEINLINE __attribute__((always_inline))
#else
#ifndef _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_WARNINGS
#endif
#define SPALL_NOINSTRUMENT // Can't noinstrument on MSVC!
#define SPALL_FORCEINLINE __forceinline
#endif
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#define SPALL_FN static inline SPALL_NOINSTRUMENT
#define SPALL_MIN(a, b) (((a) < (b)) ? (a) : (b))
#define SPALL_MAX(a, b) (((a) > (b)) ? (a) : (b))
// All structs up to the matching `#pragma pack(pop)` describe the on-disk
// layout and are packed on 1-byte boundaries.
#pragma pack(push, 1)
// File header, built by spall_build_header().
typedef struct SpallHeader {
uint64_t magic_header; // = 0x0BADF00D
uint64_t version; // = 3
double timestamp_unit;
uint64_t must_be_0;
} SpallHeader;
// Tag stored in the first byte of every event.
typedef enum {
SpallEventType_Invalid = 0,
SpallEventType_Custom_Data = 1, // Basic readers can skip this.
SpallEventType_StreamOver = 2,
SpallEventType_Begin = 3,
SpallEventType_End = 4,
SpallEventType_Instant = 5,
SpallEventType_Overwrite_Timestamp = 6, // Retroactively change timestamp units - useful for incrementally improving RDTSC frequency.
SpallEventType_Pad_Skip = 7,
SpallEventType_NameProcess = 8,
SpallEventType_NameThread = 9,
} SpallEventType;
// Header placed in front of each flushed buffer; see spall__buffer_flush().
typedef struct SpallBufferHeader {
uint32_t size; // number of event bytes following this header
uint32_t tid;
uint32_t pid;
uint64_t first_ts;
} SpallBufferHeader;
// Fixed-size part of a Begin event; the name bytes then the args bytes follow.
typedef struct SpallBeginEvent {
uint8_t type; // = SpallEventType_Begin
uint64_t when;
uint8_t name_length;
uint8_t args_length;
} SpallBeginEvent;
// Begin event with the largest possible name and args; used for sizing.
typedef struct SpallBeginEventMax {
SpallBeginEvent event;
char name_bytes[255];
char args_bytes[255];
} SpallBeginEventMax;
// End event: closes a span at time `when`.
typedef struct SpallEndEvent {
uint8_t type; // = SpallEventType_End
uint64_t when;
} SpallEndEvent;
typedef struct SpallPadSkipEvent {
uint8_t type; // = SpallEventType_Pad_Skip
uint32_t size;
} SpallPadSkipEvent;
// Fixed-size part of a thread/process name event; the name bytes follow.
typedef struct SpallNameContainerEvent {
uint8_t type; // = SpallEventType_NameThread/Process
uint8_t name_length;
} SpallNameContainerEvent;
// Name event with the largest possible name; used for sizing.
typedef struct SpallNameContainerEventMax {
SpallNameContainerEvent event;
char name_bytes[255];
} SpallNameContainerEventMax;
#pragma pack(pop)
typedef struct SpallProfile SpallProfile;
// Important!: If you define your own callbacks, mark them SPALL_NOINSTRUMENT!
// Output callbacks. write and flush report success by returning true.
typedef bool (*SpallWriteCallback)(SpallProfile *self, const void *data, size_t length);
typedef bool (*SpallFlushCallback)(SpallProfile *self);
typedef void (*SpallCloseCallback)(SpallProfile *self);
// Output context: the timestamp unit, the three output callbacks, and an
// opaque pointer for their use (a FILE* for the spall__file_* callbacks).
struct SpallProfile {
double timestamp_unit;
SpallWriteCallback write;
SpallFlushCallback flush;
SpallCloseCallback close;
void *data;
};
// Important!: If you are writing Begin/End events, then do NOT write
// events for the same PID + TID pair on different buffers!!!
// Caller-provided staging buffer. The first sizeof(SpallBufferHeader) bytes
// of `data` are reserved for the header filled in when the buffer is flushed.
typedef struct SpallBuffer {
void *data; // backing storage
size_t length; // capacity of `data`, in bytes
uint32_t tid;
uint32_t pid;
// Internal data - don't assign this
size_t head; // write offset into `data`
uint64_t first_ts;
} SpallBuffer;
#ifdef __cplusplus
extern "C" {
#endif
// Write callback for FILE-backed profiles: writes the n bytes at p as a
// single fwrite() item and reports whether that item was written.
SPALL_FN SPALL_FORCEINLINE bool spall__file_write(SpallProfile *ctx, const void *p, size_t n) {
    FILE *file = (FILE *)ctx->data;
    return fwrite(p, n, 1, file) == 1;
}
// Flush callback for FILE-backed profiles: true when fflush() succeeds.
SPALL_FN bool spall__file_flush(SpallProfile *ctx) {
    FILE *file = (FILE *)ctx->data;
    return fflush(file) == 0;
}
// Close callback for FILE-backed profiles: closes the stream and clears the
// context's data pointer.
SPALL_FN void spall__file_close(SpallProfile *ctx) {
    FILE *file = (FILE *)ctx->data;
    ctx->data = NULL;
    fclose(file);
}
// Writes the buffer (header + accumulated events) through the profile's
// write callback, then calls the flush callback and rewinds the buffer so
// that only the header slot is reserved. first_ts is raised to ts when ts is
// larger. Returns false as soon as either callback fails, in which case the
// buffer is left un-rewound.
SPALL_FN SPALL_FORCEINLINE bool spall__buffer_flush(SpallProfile *ctx, SpallBuffer *wb, uint64_t ts) {
    if (ts > wb->first_ts) {
        wb->first_ts = ts;
    }

    // Fill in the header slot at the start of the buffer.
    SpallBufferHeader hdr;
    hdr.first_ts = wb->first_ts;
    hdr.tid = wb->tid;
    hdr.pid = wb->pid;
    hdr.size = wb->head - sizeof(SpallBufferHeader);
    memcpy(wb->data, &hdr, sizeof(hdr));

    if (!ctx->write(ctx, wb->data, wb->head) || !ctx->flush(ctx)) {
        return false;
    }
    wb->head = sizeof(SpallBufferHeader);
    return true;
}
// Flushes the buffer without supplying a new timestamp (ts = 0 leaves
// first_ts unchanged).
SPALL_FN bool spall_buffer_flush(SpallProfile *ctx, SpallBuffer *wb) {
    return spall__buffer_flush(ctx, wb, 0);
}
// Final flush of a buffer; currently identical to spall_buffer_flush().
SPALL_FN bool spall_buffer_quit(SpallProfile *ctx, SpallBuffer *wb) {
    return spall_buffer_flush(ctx, wb);
}
// Writes the file header into `buffer`. Returns the number of bytes written,
// or 0 (nothing written) when fewer than sizeof(SpallHeader) bytes remain.
SPALL_FN size_t spall_build_header(void *buffer, size_t rem_size, double timestamp_unit) {
    if (rem_size < sizeof(SpallHeader)) {
        return 0;
    }
    SpallHeader *out = (SpallHeader *)buffer;
    out->magic_header = 0x0BADF00D;
    out->version = 3;
    out->timestamp_unit = timestamp_unit;
    out->must_be_0 = 0;
    return sizeof(SpallHeader);
}
// Encodes a Begin event (fixed part, then name bytes, then args bytes) into
// `buffer`. Name and args are each capped at 255 bytes. Returns the encoded
// size, or 0 (nothing written) when the event does not fit in rem_size.
SPALL_FN SPALL_FORCEINLINE size_t spall_build_begin(void *buffer, size_t rem_size, const char *name, int32_t name_len, const char *args, int32_t args_len, uint64_t when) {
    uint8_t name_n = (uint8_t)SPALL_MIN(name_len, 255); // will be interpreted as truncated in the app (?)
    uint8_t args_n = (uint8_t)SPALL_MIN(args_len, 255); // will be interpreted as truncated in the app (?)
    size_t total = sizeof(SpallBeginEvent) + name_n + args_n;
    if (total > rem_size) {
        return 0;
    }

    SpallBeginEventMax *ev = (SpallBeginEventMax *)buffer;
    ev->event.type = SpallEventType_Begin;
    ev->event.when = when;
    ev->event.name_length = name_n;
    ev->event.args_length = args_n;

    // The args are stored right after the (possibly shorter than 255) name.
    char *payload = ev->name_bytes;
    memcpy(payload, name, name_n);
    memcpy(payload + name_n, args, args_n);
    return total;
}
SPALL_FN SPALL_FORCEINLINE size_t spall_build_end(void *buffer, size_t rem_size, uint64_t when) {
size_t ev_size = sizeof(SpallEndEvent);
if (ev_size > rem_size) {
return 0;
}
SpallEndEvent *ev = (SpallEndEvent *)buffer;
ev->type = SpallEventType_End;
ev->when = when;
return ev_size;
}
// Encodes a thread/process name event of the given `type` into `buffer`. The
// name is capped at 255 bytes. Returns the encoded size, or 0 (nothing
// written) when the event does not fit in rem_size.
SPALL_FN SPALL_FORCEINLINE size_t spall_build_name(void *buffer, size_t rem_size, const char *name, int32_t name_len, SpallEventType type) {
    uint8_t name_n = (uint8_t)SPALL_MIN(name_len, 255); // will be interpreted as truncated in the app (?)
    size_t total = sizeof(SpallNameContainerEvent) + name_n;
    if (total > rem_size) {
        return 0;
    }

    SpallNameContainerEventMax *ev = (SpallNameContainerEventMax *)buffer;
    ev->event.name_length = name_n;
    ev->event.type = type;
    memcpy(ev->name_bytes, name, name_n);
    return total;
}
// Tears down a profile: invokes the close callback if one is set, then
// zeroes the whole context. A NULL ctx is ignored.
SPALL_FN void spall_quit(SpallProfile *ctx) {
    if (ctx == NULL) {
        return;
    }
    if (ctx->close != NULL) {
        ctx->close(ctx);
    }
    memset(ctx, 0, sizeof(*ctx));
}
// Initialises `ctx` with user-supplied callbacks and writes the file header
// through `write`. Returns false (leaving ctx untouched) for a negative
// timestamp_unit; returns false after tearing ctx down with spall_quit() when
// the header cannot be written.
SPALL_FN bool spall_init_callbacks(double timestamp_unit,
                                   SpallWriteCallback write,
                                   SpallFlushCallback flush,
                                   SpallCloseCallback close,
                                   void *userdata,
                                   SpallProfile *ctx) {
    if (timestamp_unit < 0) return false;

    memset(ctx, 0, sizeof(*ctx));
    ctx->write = write;
    ctx->flush = flush;
    ctx->close = close;
    ctx->data = userdata;
    ctx->timestamp_unit = timestamp_unit;

    SpallHeader header;
    size_t len = spall_build_header(&header, sizeof(header), timestamp_unit);
    if (ctx->write(ctx, &header, len)) {
        return true;
    }
    spall_quit(ctx);
    return false;
}
// Initialises `ctx` to write to the file `filename`. The file is first
// created/truncated, then reopened in append mode; the FILE-based callbacks
// are installed and the file header is written. Returns false when filename
// is NULL, when either fopen() fails, or when initialisation fails.
SPALL_FN bool spall_init_file(const char* filename, double timestamp_unit, SpallProfile *ctx) {
    if (filename == NULL) return false;

    // TODO: handle utf8 and long paths on windows
    FILE *f = fopen(filename, "wb");
    if (f == NULL) return false;

    // basically freopen() but we don't want to force users to lug along another macro define
    fclose(f);
    f = fopen(filename, "ab");
    if (f == NULL) return false;

    return spall_init_callbacks(timestamp_unit, spall__file_write, spall__file_flush, spall__file_close, (void *)f, ctx);
}
// Invokes the profile's flush callback and returns its result.
SPALL_FN bool spall_flush(SpallProfile *ctx) {
    return ctx->flush(ctx);
}
// Prepares a caller-filled SpallBuffer for use by reserving the header slot.
// Fails if buffer is not big enough to contain at least one event!
// (`ctx` is not used.)
SPALL_FN bool spall_buffer_init(SpallProfile *ctx, SpallBuffer *wb) {
    (void)ctx;
    size_t min_length = sizeof(SpallBufferHeader) + sizeof(SpallBeginEventMax);
    if (wb->length < min_length) {
        return false;
    }
    wb->head = sizeof(SpallBufferHeader);
    return true;
}
// Appends a Begin event with a name and args to the buffer. When there is
// not enough room left for a maximum-size Begin event, the buffer is flushed
// first (passing `when` as the flush timestamp); returns false if that flush
// fails.
SPALL_FN SPALL_FORCEINLINE bool spall_buffer_begin_args(SpallProfile *ctx, SpallBuffer *wb, const char *name, int32_t name_len, const char *args, int32_t args_len, uint64_t when) {
    bool has_room = (wb->head + sizeof(SpallBeginEventMax)) <= wb->length;
    if (!has_room && !spall__buffer_flush(ctx, wb, when)) {
        return false;
    }

    // Computed after the potential flush, which rewinds wb->head.
    char *dst = (char *)wb->data + wb->head;
    size_t room = wb->length - wb->head;
    wb->head += spall_build_begin(dst, room, name, name_len, args, args_len, when);
    return true;
}
// Appends a Begin event without args; see spall_buffer_begin_args().
SPALL_FN bool spall_buffer_begin(SpallProfile *ctx, SpallBuffer *wb, const char *name, int32_t name_len, uint64_t when) {
    const char *no_args = "";
    return spall_buffer_begin_args(ctx, wb, name, name_len, no_args, 0, when);
}
// Appends an End event to the buffer. When there is not enough room left for
// it, the buffer is flushed first (passing `when` as the flush timestamp);
// returns false if that flush fails.
SPALL_FN bool spall_buffer_end(SpallProfile *ctx, SpallBuffer *wb, uint64_t when) {
    bool has_room = (wb->head + sizeof(SpallEndEvent)) <= wb->length;
    if (!has_room && !spall__buffer_flush(ctx, wb, when)) {
        return false;
    }

    // Computed after the potential flush, which rewinds wb->head.
    char *dst = (char *)wb->data + wb->head;
    size_t room = wb->length - wb->head;
    wb->head += spall_build_end(dst, room, when);
    return true;
}
// Appends a NameThread event to the buffer, flushing first when needed.
// Returns false only if that flush fails.
//
// Room is reserved for a maximum-size name event (fixed part + 255 name
// bytes), as spall_buffer_begin_args() does for Begin events. Reserving only
// the 2-byte fixed part let a name that did not fit make spall_build_name()
// return 0, so the name was silently dropped while this function still
// returned true.
SPALL_FN bool spall_buffer_name_thread(SpallProfile *ctx, SpallBuffer *wb, const char *name, int32_t name_len) {
    if ((wb->head + sizeof(SpallNameContainerEventMax)) > wb->length) {
        if (!spall__buffer_flush(ctx, wb, 0)) {
            return false;
        }
    }
    wb->head += spall_build_name((char *)wb->data + wb->head, wb->length - wb->head, name, name_len, SpallEventType_NameThread);
    return true;
}
// Appends a NameProcess event to the buffer, flushing first when needed.
// Returns false only if that flush fails.
//
// Room is reserved for a maximum-size name event (fixed part + 255 name
// bytes), as spall_buffer_begin_args() does for Begin events. Reserving only
// the 2-byte fixed part let a name that did not fit make spall_build_name()
// return 0, so the name was silently dropped while this function still
// returned true.
SPALL_FN bool spall_buffer_name_process(SpallProfile *ctx, SpallBuffer *wb, const char *name, int32_t name_len) {
    if ((wb->head + sizeof(SpallNameContainerEventMax)) > wb->length) {
        if (!spall__buffer_flush(ctx, wb, 0)) {
            return false;
        }
    }
    wb->head += spall_build_name((char *)wb->data + wb->head, wb->length - wb->head, name, name_len, SpallEventType_NameProcess);
    return true;
}
#ifdef __cplusplus
}
#endif
#endif // SPALL_H

53
src/spall/trace_spall.ml Normal file
View file

@ -0,0 +1,53 @@
open Trace_core
module Collector_spall = Collector_spall
module Writer = Writer
(* Where the trace goes: one of the two standard streams, or a file at the
   given path. *)
type output =
[ `Stdout
| `Stderr
| `File of string
]
(** [collector ~out ()] creates a Spall collector writing to [out].

    A [`File path] destination is opened in {b binary} mode and is closed when
    the collector shuts down; [`Stdout] and [`Stderr] are left open. The Spall
    format is binary, so opening the file in text mode (as [open_out] does)
    would let platforms that translate newlines (Windows) corrupt the trace.

    NOTE(review): [stdout]/[stderr] are used in whatever mode they are
    currently in; on such platforms the caller may need
    [set_binary_mode_out] — confirm whether this should be done here. *)
let collector ~(out : [< output ]) () : collector =
  let oc, close_oc =
    match out with
    | `Stdout -> stdout, false
    | `Stderr -> stderr, false
    | `File path -> open_out_bin path, true
  in
  let pid = Trace_util.Mock_.get_pid () in
  let st = Collector_spall.create ~pid ~oc ~close_oc () in
  Collector_spall.collector st
open struct
  (* Whether [Trace_core.shutdown] has already been registered with
     [at_exit]. *)
  let atexit_registered = ref false

  (* Registers [Trace_core.shutdown] to run at program exit; only the first
     call registers anything. *)
  let register_atexit () =
    match !atexit_registered with
    | true -> ()
    | false ->
      atexit_registered := true;
      at_exit Trace_core.shutdown
end
(** [setup ?out ()] installs a Spall collector as the [Trace_core] collector
    and makes sure [Trace_core.shutdown] runs at exit.

    With an explicit destination the collector is always installed. With
    [`Env] (the default) the [TRACE] environment variable decides: unset
    installs nothing; ["stdout"] and ["stderr"] select those streams; ["1"]
    and ["true"] select the file [trace.spall]; any other value is taken as
    the path of the output file. *)
let setup ?(out = `Env) () =
  register_atexit ();
  let install dest = Trace_core.setup_collector (collector ~out:dest ()) in
  match out with
  | #output as dest -> install dest
  | `Env ->
    (match Sys.getenv_opt "TRACE" with
    | None -> ()
    | Some "stdout" -> install `Stdout
    | Some "stderr" -> install `Stderr
    | Some ("1" | "true") -> install (`File "trace.spall")
    | Some path -> install (`File path))
(** [with_setup ?out () f] runs {!setup}, then [f ()], and calls
    [Trace_core.shutdown] afterwards whether [f] returns or raises. *)
let with_setup ?out () f =
  let () = setup ?out () in
  Fun.protect f ~finally:Trace_core.shutdown
(* Internal hooks, not part of the public API. *)
module Private_ = struct
(* Forwards to [Trace_util.Mock_.mock_all]; presumably swaps the clock and
   pid/tid sources for mocked ones so output is reproducible — confirm
   against [Trace_util.Mock_]. *)
let mock_all_ () = Trace_util.Mock_.mock_all ()
end

40
src/spall/trace_spall.mli Normal file
View file

@ -0,0 +1,40 @@
(** Spall backend for Trace.
This emits Spall binary traces (.spall), viewable at
https://gravitymoth.com/spall/spall.html .
Reference format: spall.h (vendored alongside this library). *)
module Collector_spall = Collector_spall
module Writer = Writer
type output =
[ `Stdout
| `Stderr
| `File of string
]
(** Output destination for tracing. *)
val collector : out:[< output ] -> unit -> Trace_core.collector
(** Make a collector writing to the given output. With [`File path], the file
    is created (or truncated) when this function is called and is closed when
    the collector shuts down; [`Stdout] and [`Stderr] are never closed. *)
val setup : ?out:[ output | `Env ] -> unit -> unit
(** [setup ()] installs the Spall collector and registers
[Trace_core.shutdown] to run at program exit.
@param out
where to write events:
- a {!output} value, or
- [`Env] (default): tracing is enabled only if the [TRACE] environment
variable is set. Its value selects the output: ["1"] or ["true"] write to
[trace.spall]; ["stdout"] / ["stderr"] write to those streams; any other
value is used as the path of the output file. *)
val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a
(** [with_setup () f] sets up the collector, calls [f()], then shuts down.
    The shutdown also happens if [f] raises. [out] is as for {!setup}. *)
(**/**)
module Private_ : sig
val mock_all_ : unit -> unit
end
(**/**)

45
src/spall/writer.ml Normal file
View file

@ -0,0 +1,45 @@
open struct
  (* Appends the low 8 bits of [n] as a single byte. *)
  let[@inline] add_u8 buf n =
    Buffer.add_char buf (Char.unsafe_chr (n land 0xff))

  (* Appends a 64-bit integer, little-endian. *)
  let add_u64_le buf (v : int64) = Buffer.add_int64_le buf v

  (* Appends the IEEE-754 bit pattern of a float, little-endian. *)
  let[@inline] add_f64_le buf f =
    Buffer.add_int64_le buf (Int64.bits_of_float f)
end
(* File header: magic(u64) version(u64) timestamp_unit(f64) must_be_0(u64),
   all little-endian. *)
let write_file_header buf ~timestamp_unit =
  let magic = 0x0BADF00DL in
  let version = 3L in
  let must_be_zero = 0L in
  add_u64_le buf magic;
  add_u64_le buf version;
  add_f64_le buf timestamp_unit;
  add_u64_le buf must_be_zero
(* Buffer header, 20 bytes: size(u32) tid(u32) pid(u32) first_ts(u64), all
   little-endian. [size], [tid] and [pid] are reduced to their low 32 bits. *)
let write_header ~size ~tid ~pid ~first_ts : string =
  let header = Bytes.create 20 in
  let put_u32 offset v = Bytes.set_int32_le header offset (Int32.of_int v) in
  put_u32 0 size;
  put_u32 4 tid;
  put_u32 8 pid;
  Bytes.set_int64_le header 12 first_ts;
  (* [header] is not used after this point, so the conversion is safe *)
  Bytes.unsafe_to_string header
(* type=3 Begin: type(u8) when(u64) name_len(u8) args_len(u8) name_bytes.
   The name is cut to its first 255 bytes; no args are ever written
   (args_len is always 0). *)
let write_begin buf ~ts ~name =
  let event_type = 3 in
  let full_len = String.length name in
  let name_len = if full_len > 255 then 255 else full_len in
  add_u8 buf event_type;
  add_u64_le buf ts;
  add_u8 buf name_len;
  add_u8 buf 0;
  Buffer.add_substring buf name 0 name_len
(* type=4 End: type(u8) when(u64) *)
let write_end buf ~ts =
  let event_type = 4 in
  add_u8 buf event_type;
  add_u64_le buf ts
(* type=8 NameProcess / type=9 NameThread: type(u8) name_len(u8) name_bytes.
   The name is cut to its first 255 bytes. *)
let write_name buf ~kind ~name =
  let full_len = String.length name in
  let name_len = if full_len > 255 then 255 else full_len in
  (match kind with
  | `Process -> add_u8 buf 8
  | `Thread -> add_u8 buf 9);
  add_u8 buf name_len;
  Buffer.add_substring buf name 0 name_len

37
trace-spall.opam Normal file
View file

@ -0,0 +1,37 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.12"
synopsis:
"A fast binary backend for trace, emitting Spall profiler format (.spall)"
maintainer: ["Simon Cruanes"]
authors: ["Simon Cruanes"]
license: "MIT"
tags: ["trace" "tracing" "spall" "profiling"]
homepage: "https://github.com/ocaml-tracing/ocaml-trace"
bug-reports: "https://github.com/ocaml-tracing/ocaml-trace/issues"
depends: [
"ocaml" {>= "4.08"}
"trace" {= version}
"mtime" {>= "2.0"}
"thread-local-storage" {>= "0.2"}
"base-unix"
"dune" {>= "2.9"}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files=false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/ocaml-tracing/ocaml-trace.git"