feat emitter: better docs, add a to_list emitter

This commit is contained in:
Simon Cruanes 2025-12-04 10:07:29 -05:00
parent b8228dfe25
commit 114e2eb566
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 25 additions and 5 deletions

View file

@ -1,5 +1,6 @@
(library (library
(name opentelemetry_emitter) (name opentelemetry_emitter)
(public_name opentelemetry.emitter) (public_name opentelemetry.emitter)
(libraries mtime mtime.clock.os) (libraries mtime mtime.clock.os opentelemetry.atomic)
(flags :standard -open Opentelemetry_atomic)
(synopsis "Modular emitters for a single signal at a time")) (synopsis "Modular emitters for a single signal at a time"))

View file

@ -1,4 +1,9 @@
(** Emitters *) (** Emitters.
This is the composable abstraction we use to represent how signals are
emitted, from their origin point (a site in user code or library code that
was instrumented, and just created a span or log record or metric), down to
the actual SDK exporter installed in the application. *)
exception Closed exception Closed
@ -6,10 +11,12 @@ type 'a t = {
emit: 'a list -> unit; emit: 'a list -> unit;
(** Emit signals. @raise Closed if the emitter is closed. *) (** Emit signals. @raise Closed if the emitter is closed. *)
tick: now:Mtime.t -> unit; tick: now:Mtime.t -> unit;
(** Call regularly to ensure background work is done *) (** Call regularly to ensure background work is done. The current
closed: unit -> bool; (** True if the emitter was closed *) timestamp is passed to improve testability. *)
closed: unit -> bool;
(** True if the emitter is already closed. Beware TOCTOU bugs. *)
flush_and_close: unit -> unit; flush_and_close: unit -> unit;
(** Flush internal buffered signals, then close *) (** Flush internally buffered signals, then close. *)
} }
(** An emitter for values of type ['a]. *) (** An emitter for values of type ['a]. *)
@ -21,6 +28,8 @@ let[@inline] closed self : bool = self.closed ()
let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close () let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close ()
(** [map f emitter] returns a new emitter that applies [f] to signals before
passing them to [emitter] *)
let map (f : 'a -> 'b) (self : 'b t) : 'a t = let map (f : 'a -> 'b) (self : 'b t) : 'a t =
{ self with emit = (fun l -> self.emit (List.map f l)) } { self with emit = (fun l -> self.emit (List.map f l)) }

10
src/emitter/to_list.ml Normal file
View file

@ -0,0 +1,10 @@
(** Emitter that stores signals into a list, in reverse order (most recent
signals first). *)
let to_list (l : 'a list ref) : 'a Emitter.t =
let closed = Atomic.make false in
{
emit = (fun sigs -> l := List.rev_append sigs !l);
tick = (fun ~now:_ -> ());
closed = (fun () -> Atomic.get closed);
flush_and_close = (fun () -> Atomic.set closed true);
}