diff --git a/src/client/batch.ml b/src/client/batch.ml index 8be63d58..c1c70884 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -123,56 +123,6 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) -open Opentelemetry_emitter - -(** Emit current batch, if the conditions are met *) -let maybe_emit_ (self : _ t) ~(e : _ Emitter.t) ~mtime : unit = - match pop_if_ready self ~force:false ~mtime with - | None -> () - | Some l -> Emitter.emit e l - -let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = - (* we need to be able to close this emitter before we close [e]. This - will become [true] when we close, then we call [Emitter.flush_and_close e], - then [e] itself will be closed. *) - let closed_here = Atomic.make false in - - let enabled () = (not (Atomic.get closed_here)) && e.enabled () in - let closed () = Atomic.get closed_here || e.closed () in - let flush_and_close () = - if not (Atomic.exchange closed_here true) then ( - (* NOTE: we need to close this wrapping emitter first, to prevent - further pushes; then write the content to [e]; then - flusn and close [e]. In this order. *) - (match pop_if_ready self ~force:true ~mtime:mtime_dummy_ with - | None -> () - | Some l -> Emitter.emit e l); - - (* now we can close [e], nothing remains in [self] *) - Emitter.flush_and_close e - ) - in - - let tick ~mtime = - if not (Atomic.get closed_here) then ( - (* first, check if batch has timed out *) - maybe_emit_ self ~e ~mtime; - - (* only then, tick the underlying emitter *) - Emitter.tick e ~mtime - ) - in - - let emit l = - if l <> [] && not (Atomic.get closed_here) then ( - (* Printf.eprintf "otel.batch.add %d items\n%!" (List.length l); *) - push' self l; - - (* we only check for size here, not for timeout. The [tick] function is - enough for timeouts, whereas [emit] is in the hot path of every single - span/metric/log *) - maybe_emit_ self ~e ~mtime:mtime_dummy_ - ) - in - - { Emitter.closed; enabled; flush_and_close; tick; emit } +module Internal_ = struct + let mtime_dummy_ = mtime_dummy_ +end diff --git a/src/client/batch.mli b/src/client/batch.mli index 08ae88db..dc8e69a2 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -56,8 +56,10 @@ val push' : 'a t -> 'a list -> unit val cur_size : _ t -> int (** Number of elements in the current batch *) -open Opentelemetry_emitter +(**/**) -val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t -(** [wrap_emitter batch e] is an emitter that uses batch [batch] to gather - signals into larger lists before passing them to [e]. *) +module Internal_ : sig + val mtime_dummy_ : Mtime.t +end + +(**/**) diff --git a/src/client/emitter_add_batching.ml b/src/client/emitter_add_batching.ml new file mode 100644 index 00000000..bd2106d0 --- /dev/null +++ b/src/client/emitter_add_batching.ml @@ -0,0 +1,64 @@ +open Opentelemetry_emitter + +(** Emit current batch, if the conditions are met *) +let maybe_emit_ (b : _ Batch.t) ~(e : _ Emitter.t) ~mtime : unit = + match Batch.pop_if_ready b ~force:false ~mtime with + | None -> () + | Some l -> Emitter.emit e l + +let wrap_emitter_with_batch (self : _ Batch.t) (e : _ Emitter.t) : _ Emitter.t = + (* we need to be able to close this emitter before we close [e]. This + will become [true] when we close, then we call [Emitter.flush_and_close e], + then [e] itself will be closed. *) + let closed_here = Atomic.make false in + + let enabled () = (not (Atomic.get closed_here)) && e.enabled () in + let closed () = Atomic.get closed_here || e.closed () in + let flush_and_close () = + if not (Atomic.exchange closed_here true) then ( + (* NOTE: we need to close this wrapping emitter first, to prevent + further pushes; then write the content to [e]; then + flusn and close [e]. In this order. *) + (match + Batch.pop_if_ready self ~force:true ~mtime:Batch.Internal_.mtime_dummy_ + with + | None -> () + | Some l -> Emitter.emit e l); + + (* now we can close [e], nothing remains in [self] *) + Emitter.flush_and_close e + ) + in + + let tick ~mtime = + if not (Atomic.get closed_here) then ( + (* first, check if batch has timed out *) + maybe_emit_ self ~e ~mtime; + + (* only then, tick the underlying emitter *) + Emitter.tick e ~mtime + ) + in + + let emit l = + if l <> [] && not (Atomic.get closed_here) then ( + (* Printf.eprintf "otel.batch.add %d items\n%!" (List.length l); *) + Batch.push' self l; + + (* we only check for size here, not for timeout. The [tick] function is + enough for timeouts, whereas [emit] is in the hot path of every single + span/metric/log *) + maybe_emit_ self ~e ~mtime:Batch.Internal_.mtime_dummy_ + ) + in + + { Emitter.closed; enabled; flush_and_close; tick; emit } + +let add_batching ~timeout ~batch_size (emitter : 'a Emitter.t) : 'a Emitter.t = + let b = Batch.make ~batch:batch_size ~timeout () in + wrap_emitter_with_batch b emitter + +let add_batching_opt ~timeout ~batch_size:(b : int option) e = + match b with + | None -> e + | Some b -> add_batching ~timeout ~batch_size:b e diff --git a/src/client/emitter_add_batching.mli b/src/client/emitter_add_batching.mli new file mode 100644 index 00000000..1e919b6a --- /dev/null +++ b/src/client/emitter_add_batching.mli @@ -0,0 +1,11 @@ +open Opentelemetry_emitter + +val wrap_emitter_with_batch : 'a Batch.t -> 'a Emitter.t -> 'a Emitter.t +(** [wrap_emitter_with_batch batch e] is an emitter that uses batch [batch] to + gather signals into larger lists before passing them to [e]. *) + +val add_batching : + timeout:Mtime.span -> batch_size:int -> 'a Emitter.t -> 'a Emitter.t + +val add_batching_opt : + timeout:Mtime.span -> batch_size:int option -> 'a Emitter.t -> 'a Emitter.t diff --git a/src/client/exporter_add_batching.ml b/src/client/exporter_add_batching.ml index 4c5efe95..d5f495af 100644 --- a/src/client/exporter_add_batching.ml +++ b/src/client/exporter_add_batching.ml @@ -6,26 +6,23 @@ open Common_ -open struct - let add_batch ~timeout batch (emitter : 'a OTEL.Emitter.t) : 'a OTEL.Emitter.t - = - let b = Batch.make ~batch ~timeout () in - Batch.wrap_emitter b emitter -end - (** Given an exporter, add batches for each emitter according to [config]. *) let add_batching ~(config : Http_config.t) (exp : OTEL.Exporter.t) : OTEL.Exporter.t = let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - let add_batch_opt (b : int option) e = - match b with - | None -> e - | Some b -> add_batch ~timeout b e - in - let emit_spans = add_batch_opt config.batch_traces exp.emit_spans in - let emit_metrics = add_batch_opt config.batch_metrics exp.emit_metrics in - let emit_logs = add_batch_opt config.batch_logs exp.emit_logs in + let emit_spans = + Emitter_add_batching.add_batching_opt ~timeout + ~batch_size:config.batch_traces exp.emit_spans + in + let emit_metrics = + Emitter_add_batching.add_batching_opt ~timeout + ~batch_size:config.batch_metrics exp.emit_metrics + in + let emit_logs = + Emitter_add_batching.add_batching_opt ~timeout ~batch_size:config.batch_logs + exp.emit_logs + in let active = exp.active in let tick = exp.tick in