add Emitter_add_batching to client library

fronting an emitter with a batch belongs in its own module
This commit is contained in:
Simon Cruanes 2026-01-12 20:09:41 -05:00
parent 07d8357cfb
commit 008ae6ddfd
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 96 additions and 72 deletions

View file

@ -123,56 +123,6 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter module Internal_ = struct
let mtime_dummy_ = mtime_dummy_
(** Emit current batch, if the conditions are met *) end
let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~mtime : unit =
match pop_if_ready self ~force:false ~mtime with
| None -> ()
| Some l -> Emitter.emit e l
let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
(* we need to be able to close this emitter before we close [e]. This
will become [true] when we close, then we call [Emitter.flush_and_close e],
then [e] itself will be closed. *)
let closed_here = Atomic.make false in
let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
let closed () = Atomic.get closed_here || e.closed () in
let flush_and_close () =
if not (Atomic.exchange closed_here true) then (
(* NOTE: we need to close this wrapping emitter first, to prevent
further pushes; then write the content to [e]; then
flusn and close [e]. In this order. *)
(match pop_if_ready self ~force:true ~mtime:mtime_dummy_ with
| None -> ()
| Some l -> Emitter.emit e l);
(* now we can close [e], nothing remains in [self] *)
Emitter.flush_and_close e
)
in
let tick ~mtime =
if not (Atomic.get closed_here) then (
(* first, check if batch has timed out *)
maybe_emit_ self ~e ~mtime;
(* only then, tick the underlying emitter *)
Emitter.tick e ~mtime
)
in
let emit l =
if l <> [] && not (Atomic.get closed_here) then (
(* Printf.eprintf "otel.batch.add %d items\n%!" (List.length l); *)
push' self l;
(* we only check for size here, not for timeout. The [tick] function is
enough for timeouts, whereas [emit] is in the hot path of every single
span/metric/log *)
maybe_emit_ self ~e ~mtime:mtime_dummy_
)
in
{ Emitter.closed; enabled; flush_and_close; tick; emit }

View file

@ -56,8 +56,10 @@ val push' : 'a t -> 'a list -> unit
val cur_size : _ t -> int val cur_size : _ t -> int
(** Number of elements in the current batch *) (** Number of elements in the current batch *)
open Opentelemetry_emitter (**/**)
val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t module Internal_ : sig
(** [wrap_emitter batch e] is an emitter that uses batch [batch] to gather val mtime_dummy_ : Mtime.t
signals into larger lists before passing them to [e]. *) end
(**/**)

View file

@ -0,0 +1,64 @@
open Opentelemetry_emitter
(** Emit current batch, if the conditions are met *)
let maybe_emit_ (b : _ Batch.t) ~(e : _ Emitter.t) ~mtime : unit =
match Batch.pop_if_ready b ~force:false ~mtime with
| None -> ()
| Some l -> Emitter.emit e l
let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t =
(* we need to be able to close this emitter before we close [e]. This
will become [true] when we close, then we call [Emitter.flush_and_close e],
then [e] itself will be closed. *)
let closed_here = Atomic.make false in
let enabled () = (not (Atomic.get closed_here)) && e.enabled () in
let closed () = Atomic.get closed_here || e.closed () in
let flush_and_close () =
if not (Atomic.exchange closed_here true) then (
(* NOTE: we need to close this wrapping emitter first, to prevent
further pushes; then write the content to [e]; then
flusn and close [e]. In this order. *)
(match
Batch.pop_if_ready self ~force:true ~mtime:Batch.Internal_.mtime_dummy_
with
| None -> ()
| Some l -> Emitter.emit e l);
(* now we can close [e], nothing remains in [self] *)
Emitter.flush_and_close e
)
in
let tick ~mtime =
if not (Atomic.get closed_here) then (
(* first, check if batch has timed out *)
maybe_emit_ self ~e ~mtime;
(* only then, tick the underlying emitter *)
Emitter.tick e ~mtime
)
in
let emit l =
if l <> [] && not (Atomic.get closed_here) then (
(* Printf.eprintf "otel.batch.add %d items\n%!" (List.length l); *)
Batch.push' self l;
(* we only check for size here, not for timeout. The [tick] function is
enough for timeouts, whereas [emit] is in the hot path of every single
span/metric/log *)
maybe_emit_ self ~e ~mtime:Batch.Internal_.mtime_dummy_
)
in
{ Emitter.closed; enabled; flush_and_close; tick; emit }
let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t =
let b = Batch.make ~batch:batch_size ~timeout () in
wrap_emitter_with_batch b emitter
let add_batching_opt ~timeout ~batch_size:(b : int option) e =
match b with
| None -> e
| Some b -> add_batching ~timeout ~batch_size:b e

View file

@ -0,0 +1,11 @@
open Opentelemetry_emitter
val wrap_emitter_with_batch : 'a Batch.t -> 'a Emitter.t -> 'a Emitter.t
(** [wrap_emitter_with_batch batch e] is an emitter that uses batch [batch] to
gather signals into larger lists before passing them to [e]. *)
val add_batching :
timeout:Mtime.span -> batch_size:int -> 'a Emitter.t -> 'a Emitter.t
val add_batching_opt :
timeout:Mtime.span -> batch_size:int option -> 'a Emitter.t -> 'a Emitter.t

View file

@ -6,26 +6,23 @@
open Common_ open Common_
open struct
let add_batch ~timeout batch (emitter : 'a OTEL.Emitter.t) : 'a OTEL.Emitter.t
=
let b = Batch.make ~batch ~timeout () in
Batch.wrap_emitter b emitter
end
(** Given an exporter, add batches for each emitter according to [config]. *) (** Given an exporter, add batches for each emitter according to [config]. *)
let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) : let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) :
OTEL.Exporter.t = OTEL.Exporter.t =
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in
let add_batch_opt (b : int option) e =
match b with
| None -> e
| Some b -> add_batch ~timeout b e
in
let emit_spans = add_batch_opt config.batch_traces exp.emit_spans in let emit_spans =
let emit_metrics = add_batch_opt config.batch_metrics exp.emit_metrics in Emitter_add_batching.add_batching_opt ~timeout
let emit_logs = add_batch_opt config.batch_logs exp.emit_logs in ~batch_size:config.batch_traces exp.emit_spans
in
let emit_metrics =
Emitter_add_batching.add_batching_opt ~timeout
~batch_size:config.batch_metrics exp.emit_metrics
in
let emit_logs =
Emitter_add_batching.add_batching_opt ~timeout ~batch_size:config.batch_logs
exp.emit_logs
in
let active = exp.active in let active = exp.active in
let tick = exp.tick in let tick = exp.tick in