remove over from backend

This commit is contained in:
Simon Cruanes 2022-03-24 10:59:37 -04:00
parent 9e0cd0acc9
commit 901730583c
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 55 additions and 46 deletions

View file

@ -229,17 +229,15 @@ module Gen_ids() = struct
b b
end end
(** Callback for when an event is properly sent to the collector *)
type over_cb = unit -> unit
(** An emitter. This is used by {!Backend} below to forward traces/metrics/… (** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *) from the program to whatever collector client we have. *)
module type EMITTER = sig module type EMITTER = sig
open Opentelemetry.Proto open Opentelemetry.Proto
val push_trace : Trace.resource_spans list -> over:over_cb -> unit val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> over:over_cb -> unit val push_metrics : Metrics.resource_metrics list -> unit
val tick : unit -> unit
val cleanup : unit -> unit val cleanup : unit -> unit
end end
@ -294,19 +292,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let continue = ref true in let continue = ref true in
let ((module E_trace) : (Trace.resource_spans list * over_cb) push), on_trace_full = let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
mk_push ?batch:config.batch_traces () in mk_push ?batch:config.batch_traces () in
let ((module E_metrics) : (Metrics.resource_metrics list * over_cb) push), on_metrics_full = let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full =
mk_push ?batch:config.batch_metrics () in mk_push ?batch:config.batch_metrics () in
let encoder = Pbrt.Encoder.create() in let encoder = Pbrt.Encoder.create() in
let ((module C) as curl) = (module Curl() : CURL) in let ((module C) as curl) = (module Curl() : CURL) in
let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) = let emit_metrics (l:Metrics.resource_metrics list list) =
Pbrt.Encoder.reset encoder; Pbrt.Encoder.reset encoder;
let resource_metrics = let resource_metrics =
List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Metrics_service.encode_export_metrics_service_request Metrics_service.encode_export_metrics_service_request
(Metrics_service.default_export_metrics_service_request (Metrics_service.default_export_metrics_service_request
~resource_metrics ()) ~resource_metrics ())
@ -321,14 +319,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
Atomic.incr n_errors; Atomic.incr n_errors;
report_err_ err report_err_ err
end; end;
(* signal completion *)
List.iter (fun (_,over) -> over()) l;
in in
let emit_traces (l:(Trace.resource_spans list * over_cb) list) = let emit_traces (l:Trace.resource_spans list list) =
Pbrt.Encoder.reset encoder; Pbrt.Encoder.reset encoder;
let resource_spans = let resource_spans =
List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Trace_service.encode_export_trace_service_request Trace_service.encode_export_trace_service_request
(Trace_service.default_export_trace_service_request ~resource_spans ()) (Trace_service.default_export_trace_service_request ~resource_spans ())
encoder; encoder;
@ -342,8 +338,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
Atomic.incr n_errors; Atomic.incr n_errors;
report_err_ err report_err_ err
end; end;
(* signal completion *)
List.iter (fun (_,over) -> over()) l;
in in
let last_wakeup = Atomic.make (Mtime_clock.now()) in let last_wakeup = Atomic.make (Mtime_clock.now()) in
@ -431,13 +425,18 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
on_metrics_full wakeup; on_metrics_full wakeup;
on_trace_full wakeup; on_trace_full wakeup;
let tick() =
if batch_timeout() then wakeup()
in
let module M = struct let module M = struct
let push_trace e ~over = let push_trace e =
E_trace.push (e,over); E_trace.push e;
if batch_timeout() then wakeup() if batch_timeout() then wakeup()
let push_metrics e ~over = let push_metrics e =
E_metrics.push (e,over); E_metrics.push e;
if batch_timeout() then wakeup() if batch_timeout() then wakeup()
let tick=tick
let cleanup () = let cleanup () =
continue := false; continue := false;
with_mutex_ m (fun () -> Condition.broadcast cond) with_mutex_ m (fun () -> Condition.broadcast cond)
@ -456,14 +455,17 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
in in
let module M = struct let module M = struct
let push_trace e ~over = let push_trace e =
let@() = guard in let@() = guard in
E_trace.push (e,over); E_trace.push e;
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let push_metrics e ~over = let push_metrics e =
let@() = guard in let@() = guard in
E_metrics.push (e,over); E_metrics.push e;
if batch_timeout() then emit_all_force()
let tick () =
if batch_timeout() then emit_all_force() if batch_timeout() then emit_all_force()
let cleanup = cleanup let cleanup = cleanup
@ -482,10 +484,10 @@ module Backend(Arg : sig val config : Config.t end)()
open Opentelemetry.Collector open Opentelemetry.Collector
let send_trace : Trace.resource_spans list sender = { let send_trace : Trace.resource_spans list sender = {
send=fun l ~over ~ret -> send=fun l ~ret ->
let@() = with_lock_ in let@() = with_lock_ in
if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l;
push_trace l ~over; push_trace l;
ret() ret()
} }
@ -517,12 +519,12 @@ module Backend(Arg : sig val config : Config.t end)()
) else [] ) else []
let send_metrics : Metrics.resource_metrics list sender = { let send_metrics : Metrics.resource_metrics list sender = {
send=fun m ~over ~ret -> send=fun m ~ret ->
let@() = with_lock_ in let@() = with_lock_ in
if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m;
let m = List.rev_append (additional_metrics()) m in let m = List.rev_append (additional_metrics()) m in
push_metrics m ~over; push_metrics m;
ret() ret()
} }
end end

