subscriber: tee a whole array at once

This commit is contained in:
Simon Cruanes 2025-04-23 16:41:07 -04:00
parent 44fdc9557d
commit 3c1360677a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 80 additions and 82 deletions

View file

@ -17,105 +17,97 @@ type t =
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () } let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }
open struct open struct
module Tee_cb : Callbacks.S with type st = t * t = struct module Tee_cb : Callbacks.S with type st = t array = struct
type nonrec st = t * t type nonrec st = t array
let on_init let on_init st ~time_ns =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_init s1 ~time_ns; CB.on_init s ~time_ns
CB2.on_init s2 ~time_ns done
let on_shutdown let on_shutdown st ~time_ns =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_shutdown s1 ~time_ns; CB.on_shutdown s ~time_ns
CB2.on_shutdown s2 ~time_ns done
let on_name_thread let on_name_thread st ~time_ns ~tid ~name =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_name_thread s1 ~time_ns ~tid ~name; CB.on_name_thread s ~time_ns ~tid ~name
CB2.on_name_thread s2 ~time_ns ~tid ~name done
let on_name_process let on_name_process st ~time_ns ~tid ~name =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_name_process s1 ~time_ns ~tid ~name; CB.on_name_process s ~time_ns ~tid ~name
CB2.on_name_process s2 ~time_ns ~tid ~name done
let on_enter_span let on_enter_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
( Sub { st = s1; callbacks = (module CB1) }, ~name span =
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__ for i = 0 to Array.length st - 1 do
~__LINE__ ~time_ns ~tid ~data ~name span = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_enter_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data CB.on_enter_span s ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span; ~name span
CB2.on_enter_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data done
~name span
let on_exit_span let on_exit_span st ~time_ns ~tid span =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid span = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_exit_span s1 ~time_ns ~tid span; CB.on_exit_span s ~time_ns ~tid span
CB2.on_exit_span s2 ~time_ns ~tid span done
let on_add_data let on_add_data st ~data span =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~data span = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_add_data s1 ~data span; CB.on_add_data s ~data span
CB2.on_add_data s2 ~data span done
let on_message let on_message st ~time_ns ~tid ~span ~data msg =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~span ~data let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
msg = CB.on_message s ~time_ns ~tid ~span ~data msg
CB1.on_message s1 ~time_ns ~tid ~span ~data msg; done
CB2.on_message s2 ~time_ns ~tid ~span ~data msg
let on_counter let on_counter st ~time_ns ~tid ~data ~name n =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~data ~name let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
n = CB.on_counter s ~time_ns ~tid ~data ~name n
CB1.on_counter s1 ~time_ns ~tid ~data ~name n; done
CB2.on_counter s2 ~time_ns ~tid ~data ~name n
let on_enter_manual_span let on_enter_manual_span st ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid
( Sub { st = s1; callbacks = (module CB1) }, ~parent ~data ~name ~flavor ~trace_id span =
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__ for i = 0 to Array.length st - 1 do
~__LINE__ ~time_ns ~tid ~parent ~data ~name ~flavor ~trace_id span = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_enter_manual_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns CB.on_enter_manual_span s ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span; ~tid ~parent ~data ~name ~flavor ~trace_id span
CB2.on_enter_manual_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns done
~tid ~parent ~data ~name ~flavor ~trace_id span
let on_exit_manual_span let on_exit_manual_span st ~time_ns ~tid ~name ~data ~flavor ~trace_id span
( Sub { st = s1; callbacks = (module CB1) }, =
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name ~data for i = 0 to Array.length st - 1 do
~flavor ~trace_id span = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_exit_manual_span s1 ~time_ns ~tid ~name ~data ~flavor ~trace_id CB.on_exit_manual_span s ~time_ns ~tid ~name ~data ~flavor ~trace_id
span; span
CB2.on_exit_manual_span s2 ~time_ns ~tid ~name ~data ~flavor ~trace_id done
span
let on_extension_event let on_extension_event st ~time_ns ~tid ev : unit =
( Sub { st = s1; callbacks = (module CB1) }, for i = 0 to Array.length st - 1 do
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ev : unit = let (Sub { st = s; callbacks = (module CB) }) = Array.get st i in
CB1.on_extension_event s1 ~time_ns ~tid ev; CB.on_extension_event s ~time_ns ~tid ev
CB2.on_extension_event s2 ~time_ns ~tid ev done
end end
end end
(** [tee s1 s2] is a subscriber that forwards every call to [s1] and [s2] both.
*)
let tee (s1 : t) (s2 : t) : t =
let st = s1, s2 in
Sub { st; callbacks = (module Tee_cb) }
(** Tee multiple subscribers, ie return a subscriber that forwards to all the (** Tee multiple subscribers, ie return a subscriber that forwards to all the
subscribers in [subs]. *) subscribers in [subs]. *)
let rec tee_l (subs : t list) : t = let tee_l (subs : t list) : t =
match subs with match subs with
| [] -> dummy | [] -> dummy
| [ s ] -> s | [ s ] -> s
| [ s1; s2 ] -> tee s1 s2 | l -> Sub { st = Array.of_list l; callbacks = (module Tee_cb) }
| s1 :: s2 :: tl -> tee (tee s1 s2) (tee_l tl)
(** [tee s1 s2] is a subscriber that forwards every call to [s1] and [s2] both.
*)
let tee (s1 : t) (s2 : t) : t = tee_l [ s1; s2 ]

View file

@ -16,6 +16,12 @@ end
(** {2 Main API} *) (** {2 Main API} *)
type t = Subscriber.t type t = Subscriber.t
(** A trace subscriber. It pairs a set of callbacks with the state they need
(which can contain a file handle, a socket to write events to, config,
etc.).
The design goal for this is that it should be possible to avoid allocations
whenever the trace collector invokes the callbacks. *)
val collector : t -> Trace_core.collector val collector : t -> Trace_core.collector
(** A collector that calls the subscriber's callbacks. (** A collector that calls the subscriber's callbacks.