emitter: add enabled() field, and tap

This commit is contained in:
Simon Cruanes 2025-12-04 10:56:04 -05:00
parent ebed5d7ce8
commit df4d657c1a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 27 additions and 5 deletions

View file

@ -7,7 +7,10 @@
exception Closed
type 'a t = {
type -'a t = {
enabled: unit -> bool;
(** Return [true] if [emit] has a chance of doing something with the
signals it's given. *)
emit: 'a list -> unit;
(** Emit signals. @raise Closed if the emitter is closed. *)
tick: now:Mtime.t -> unit;
@ -20,6 +23,8 @@ type 'a t = {
}
(** An emitter for values of type ['a]. *)
let[@inline] enabled self : bool = self.enabled ()
let[@inline] emit (self : _ t) l : unit = if l <> [] then self.emit l
let[@inline] tick (self : _ t) ~now : unit = self.tick ~now
@ -33,7 +38,20 @@ let[@inline] flush_and_close (self : _ t) : unit = self.flush_and_close ()
let map (f : 'a -> 'b) (self : 'b t) : 'a t =
{ self with emit = (fun l -> self.emit (List.map f l)) }
(* TODO: batching, either regular or sharded to reduce contention *)
(* TODO: sampling *)
(** [tap f e] is like [e], but every signal is passed to [f] *)
let tap (f : 'a -> unit) (self : 'a t) : 'a t =
let emit l =
List.iter f l;
self.emit l
in
{ self with emit }
(* TODO: use in Opentelemetry, and also for Tracer, Logger, etc. *)
let dummy () : _ t =
let closed = Atomic.make false in
{
enabled = (fun () -> false);
emit = ignore;
tick = (fun ~now:_ -> ());
closed = (fun () -> Atomic.get closed);
flush_and_close = (fun () -> Atomic.set closed true);
}

View file

@ -3,7 +3,11 @@
let to_list (l : 'a list ref) : 'a Emitter.t =
let closed = Atomic.make false in
{
emit = (fun sigs -> l := List.rev_append sigs !l);
enabled = (fun () -> not (Atomic.get closed));
emit =
(fun sigs ->
if Atomic.get closed then raise Emitter.Closed;
l := List.rev_append sigs !l);
tick = (fun ~now:_ -> ());
closed = (fun () -> Atomic.get closed);
flush_and_close = (fun () -> Atomic.set closed true);