View file

@ -46,12 +46,4 @@ end
module Metrics = struct module Metrics = struct
open Proto.Metrics open Proto.Metrics
include Metrics include Metrics
(** Emit some metrics to the collector. *)
let emit ?attrs (l:t list) : unit Lwt.t =
let fut, wake = Lwt.wait() in
let rm = make_resource_metrics ?attrs l in
Collector.send_metrics [rm]
~over:(fun () -> Lwt.wakeup_later wake ())
~ret:(fun () -> fut)
end end

View file

@ -72,9 +72,18 @@ module Collector = struct
(** Sender interface for a message of type [msg]. (** Sender interface for a message of type [msg].
Inspired from Logs' reporter Inspired from Logs' reporter
(see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) *) (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc})
but without [over] as it doesn't make much sense in presence
of batching.
The [ret] callback is used to return the desired type (unit, or
a Lwt promise, or anything else) once the event has been transferred
to the backend.
It doesn't mean the event has been collected yet, it
could sit in a batch queue for a little while.
*)
type 'msg sender = { type 'msg sender = {
send: 'a. 'msg -> over:(unit -> unit) -> ret:(unit -> 'a) -> 'a; send: 'a. 'msg -> ret:(unit -> 'a) -> 'a;
} }
(** Collector client interface. *) (** Collector client interface. *)
@ -89,6 +98,10 @@ module Collector = struct
val rand_bytes_8 : unit -> bytes val rand_bytes_8 : unit -> bytes
(** Generate 16 bytes of random data *) (** Generate 16 bytes of random data *)
val tick : unit -> unit
(** Should be called regularly for background processing,
timeout checks, etc. *)
val cleanup : unit -> unit val cleanup : unit -> unit
end end
@ -99,15 +112,15 @@ module Collector = struct
(** Is there a configured backend? *) (** Is there a configured backend? *)
let[@inline] has_backend () : bool = !backend != None let[@inline] has_backend () : bool = !backend != None
let send_trace (l:Trace.resource_spans list) ~over ~ret = let send_trace (l:Trace.resource_spans list) ~ret =
match !backend with match !backend with
| None -> over(); ret() | None -> ret()
| Some (module B) -> B.send_trace.send l ~over ~ret | Some (module B) -> B.send_trace.send l ~ret
let send_metrics (l:Metrics.resource_metrics list) ~over ~ret = let send_metrics (l:Metrics.resource_metrics list) ~ret =
match !backend with match !backend with
| None -> over(); ret() | None -> ret()
| Some (module B) -> B.send_metrics.send l ~over ~ret | Some (module B) -> B.send_metrics.send l ~ret
let rand_bytes_16 () = let rand_bytes_16 () =
match !backend with match !backend with
@ -419,7 +432,7 @@ module Trace = struct
(** Sync emitter *) (** Sync emitter *)
let emit ?service_name ?attrs (spans:span list) : unit = let emit ?service_name ?attrs (spans:span list) : unit =
let rs = make_resource_spans ?service_name ?attrs spans in let rs = make_resource_spans ?service_name ?attrs spans in
Collector.send_trace [rs] ~over:(fun () -> ()) ~ret:(fun () -> ()) Collector.send_trace [rs] ~ret:(fun () -> ())
(** Scope to be used with {!with_}. *) (** Scope to be used with {!with_}. *)
type scope = { type scope = {
@ -559,8 +572,10 @@ module Metrics = struct
default_resource_metrics default_resource_metrics
~instrumentation_library_metrics:[lm] ~resource:(Some resource) () ~instrumentation_library_metrics:[lm] ~resource:(Some resource) ()
(** Emit some metrics to the collector (sync). *) (** Emit some metrics to the collector (sync). This blocks until
the backend has pushed the metrics into some internal queue, or
discarded them. *)
let emit ?attrs (l:t list) : unit = let emit ?attrs (l:t list) : unit =
let rm = make_resource_metrics ?attrs l in let rm = make_resource_metrics ?attrs l in
Collector.send_metrics [rm] ~over:ignore ~ret:ignore Collector.send_metrics [rm] ~ret:ignore
end end