do not drop events still in buffers at exit

This commit is contained in:
Simon Cruanes 2023-12-26 01:14:14 -05:00
parent 713cf6b4cf
commit 56d3117d06
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
8 changed files with 39 additions and 26 deletions

View file

@ -66,11 +66,10 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
(** Thread that simply regularly "ticks", sending events to (** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file, the background thread so it has a chance to write to the file,
and call [f()] *) and call [f()] *)
let tick_thread events ~f : unit = let tick_thread events : unit =
try try
while true do while true do
Thread.delay 0.5; Thread.delay 0.5;
B_queue.push events E_tick; B_queue.push events E_tick
f ()
done done
with B_queue.Closed -> () with B_queue.Closed -> ()

View file

@ -43,7 +43,9 @@ type state = {
bg_thread: Thread.t; bg_thread: Thread.t;
buf_pool: Buf_pool.t; buf_pool: Buf_pool.t;
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
threads: per_thread_state Int_map.t A.t; per_thread: per_thread_state Int_map.t A.t;
(** the state keeps tabs on thread-local state, so it can flush writers
at the end *)
} }
let key_thread_local_st : per_thread_state TLS.key = let key_thread_local_st : per_thread_state TLS.key =
@ -60,15 +62,7 @@ let key_thread_local_st : per_thread_state TLS.key =
let out_of_st (st : state) : Output.t = let out_of_st (st : state) : Output.t =
FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf -> FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf ->
if A.get st.active then ( try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ())
try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()
))
let flush_all_ (st : state) =
let outs = A.get st.threads in
Int_map.iter (fun _tid (tls : per_thread_state) -> ()) outs;
()
module C (St : sig module C (St : sig
val st : state val st : state
@ -94,12 +88,24 @@ struct
(* add to [st]'s list of threads *) (* add to [st]'s list of threads *)
while while
let old = A.get st.threads in let old = A.get st.per_thread in
not (A.compare_and_set st.threads old (Int_map.add self.tid self old)) not (A.compare_and_set st.per_thread old (Int_map.add self.tid self old))
do do
() ()
done; done;
let on_exit _ =
while
let old = A.get st.per_thread in
not (A.compare_and_set st.per_thread old (Int_map.remove self.tid old))
do
()
done;
Option.iter Output.flush self.out
in
(* after thread exits, flush output and remove from global list *)
Gc.finalise on_exit (Thread.self ());
() ()
(** Obtain the output for the current thread *) (** Obtain the output for the current thread *)
@ -111,7 +117,15 @@ struct
let shutdown () = let shutdown () =
if A.exchange st.active false then ( if A.exchange st.active false then (
(* flush all outputs *) (* flush all outputs *)
flush_all_ st; let tls_l = A.get st.per_thread in
(* FIXME: there's a potential race condition here. How to fix it
without overhead on every regular event? *)
Int_map.iter
(fun _tid (tls : per_thread_state) ->
Printf.eprintf "flush for %d\n%!" tls.tid;
Option.iter Output.flush tls.out)
tls_l;
B_queue.close st.events; B_queue.close st.events;
(* wait for writer thread to be done. The writer thread will exit (* wait for writer thread to be done. The writer thread will exit
@ -248,14 +262,11 @@ let create ~out () : collector =
events; events;
span_id_gen = A.make 0; span_id_gen = A.make 0;
next_thread_ref = A.make 1; next_thread_ref = A.make 1;
threads = A.make Int_map.empty; per_thread = A.make Int_map.empty;
} }
in in
let _tick_thread = let _tick_thread = Thread.create (fun () -> Bg_thread.tick_thread events) in
Thread.create (fun () ->
Bg_thread.tick_thread events ~f:(fun () -> flush_all_ st))
in
(* write header *) (* write header *)
let out = out_of_st st in let out = out_of_st st in

View file

@ -1 +0,0 @@
let cpu_relax = Domain.cpu_relax

View file

@ -1 +1,2 @@
let cpu_relax () = () let cpu_relax () = ()
let n_domains () = 1

View file

@ -1 +1,2 @@
val cpu_relax : unit -> unit val cpu_relax : unit -> unit
val n_domains : unit -> int

View file

@ -0,0 +1,2 @@
let cpu_relax = Domain.cpu_relax
let n_domains = Domain.recommended_domain_count

View file

@ -4,6 +4,6 @@
(synopsis "internal utilities for trace. No guarantees of stability.") (synopsis "internal utilities for trace. No guarantees of stability.")
(name trace_private_util) (name trace_private_util)
(libraries trace.core mtime mtime.clock.os atomic unix threads (libraries trace.core mtime mtime.clock.os atomic unix threads
(select cpu_relax.ml from (select domain_util.ml from
(base-domain -> cpu_relax.real.ml) (base-domain -> domain_util.real.ml)
( -> cpu_relax.dummy.ml)))) ( -> domain_util.dummy.ml))))

View file

@ -11,7 +11,7 @@ module Backoff = struct
let once (b : t) : t = let once (b : t) : t =
for _i = 1 to b do for _i = 1 to b do
Cpu_relax.cpu_relax () Domain_util.cpu_relax ()
done; done;
min (b * 2) 256 min (b * 2) 256
end end