client: add sampler; batch and sampler are now emitter transformers

This commit is contained in:
Simon Cruanes 2025-12-04 10:16:55 -05:00
parent 114e2eb566
commit 689b932c63
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 127 additions and 1 deletions

View file

@ -87,3 +87,46 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
)
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
open Opentelemetry_emitter
let wrap_emitter (self : _ t) (e : _ Emitter.t) : _ Emitter.t =
let closed () = e.closed () in
let flush_and_close () =
(* FIXME: we need to close the batch first, to prevent
further pushes; then write the content to [e]; then
flusn and close [e]. In this order. *)
(match pop_if_ready self ~force:true ~now:Mtime.max_stamp with
| None -> ()
| Some l -> Emitter.emit e l);
Emitter.flush_and_close e
in
let maybe_emit ~now =
match pop_if_ready self ~force:false ~now with
| None -> ()
| Some l -> Emitter.emit e l
in
let tick ~now =
(* first, check if batch has timed out *)
maybe_emit ~now;
(* only then, tick the underlying emitter *)
Emitter.tick e ~now
in
let emit l =
if l <> [] then (
push' self l;
(* TODO: it'd be nice if we checked only for size here, not
for timeout. The [tick] function is enough for timeouts,
whereas [emit] is in the hot path of every single span/metric/log *)
let now = Mtime_clock.now () in
maybe_emit ~now
)
in
{ Emitter.closed; flush_and_close; tick; emit }

View file

@ -53,3 +53,9 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
val push' : 'a t -> 'a list -> unit
(** Like {!push} but ignores the result *)
open Opentelemetry_emitter
val wrap_emitter : 'a t -> 'a Emitter.t -> 'a Emitter.t
(** [batch_emitter batch e] is an emitter that uses batch [batch] to gather
signals into larger lists before passing them to [e]. *)

View file

@ -1,6 +1,12 @@
(library
(name opentelemetry_client)
(public_name opentelemetry.client)
(libraries opentelemetry opentelemetry.proto pbrt mtime mtime.clock.os)
(libraries
opentelemetry
opentelemetry.emitter
opentelemetry.proto
pbrt
mtime
mtime.clock.os)
(synopsis
"Basic exporters, as well as Common types and logic shared between exporters"))

46
src/client/sampler.ml Normal file
View file

@ -0,0 +1,46 @@
type t = {
proba_accept: float;
n_seen: int Atomic.t;
n_accepted: int Atomic.t;
}
let create ~proba_accept () : t =
(* FIXME: either czzry a random state and protect it, or make sure
we Random.self_init() in the current domain?? *)
if proba_accept < 0. || proba_accept > 1. then
invalid_arg "sampler: proba_accept must be in [0., 1.]";
{ proba_accept; n_seen = Atomic.make 0; n_accepted = Atomic.make 0 }
let[@inline] proba_accept self = self.proba_accept
let actual_rate (self : t) : float =
let accept = Atomic.get self.n_accepted in
let total = Atomic.get self.n_seen in
if total = 0 then
1.
else
float accept /. float total
let accept (self : t) : bool =
Atomic.incr self.n_seen;
let n = Random.float 1. in
let res = n < self.proba_accept in
if res then Atomic.incr self.n_accepted;
res
open Opentelemetry_emitter
let wrap_emitter (self : t) (e : _ Emitter.t) : _ Emitter.t =
let closed () = Emitter.closed e in
let flush_and_close () = Emitter.flush_and_close e in
let tick ~now = Emitter.tick e ~now in
let emit l =
let accepted = List.filter (fun _x -> accept self) l in
if accepted <> [] then Emitter.emit e accepted
in
{ Emitter.closed; flush_and_close; tick; emit }

25
src/client/sampler.mli Normal file
View file

@ -0,0 +1,25 @@
(** Basic random sampling *)
type t
val create : proba_accept:float -> unit -> t
(** [create ~proba_accept:n ()] makes a new sampler.
The sampler will accept signals with probability [n] (must be between 0 and
1).
@raise Invalid_argument if [n] is not between 0 and 1. *)
val accept : t -> bool
(** Do we accept a sample? This returns [true] with probability [proba_accept].
*)
val proba_accept : t -> float
val actual_rate : t -> float
(** The ratio of signals we actually accepted so far *)
open Opentelemetry_emitter
val wrap_emitter : t -> 'a Emitter.t -> 'a Emitter.t
(** [wrap_emitter sampler e] is a new emitter that uses the [sampler] on each
individual signal before passing them to [e]. *)