From 689b932c63ac63d0d607f43c6ae23db27d2ba186 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 4 Dec 2025 10:16:55 -0500 Subject: [PATCH] client: add sampler; batch and sampler are now emitter transformers --- src/client/batch.ml | 43 +++++++++++++++++++++++++++++++++++++++ src/client/batch.mli | 6 ++++++ src/client/dune | 8 +++++++- src/client/sampler.ml | 46 ++++++++++++++++++++++++++++++++++++++++++ src/client/sampler.mli | 25 +++++++++++++++++++++++ 5 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 src/client/sampler.ml create mode 100644 src/client/sampler.mli diff --git a/src/client/batch.ml b/src/client/batch.ml index d26cb04f..62cd8703 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -87,3 +87,46 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = ) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) + +open Opentelemetry_emitter + +let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t = + let closed () = e.closed () in + let flush_and_close () = + (* FIXME: we need to close the batch first, to prevent + further pushes; then write the content to [e]; then + flusn and close [e]. In this order. *) + (match pop_if_ready self ~force:true ~now:Mtime.max_stamp with + | None -> () + | Some l -> Emitter.emit e l); + + Emitter.flush_and_close e + in + + let maybe_emit ~now = + match pop_if_ready self ~force:false ~now with + | None -> () + | Some l -> Emitter.emit e l + in + + let tick ~now = + (* first, check if batch has timed out *) + maybe_emit ~now; + + (* only then, tick the underlying emitter *) + Emitter.tick e ~now + in + + let emit l = + if l <> [] then ( + push' self l; + + (* TODO: it'd be nice if we checked only for size here, not + for timeout. The [tick] function is enough for timeouts, + whereas [emit] is in the hot path of every single span/metric/log *) + let now = Mtime_clock.now () in + maybe_emit ~now + ) + in + + { Emitter.closed; flush_and_close; tick; emit } diff --git a/src/client/batch.mli b/src/client/batch.mli index c3b6f7e1..f50e1675 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -53,3 +53,9 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ] val push' : 'a t -> 'a list -> unit (** Like {!push} but ignores the result *) + +open Opentelemetry_emitter + +val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t +(** [batch_emitter batch e] is an emitter that uses batch [batch] to gather + signals into larger lists before passing them to [e]. *) diff --git a/src/client/dune b/src/client/dune index 36f6ee5c..48e836d7 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,6 +1,12 @@ (library (name opentelemetry_client) (public_name opentelemetry.client) - (libraries opentelemetry opentelemetry.proto pbrt mtime mtime.clock.os) + (libraries + opentelemetry + opentelemetry.emitter + opentelemetry.proto + pbrt + mtime + mtime.clock.os) (synopsis "Basic exporters, as well as Common types and logic shared between exporters")) diff --git a/src/client/sampler.ml b/src/client/sampler.ml new file mode 100644 index 00000000..ecaece0a --- /dev/null +++ b/src/client/sampler.ml @@ -0,0 +1,46 @@ +type t = { + proba_accept: float; + n_seen: int Atomic.t; + n_accepted: int Atomic.t; +} + +let create ~proba_accept () : t = + (* FIXME: either czzry a random state and protect it, or make sure + we Random.self_init() in the current domain?? *) + if proba_accept < 0. || proba_accept > 1. then + invalid_arg "sampler: proba_accept must be in [0., 1.]"; + { proba_accept; n_seen = Atomic.make 0; n_accepted = Atomic.make 0 } + +let[@inline] proba_accept self = self.proba_accept + +let actual_rate (self : t) : float = + let accept = Atomic.get self.n_accepted in + let total = Atomic.get self.n_seen in + + if total = 0 then + 1. + else + float accept /. float total + +let accept (self : t) : bool = + Atomic.incr self.n_seen; + + let n = Random.float 1. in + let res = n < self.proba_accept in + + if res then Atomic.incr self.n_accepted; + res + +open Opentelemetry_emitter + +let wrap_emitter (self : t) (e : _ Emitter.t) : _ Emitter.t = + let closed () = Emitter.closed e in + let flush_and_close () = Emitter.flush_and_close e in + let tick ~now = Emitter.tick e ~now in + + let emit l = + let accepted = List.filter (fun _x -> accept self) l in + if accepted <> [] then Emitter.emit e accepted + in + + { Emitter.closed; flush_and_close; tick; emit } diff --git a/src/client/sampler.mli b/src/client/sampler.mli new file mode 100644 index 00000000..784c8346 --- /dev/null +++ b/src/client/sampler.mli @@ -0,0 +1,25 @@ +(** Basic random sampling *) + +type t + +val create : proba_accept:float -> unit -> t +(** [create ~proba_accept:n ()] makes a new sampler. + + The sampler will accept signals with probability [n] (must be between 0 and + 1). + @raise Invalid_argument if [n] is not between 0 and 1. *) + +val accept : t -> bool +(** Do we accept a sample? This returns [true] with probability [proba_accept]. +*) + +val proba_accept : t -> float + +val actual_rate : t -> float +(** The ratio of signals we actually accepted so far *) + +open Opentelemetry_emitter + +val wrap_emitter : t -> 'a Emitter.t -> 'a Emitter.t +(** [wrap_emitter sampler e] is a new emitter that uses the [sampler] on each + individual signal before passing them to [e]. *)