generic consumer: sanity check on queue size

This commit is contained in:
Simon Cruanes 2025-12-09 22:46:24 -05:00
parent 4a61ab44d9
commit fda87007a8
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -83,13 +83,19 @@ end = struct
(** Shutdown one worker, when the queue is closed *)
let shutdown_worker (self : state) : unit =
(* let tid = Thread.id @@ Thread.self () in
Printf.eprintf "worker %d: shutting down\n%!" tid; *)
if Atomic.fetch_and_add self.n_workers (-1) = 1 then (
(* we were the last worker *)
(* Printf.eprintf "worker %d: last one!\n%!" tid; *)
(* we were the last worker, we can shut down the whole consumer *)
Atomic.set self.status Stopped;
Aswitch.turn_off self.active_trigger
Aswitch.turn_off self.active_trigger;
(* sanity check about the queue, which should be drained *)
let size_q = Bounded_queue.Recv.size self.q in
if size_q > 0 then
Printf.eprintf
"otel: warning: workers exited but work queue still contains %d \
elements\n\
%!"
size_q
)
let send_signals (self : state) (sender : Sender.t) ~backoff