wip: bugfixing

This commit is contained in:
Simon Cruanes 2025-12-08 16:04:30 -05:00
parent 87ccde2783
commit e3c6c41a0d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 18 additions and 6 deletions

View file

@ -141,11 +141,13 @@ let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
in in
let tick ~now = let tick ~now =
if not (Atomic.get closed_here) then (
(* first, check if batch has timed out *) (* first, check if batch has timed out *)
maybe_emit ~now; maybe_emit ~now;
(* only then, tick the underlying emitter *) (* only then, tick the underlying emitter *)
Emitter.tick e ~now Emitter.tick e ~now
)
in in
let emit l = let emit l =

View file

@ -87,7 +87,9 @@ let push (self : _ state) x =
match match
Q.push_while_not_full self.q ~high_watermark:self.high_watermark x Q.push_while_not_full self.q ~high_watermark:self.high_watermark x
with with
| Closed -> Printf.eprintf "bounded queue: warning: queue is closed\n%!" | Closed ->
Printf.eprintf "bounded queue: warning: queue is closed\n%!";
ignore (Atomic.fetch_and_add self.n_discarded (List.length x) : int)
| Pushed { num_discarded } -> | Pushed { num_discarded } ->
if num_discarded > 0 then ( if num_discarded > 0 then (
Printf.eprintf "DISCARD %d items\n%!" num_discarded; Printf.eprintf "DISCARD %d items\n%!" num_discarded;
@ -106,7 +108,11 @@ let to_bounded_queue (self : 'a state) : 'a BQ.t =
let push x = push self x in let push x = push self x in
let on_non_empty = Cb_set.register self.on_non_empty in let on_non_empty = Cb_set.register self.on_non_empty in
let try_pop () = try_pop self in let try_pop () = try_pop self in
let close () = Q.close self.q in let close () =
Q.close self.q;
(* waiters will want to know *)
Cb_set.trigger self.on_non_empty
in
{ BQ.push; num_discarded; try_pop; on_non_empty; close; closed } { BQ.push; num_discarded; try_pop; on_non_empty; close; closed }
let create ~high_watermark () : _ BQ.t = let create ~high_watermark () : _ BQ.t =

View file

@ -4,6 +4,10 @@ open Common_
module BQ = Bounded_queue module BQ = Bounded_queue
module BQ_emitters = struct module BQ_emitters = struct
(* NOTE: these emitters, when closed, don't close the bounded
queue because we need to flush_and_close the other emitters first.
The bounded queue is a shared resource. *)
let logs_emitter_of_bq ?service_name ?attrs let logs_emitter_of_bq ?service_name ?attrs
(q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t = (q : Any_resource.t Bounded_queue.t) : OTEL.Logger.t =
Bounded_queue.to_emitter q ~close_queue_on_close:false Bounded_queue.to_emitter q ~close_queue_on_close:false