mirror of
https://github.com/ocaml-tracing/ocaml-trace.git
synced 2026-03-07 18:37:56 -05:00
perf(trace-tef): improve behavior of collector under contention
writers still do one lock+unlock per event, but the collector can read multiple events at once using Queue.transfer.
This commit is contained in:
parent
261f143dd2
commit
1886ee737e
3 changed files with 38 additions and 6 deletions
|
|
@ -51,3 +51,22 @@ let pop (self : 'a t) : 'a =
|
||||||
)
|
)
|
||||||
in
|
in
|
||||||
loop ()
|
loop ()
|
||||||
|
|
||||||
|
let transfer (self : 'a t) q2 : unit =
|
||||||
|
Mutex.lock self.mutex;
|
||||||
|
while
|
||||||
|
if Queue.is_empty self.q then (
|
||||||
|
if self.closed then (
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
raise Closed
|
||||||
|
);
|
||||||
|
Condition.wait self.cond self.mutex;
|
||||||
|
true
|
||||||
|
) else (
|
||||||
|
Queue.transfer self.q q2;
|
||||||
|
Mutex.unlock self.mutex;
|
||||||
|
false
|
||||||
|
)
|
||||||
|
do
|
||||||
|
()
|
||||||
|
done
|
||||||
|
|
|
||||||
|
|
@ -14,5 +14,10 @@ val pop : 'a t -> 'a
|
||||||
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
(** [pop q] pops the next element in [q]. It might block until an element comes.
|
||||||
@raise Closed if the queue was closed before a new element was available. *)
|
@raise Closed if the queue was closed before a new element was available. *)
|
||||||
|
|
||||||
|
val transfer : 'a t -> 'a Queue.t -> unit
|
||||||
|
(** [transfer bq q2] transfers all items presently
|
||||||
|
in [bq] into [q2], and clears [bq].
|
||||||
|
It blocks if no element is in [bq]. *)
|
||||||
|
|
||||||
val close : _ t -> unit
|
val close : _ t -> unit
|
||||||
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
(** Close the queue, meaning there won't be any more [push] allowed. *)
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,7 @@ module Writer = struct
|
||||||
let ts = start in
|
let ts = start in
|
||||||
emit_sep_ self;
|
emit_sep_ self;
|
||||||
Printf.fprintf self.oc
|
Printf.fprintf self.oc
|
||||||
{json|{"pid": %d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
|
{json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json}
|
||||||
self.pid tid dur ts str_val name
|
self.pid tid dur ts str_val name
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args;
|
args;
|
||||||
|
|
@ -164,7 +164,7 @@ module Writer = struct
|
||||||
let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
|
let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit =
|
||||||
emit_sep_ self;
|
emit_sep_ self;
|
||||||
Printf.fprintf self.oc
|
Printf.fprintf self.oc
|
||||||
{json|{"pid": %d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
|
{json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json}
|
||||||
self.pid tid ts str_val name
|
self.pid tid ts str_val name
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
args;
|
args;
|
||||||
|
|
@ -173,7 +173,7 @@ module Writer = struct
|
||||||
let emit_name_thread ~tid ~name (self : t) : unit =
|
let emit_name_thread ~tid ~name (self : t) : unit =
|
||||||
emit_sep_ self;
|
emit_sep_ self;
|
||||||
Printf.fprintf self.oc
|
Printf.fprintf self.oc
|
||||||
{json|{"pid": %d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
|
{json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid
|
||||||
tid
|
tid
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
[ "name", `String name ];
|
[ "name", `String name ];
|
||||||
|
|
@ -182,7 +182,7 @@ module Writer = struct
|
||||||
let emit_name_process ~name (self : t) : unit =
|
let emit_name_process ~name (self : t) : unit =
|
||||||
emit_sep_ self;
|
emit_sep_ self;
|
||||||
Printf.fprintf self.oc
|
Printf.fprintf self.oc
|
||||||
{json|{"pid": %d,"name":"process_name","ph":"M"%a}|json} self.pid
|
{json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid
|
||||||
(emit_args_o_ pp_user_data_)
|
(emit_args_o_ pp_user_data_)
|
||||||
[ "name", `String name ];
|
[ "name", `String name ];
|
||||||
()
|
()
|
||||||
|
|
@ -201,6 +201,7 @@ let bg_thread ~out (events : event B_queue.t) : unit =
|
||||||
let writer = Writer.create ~out () in
|
let writer = Writer.create ~out () in
|
||||||
protect ~finally:(fun () -> Writer.close writer) @@ fun () ->
|
protect ~finally:(fun () -> Writer.close writer) @@ fun () ->
|
||||||
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
|
let spans : span_info Span_tbl.t = Span_tbl.create 32 in
|
||||||
|
let local_q = Queue.create () in
|
||||||
|
|
||||||
(* how to deal with an event *)
|
(* how to deal with an event *)
|
||||||
let handle_ev (ev : event) : unit =
|
let handle_ev (ev : event) : unit =
|
||||||
|
|
@ -225,8 +226,15 @@ let bg_thread ~out (events : event B_queue.t) : unit =
|
||||||
|
|
||||||
try
|
try
|
||||||
while true do
|
while true do
|
||||||
let ev = B_queue.pop events in
|
(* work on local events, already on this thread *)
|
||||||
handle_ev ev
|
while not (Queue.is_empty local_q) do
|
||||||
|
let ev = Queue.pop local_q in
|
||||||
|
handle_ev ev
|
||||||
|
done;
|
||||||
|
|
||||||
|
(* get all the events in the incoming blocking queue, in
|
||||||
|
one single critical section. *)
|
||||||
|
B_queue.transfer events local_q
|
||||||
done
|
done
|
||||||
with B_queue.Closed ->
|
with B_queue.Closed ->
|
||||||
(* warn if app didn't close all spans *)
|
(* warn if app didn't close all spans *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue