wip: batching

This commit is contained in:
Simon Cruanes 2023-10-30 21:56:49 -04:00
parent 157957530a
commit 435873b79d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 20 additions and 1 deletions

View file

@ -29,6 +29,7 @@
(trace (= :version)) (trace (= :version))
(mtime (>= 2.0)) (mtime (>= 2.0))
base-unix base-unix
thread-local-storage
dune) dune)
(tags (tags
(trace tracing catapult))) (trace tracing catapult)))

View file

@ -3,4 +3,4 @@
(name trace_tef) (name trace_tef)
(public_name trace-tef) (public_name trace-tef)
(synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process")
(libraries trace.core mtime mtime.clock.os unix threads)) (libraries trace.core mtime mtime.clock.os unix threads thread-local-storage))

View file

@ -1,5 +1,6 @@
open Trace_core open Trace_core
module A = Trace_core.Internal_.Atomic_ module A = Trace_core.Internal_.Atomic_
module TLS = Thread_local_storage
module Mock_ = struct module Mock_ = struct
let enabled = ref false let enabled = ref false
@ -336,12 +337,16 @@ let bg_thread ~out (events : event B_queue.t) : unit =
(Span_tbl.length spans); (Span_tbl.length spans);
() ()
(** Current tick. *)
let cur_tick_ : int A.t = A.make 0
(** Thread that simply regularly "ticks", sending events to (** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file *) the background thread so it has a chance to write to the file *)
let tick_thread events : unit = let tick_thread events : unit =
try try
while true do while true do
Thread.delay 0.5; Thread.delay 0.5;
A.incr cur_tick_;
B_queue.push events E_tick B_queue.push events E_tick
done done
with B_queue.Closed -> () with B_queue.Closed -> ()
@ -352,6 +357,16 @@ type output =
| `File of string | `File of string
] ]
type 'a batch = {
mutable last_tick: int;
batch: 'a Queue.t;
}
(** Key to access the batch for the current thread *)
let k_tls_batch : event batch TLS.key =
TLS.new_key (fun () ->
{ last_tick = A.get cur_tick_; batch = Queue.create () })
let collector ~out () : collector = let collector ~out () : collector =
let module M = struct let module M = struct
let active = A.make true let active = A.make true
@ -371,6 +386,8 @@ let collector ~out () : collector =
let shutdown () = let shutdown () =
if A.exchange active false then ( if A.exchange active false then (
(* FIXME: how do we make sure all batches are emptied
before we close [events]? *)
B_queue.close events; B_queue.close events;
(* wait for writer thread to be done. The writer thread will exit (* wait for writer thread to be done. The writer thread will exit
after processing remaining events because the queue is now closed *) after processing remaining events because the queue is now closed *)

View file

@ -14,6 +14,7 @@ depends: [
"trace" {= version} "trace" {= version}
"mtime" {>= "2.0"} "mtime" {>= "2.0"}
"base-unix" "base-unix"
"thread-local-storage"
"dune" {>= "2.9"} "dune" {>= "2.9"}
"odoc" {with-doc} "odoc" {with-doc}
] ]