From 114e2eb566c11222b020008087321e07b019af90 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 4 Dec 2025 10:07:29 -0500 Subject: [PATCH] feat emitter: better docs, add a `to_list` emitter --- src/emitter/dune | 3 ++- src/emitter/emitter.ml | 17 +++++++++++++---- src/emitter/to_list.ml | 10 ++++++++++ 3 files changed, 25 insertions(+), 5 deletions(-) create mode 100644 src/emitter/to_list.ml diff --git a/src/emitter/dune b/src/emitter/dune index 76f79dee..3844e185 100644 --- a/src/emitter/dune +++ b/src/emitter/dune @@ -1,5 +1,6 @@ (library (name opentelemetry_emitter) (public_name opentelemetry.emitter) - (libraries mtime mtime.clock.os) + (libraries mtime mtime.clock.os opentelemetry.atomic) + (flags :standard -open Opentelemetry_atomic) (synopsis "Modular emitters for a single signal at a time")) diff --git a/src/emitter/emitter.ml b/src/emitter/emitter.ml index 54999bd4..02d8822e 100644 --- a/src/emitter/emitter.ml +++ b/src/emitter/emitter.ml @@ -1,4 +1,9 @@ -(** Emitters *) +(** Emitters. + + This is the composable abstraction we use to represent how signals are + emitted, from their origin point (a site in user code or library code that + was instrumented, and just created a span or log record or metric), down to + the actual SDK exporter installed in the application. *) exception Closed @@ -6,10 +11,12 @@ type 'a t = { emit: 'a list -> unit; (** Emit signals. @raise Closed if the emitter is closed. *) tick: now:Mtime.t -> unit; - (** Call regularly to ensure background work is done *) - closed: unit -> bool; (** True if the emitter was closed *) + (** Call regularly to ensure background work is done. The current + timestamp is passed to improve testability. *) + closed: unit -> bool; + (** True if the emitter is already closed. Beware TOCTOU bugs. *) flush_and_close: unit -> unit; - (** Flush internal buffered signals, then close *) + (** Flush internally buffered signals, then close. *) } (** An emitter for values of type ['a]. *) @@ -21,6 +28,8 @@ let[@inline] closed self : bool = self.closed () let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close () +(** [map f emitter] returns a new emitter that applies [f] to signals before + passing them to [emitter] *) let map (f : 'a -> 'b) (self : 'b t) : 'a t = { self with emit = (fun l -> self.emit (List.map f l)) } diff --git a/src/emitter/to_list.ml b/src/emitter/to_list.ml new file mode 100644 index 00000000..61a42ab8 --- /dev/null +++ b/src/emitter/to_list.ml @@ -0,0 +1,10 @@ +(** Emitter that stores signals into a list, in reverse order (most recent + signals first). *) +let to_list (l : 'a list ref) : 'a Emitter.t = + let closed = Atomic.make false in + { + emit = (fun sigs -> l := List.rev_append sigs !l); + tick = (fun ~now:_ -> ()); + closed = (fun () -> Atomic.get closed); + flush_and_close = (fun () -> Atomic.set closed true); + }