diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml index 8664452..b8c9100 100644 --- a/src/fuchsia/bg_thread.ml +++ b/src/fuchsia/bg_thread.ml @@ -66,11 +66,10 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = (** Thread that simply regularly "ticks", sending events to the background thread so it has a chance to write to the file, and call [f()] *) -let tick_thread events ~f : unit = +let tick_thread events : unit = try while true do Thread.delay 0.5; - B_queue.push events E_tick; - f () + B_queue.push events E_tick done with B_queue.Closed -> () diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml index 5c661c2..d66a822 100644 --- a/src/fuchsia/fcollector.ml +++ b/src/fuchsia/fcollector.ml @@ -43,7 +43,9 @@ type state = { bg_thread: Thread.t; buf_pool: Buf_pool.t; next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) - threads: per_thread_state Int_map.t A.t; + per_thread: per_thread_state Int_map.t A.t; + (** the state keeps tabs on thread-local state, so it can flush writers + at the end *) } let key_thread_local_st : per_thread_state TLS.key = @@ -60,15 +62,7 @@ let key_thread_local_st : per_thread_state TLS.key = let out_of_st (st : state) : Output.t = FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf -> - if A.get st.active then ( - try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> () - )) - -let flush_all_ (st : state) = - let outs = A.get st.threads in - - Int_map.iter (fun _tid (tls : per_thread_state) -> ()) outs; - () + try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()) module C (St : sig val st : state @@ -94,12 +88,24 @@ struct (* add to [st]'s list of threads *) while - let old = A.get st.threads in - not (A.compare_and_set st.threads old (Int_map.add self.tid self old)) + let old = A.get st.per_thread in + not (A.compare_and_set st.per_thread old (Int_map.add self.tid self old)) do () done; + let on_exit _ = + while + let old = A.get st.per_thread in + not (A.compare_and_set st.per_thread old (Int_map.remove self.tid old)) + do + () + done; + Option.iter Output.flush self.out + in + + (* after thread exits, flush output and remove from global list *) + Gc.finalise on_exit (Thread.self ()); () (** Obtain the output for the current thread *) @@ -111,7 +117,15 @@ struct let shutdown () = if A.exchange st.active false then ( (* flush all outputs *) - flush_all_ st; + let tls_l = A.get st.per_thread in + + (* FIXME: there's a potential race condition here. How to fix it + without overhead on every regular event? *) + Int_map.iter + (fun _tid (tls : per_thread_state) -> + Printf.eprintf "flush for %d\n%!" tls.tid; + Option.iter Output.flush tls.out) + tls_l; B_queue.close st.events; (* wait for writer thread to be done. The writer thread will exit @@ -248,14 +262,11 @@ let create ~out () : collector = events; span_id_gen = A.make 0; next_thread_ref = A.make 1; - threads = A.make Int_map.empty; + per_thread = A.make Int_map.empty; } in - let _tick_thread = - Thread.create (fun () -> - Bg_thread.tick_thread events ~f:(fun () -> flush_all_ st)) - in + let _tick_thread = Thread.create (fun () -> Bg_thread.tick_thread events) in (* write header *) let out = out_of_st st in diff --git a/src/util/cpu_relax.real.ml b/src/util/cpu_relax.real.ml deleted file mode 100644 index f3dab5c..0000000 --- a/src/util/cpu_relax.real.ml +++ /dev/null @@ -1 +0,0 @@ -let cpu_relax = Domain.cpu_relax diff --git a/src/util/cpu_relax.dummy.ml b/src/util/domain_util.dummy.ml similarity index 51% rename from src/util/cpu_relax.dummy.ml rename to src/util/domain_util.dummy.ml index 3c5fd6f..2a59baf 100644 --- a/src/util/cpu_relax.dummy.ml +++ b/src/util/domain_util.dummy.ml @@ -1 +1,2 @@ let cpu_relax () = () +let n_domains () = 1 diff --git a/src/util/cpu_relax.mli b/src/util/domain_util.mli similarity index 50% rename from src/util/cpu_relax.mli rename to src/util/domain_util.mli index 17542a8..666b1f5 100644 --- a/src/util/cpu_relax.mli +++ b/src/util/domain_util.mli @@ -1 +1,2 @@ val cpu_relax : unit -> unit +val n_domains : unit -> int diff --git a/src/util/domain_util.real.ml b/src/util/domain_util.real.ml new file mode 100644 index 0000000..ea4c225 --- /dev/null +++ b/src/util/domain_util.real.ml @@ -0,0 +1,2 @@ +let cpu_relax = Domain.cpu_relax +let n_domains = Domain.recommended_domain_count diff --git a/src/util/dune b/src/util/dune index fc5fb6f..7d7f2a4 100644 --- a/src/util/dune +++ b/src/util/dune @@ -4,6 +4,6 @@ (synopsis "internal utilities for trace. No guarantees of stability.") (name trace_private_util) (libraries trace.core mtime mtime.clock.os atomic unix threads - (select cpu_relax.ml from - (base-domain -> cpu_relax.real.ml) - ( -> cpu_relax.dummy.ml)))) + (select domain_util.ml from + (base-domain -> domain_util.real.ml) + ( -> domain_util.dummy.ml)))) diff --git a/src/util/mpsc_bag.ml b/src/util/mpsc_bag.ml index 453357f..a8f49aa 100644 --- a/src/util/mpsc_bag.ml +++ b/src/util/mpsc_bag.ml @@ -11,7 +11,7 @@ module Backoff = struct let once (b : t) : t = for _i = 1 to b do - Cpu_relax.cpu_relax () + Domain_util.cpu_relax () done; min (b * 2) 256 end