client: add sampler; batch and sampler are now emitter transformers

This commit is contained in:
Simon Cruanes 2025-12-04 10:16:55 -05:00
parent 755e24a1e8
commit bb6d83483c
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 127 additions and 1 deletions

View file

@ -87,3 +87,46 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
) )
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter
let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
let closed () = e.closed () in
let flush_and_close () =
(* FIXME: we need to close the batch first, to prevent
further pushes; then write the content to [e]; then
flusn and close [e]. In this order. *)
(match pop_if_ready self ~force:true ~now:Mtime.max_stamp with
| None -> ()
| Some l -> Emitter.emit e l);
Emitter.flush_and_close e
in
let maybe_emit ~now =
match pop_if_ready self ~force:false ~now with
| None -> ()
| Some l -> Emitter.emit e l
in
let tick ~now =
(* first, check if batch has timed out *)
maybe_emit ~now;
(* only then, tick the underlying emitter *)
Emitter.tick e ~now
in
let emit l =
if l <> [] then (
push' self l;
(* TODO: it'd be nice if we checked only for size here, not
for timeout. The [tick] function is enough for timeouts,
whereas [emit] is in the hot path of every single span/metric/log *)
let now = Mtime_clock.now () in
maybe_emit ~now
)
in
{ Emitter.closed; flush_and_close; tick; emit }

View file

@ -53,3 +53,9 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
val push' : 'a t -> 'a list -> unit val push' : 'a t -> 'a list -> unit
(** Like {!push} but ignores the result *) (** Like {!push} but ignores the result *)
open Opentelemetry_emitter
val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t
(** [batch_emitter batch e] is an emitter that uses batch [batch] to gather
signals into larger lists before passing them to [e]. *)

View file

@ -1,6 +1,12 @@
(library (library
(name opentelemetry_client) (name opentelemetry_client)
(public_name opentelemetry.client) (public_name opentelemetry.client)
(libraries opentelemetry opentelemetry.proto pbrt mtime mtime.clock.os) (libraries
opentelemetry
opentelemetry.emitter
opentelemetry.proto
pbrt
mtime
mtime.clock.os)
(synopsis (synopsis
"Basic exporters, as well as Common types and logic shared between exporters")) "Basic exporters, as well as Common types and logic shared between exporters"))

46
src/client/sampler.ml Normal file
View file

@ -0,0 +1,46 @@
type t = {
proba_accept: float;
n_seen: int Atomic.t;
n_accepted: int Atomic.t;
}
let create ~proba_accept () : t =
(* FIXME: either czzry a random state and protect it, or make sure
we Random.self_init() in the current domain?? *)
if proba_accept < 0. || proba_accept > 1. then
invalid_arg "sampler: proba_accept must be in [0., 1.]";
{ proba_accept; n_seen = Atomic.make 0; n_accepted = Atomic.make 0 }
let[@inline] proba_accept self = self.proba_accept
let actual_rate (self : t) : float =
let accept = Atomic.get self.n_accepted in
let total = Atomic.get self.n_seen in
if total = 0 then
1.
else
float accept /. float total
let accept (self : t) : bool =
Atomic.incr self.n_seen;
let n = Random.float 1. in
let res = n < self.proba_accept in
if res then Atomic.incr self.n_accepted;
res
open Opentelemetry_emitter
let wrap_emitter (self : t) (e : _ Emitter.t) : _ Emitter.t =
let closed () = Emitter.closed e in
let flush_and_close () = Emitter.flush_and_close e in
let tick ~now = Emitter.tick e ~now in
let emit l =
let accepted = List.filter (fun _x -> accept self) l in
if accepted <> [] then Emitter.emit e accepted
in
{ Emitter.closed; flush_and_close; tick; emit }

25
src/client/sampler.mli Normal file
View file

@ -0,0 +1,25 @@
(** Basic random sampling *)
type t
val create : proba_accept:float -> unit -> t
(** [create ~proba_accept:n ()] makes a new sampler.
The sampler will accept signals with probability [n] (must be between 0 and
1).
@raise Invalid_argument if [n] is not between 0 and 1. *)
val accept : t -> bool
(** Do we accept a sample? This returns [true] with probability [proba_accept].
*)
val proba_accept : t -> float
val actual_rate : t -> float
(** The ratio of signals we actually accepted so far *)
open Opentelemetry_emitter
val wrap_emitter : t -> 'a Emitter.t -> 'a Emitter.t
(** [wrap_emitter sampler e] is a new emitter that uses the [sampler] on each
individual signal before passing them to [e]. *)