perf: when we get multiple messages, check batches only once

This commit is contained in:
Simon Cruanes 2023-12-20 14:01:24 -05:00
parent 47f7f1d110
commit 5005c3aa9b
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -266,39 +266,36 @@ end = struct
(* read multiple events at once *)
B_queue.pop_all self.q local_q;
let now = Mtime_clock.now () in
(* are we asked to flush all events? *)
let must_flush_all = ref false in
(* how to process a single event *)
let process_ev (ev : Event.t) : unit =
match ev with
| Event.E_metric m ->
Batch.push batches.metrics m;
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ()
| Event.E_trace tr ->
Batch.push batches.traces tr;
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_logs logs ->
Batch.push batches.logs logs;
if should_send_batch_ ~config ~now batches.logs then send_logs ()
| Event.E_tick ->
(* check for batches whose timeout expired *)
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();
if should_send_batch_ ~config ~now batches.logs then send_logs ();
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_flush_all ->
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
| Event.E_metric m -> Batch.push batches.metrics m
| Event.E_trace tr -> Batch.push batches.traces tr
| Event.E_logs logs -> Batch.push batches.logs logs
| Event.E_tick -> ()
| Event.E_flush_all -> must_flush_all := true
in
while not (Queue.is_empty local_q) do
let ev = Queue.pop local_q in
process_ev ev
done
done;
if !must_flush_all then (
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
) else (
let now = Mtime_clock.now () in
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();
if should_send_batch_ ~config ~now batches.traces then send_traces ();
if should_send_batch_ ~config ~now batches.logs then send_logs ()
)
done
with B_queue.Closed -> ()