diff --git a/src/client/batch.ml b/src/client/batch.ml index 94b69cab..7e75fb99 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -141,11 +141,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = in let tick ~now = - (* first, check if batch has timed out *) - maybe_emit ~now; + if not (Atomic.get closed_here) then ( + (* first, check if batch has timed out *) + maybe_emit ~now; - (* only then, tick the underlying emitter *) - Emitter.tick e ~now + (* only then, tick the underlying emitter *) + Emitter.tick e ~now + ) in let emit l = diff --git a/src/client/bounded_queue_sync.ml b/src/client/bounded_queue_sync.ml index 1b02ca26..f8bf507d 100644 --- a/src/client/bounded_queue_sync.ml +++ b/src/client/bounded_queue_sync.ml @@ -87,7 +87,9 @@ let push (self : _ state) x = match Q.push_while_not_full self.q ~high_watermark:self.high_watermark x with - | Closed -> Printf.eprintf "bounded queue: warning: queue is closed\n%!" + | Closed -> + Printf.eprintf "bounded queue: warning: queue is closed\n%!"; + ignore (Atomic.fetch_and_add self.n_discarded (List.length x) : int) | Pushed { num_discarded } -> if num_discarded > 0 then ( Printf.eprintf "DISCARD %d items\n%!" num_discarded; @@ -106,7 +108,11 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t = let push x = push self x in let on_non_empty = Cb_set.register self.on_non_empty in let try_pop () = try_pop self in - let close () = Q.close self.q in + let close () = + Q.close self.q; + (* waiters will want to know *) + Cb_set.trigger self.on_non_empty + in { BQ.push; num_discarded; try_pop; on_non_empty; close; closed } let create ~high_watermark () : _ BQ.t = diff --git a/src/client/exporter_queued.ml b/src/client/exporter_queued.ml index d9e287ca..969988d1 100644 --- a/src/client/exporter_queued.ml +++ b/src/client/exporter_queued.ml @@ -4,6 +4,10 @@ open Common_ module BQ = Bounded_queue module BQ_emitters = struct + (* NOTE: these emitters, when closed, don't close the bounded + queue because we need to flush_and_close the other emitters first. + The bounded queue is a shared resource. *) + let logs_emitter_of_bq ?service_name ?attrs (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = Bounded_queue.to_emitter q ~close_queue_on_close:false