From 901730583c5626e66b2fee8b21861e4b399d9602 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 24 Mar 2022 10:59:37 -0400 Subject: [PATCH] remove `over` from backend --- src/client/opentelemetry_client_ocurl.ml | 56 ++++++++++++------------ src/lwt/opentelemetry_lwt.ml | 8 ---- src/opentelemetry.ml | 37 +++++++++++----- 3 files changed, 55 insertions(+), 46 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 275f4b0b..dfb2fed3 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -229,17 +229,15 @@ module Gen_ids() = struct b end -(** Callback for when an event is properly sent to the collector *) -type over_cb = unit -> unit - (** An emitter. This is used by {!Backend} below to forward traces/metrics/… from the program to whatever collector client we have. *) module type EMITTER = sig open Opentelemetry.Proto - val push_trace : Trace.resource_spans list -> over:over_cb -> unit - val push_metrics : Metrics.resource_metrics list -> over:over_cb -> unit + val push_trace : Trace.resource_spans list -> unit + val push_metrics : Metrics.resource_metrics list -> unit + val tick : unit -> unit val cleanup : unit -> unit end @@ -294,19 +292,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let continue = ref true in - let ((module E_trace) : (Trace.resource_spans list * over_cb) push), on_trace_full = + let ((module E_trace) : Trace.resource_spans list push), on_trace_full = mk_push ?batch:config.batch_traces () in - let ((module E_metrics) : (Metrics.resource_metrics list * over_cb) push), on_metrics_full = + let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = mk_push ?batch:config.batch_metrics () in let encoder = Pbrt.Encoder.create() in let ((module C) as curl) = (module Curl() : CURL) in - let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) = + let emit_metrics (l:Metrics.resource_metrics list list) = Pbrt.Encoder.reset encoder; let resource_metrics = - List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + List.fold_left (fun acc l -> List.rev_append l acc) [] l in Metrics_service.encode_export_metrics_service_request (Metrics_service.default_export_metrics_service_request ~resource_metrics ()) @@ -321,14 +319,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = Atomic.incr n_errors; report_err_ err end; - (* signal completion *) - List.iter (fun (_,over) -> over()) l; in - let emit_traces (l:(Trace.resource_spans list * over_cb) list) = + let emit_traces (l:Trace.resource_spans list list) = Pbrt.Encoder.reset encoder; let resource_spans = - List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + List.fold_left (fun acc l -> List.rev_append l acc) [] l in Trace_service.encode_export_trace_service_request (Trace_service.default_export_trace_service_request ~resource_spans ()) encoder; @@ -342,8 +338,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = Atomic.incr n_errors; report_err_ err end; - (* signal completion *) - List.iter (fun (_,over) -> over()) l; in let last_wakeup = Atomic.make (Mtime_clock.now()) in @@ -431,13 +425,18 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = on_metrics_full wakeup; on_trace_full wakeup; + let tick() = + if batch_timeout() then wakeup() + in + let module M = struct - let push_trace e ~over = - E_trace.push (e,over); + let push_trace e = + E_trace.push e; if batch_timeout() then wakeup() - let push_metrics e ~over = - E_metrics.push (e,over); + let push_metrics e = + E_metrics.push e; if batch_timeout() then wakeup() + let tick=tick let cleanup () = continue := false; with_mutex_ m (fun () -> Condition.broadcast cond) @@ -456,14 +455,17 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let module M = struct - let push_trace e ~over = + let push_trace e = let@() = guard in - E_trace.push (e,over); + E_trace.push e; if batch_timeout() then emit_all_force() - let push_metrics e ~over = + let push_metrics e = let@() = guard in - E_metrics.push (e,over); + E_metrics.push e; + if batch_timeout() then emit_all_force() + + let tick () = if batch_timeout() then emit_all_force() let cleanup = cleanup @@ -482,10 +484,10 @@ module Backend(Arg : sig val config : Config.t end)() open Opentelemetry.Collector let send_trace : Trace.resource_spans list sender = { - send=fun l ~over ~ret -> + send=fun l ~ret -> let@() = with_lock_ in if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; - push_trace l ~over; + push_trace l; ret() } @@ -517,12 +519,12 @@ module Backend(Arg : sig val config : Config.t end)() ) else [] let send_metrics : Metrics.resource_metrics list sender = { - send=fun m ~over ~ret -> + send=fun m ~ret -> let@() = with_lock_ in if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; let m = List.rev_append (additional_metrics()) m in - push_metrics m ~over; + push_metrics m; ret() } end diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index c3c01ad2..0bc4367d 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -46,12 +46,4 @@ end module Metrics = struct open Proto.Metrics include Metrics - - (** Emit some metrics to the collector. *) - let emit ?attrs (l:t list) : unit Lwt.t = - let fut, wake = Lwt.wait() in - let rm = make_resource_metrics ?attrs l in - Collector.send_metrics [rm] - ~over:(fun () -> Lwt.wakeup_later wake ()) - ~ret:(fun () -> fut) end diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 02531ce7..16959506 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -72,9 +72,18 @@ module Collector = struct (** Sender interface for a message of type [msg]. Inspired from Logs' reporter - (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) *) + (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) + but without [over] as it doesn't make much sense in presence + of batching. + + The [ret] callback is used to return the desired type (unit, or + a Lwt promise, or anything else) once the event has been transferred + to the backend. + It doesn't mean the event has been collected yet, it + could sit in a batch queue for a little while. + *) type 'msg sender = { - send: 'a. 'msg -> over:(unit -> unit) -> ret:(unit -> 'a) -> 'a; + send: 'a. 'msg -> ret:(unit -> 'a) -> 'a; } (** Collector client interface. *) @@ -89,6 +98,10 @@ module Collector = struct val rand_bytes_8 : unit -> bytes (** Generate 16 bytes of random data *) + val tick : unit -> unit + (** Should be called regularly for background processing, + timeout checks, etc. *) + val cleanup : unit -> unit end @@ -99,15 +112,15 @@ module Collector = struct (** Is there a configured backend? *) let[@inline] has_backend () : bool = !backend != None - let send_trace (l:Trace.resource_spans list) ~over ~ret = + let send_trace (l:Trace.resource_spans list) ~ret = match !backend with - | None -> over(); ret() - | Some (module B) -> B.send_trace.send l ~over ~ret + | None -> ret() + | Some (module B) -> B.send_trace.send l ~ret - let send_metrics (l:Metrics.resource_metrics list) ~over ~ret = + let send_metrics (l:Metrics.resource_metrics list) ~ret = match !backend with - | None -> over(); ret() - | Some (module B) -> B.send_metrics.send l ~over ~ret + | None -> ret() + | Some (module B) -> B.send_metrics.send l ~ret let rand_bytes_16 () = match !backend with @@ -419,7 +432,7 @@ module Trace = struct (** Sync emitter *) let emit ?service_name ?attrs (spans:span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in - Collector.send_trace [rs] ~over:(fun () -> ()) ~ret:(fun () -> ()) + Collector.send_trace [rs] ~ret:(fun () -> ()) (** Scope to be used with {!with_}. *) type scope = { @@ -559,8 +572,10 @@ module Metrics = struct default_resource_metrics ~instrumentation_library_metrics:[lm] ~resource:(Some resource) () - (** Emit some metrics to the collector (sync). *) + (** Emit some metrics to the collector (sync). This blocks until + the backend has pushed the metrics into some internal queue, or + discarded them. *) let emit ?attrs (l:t list) : unit = let rm = make_resource_metrics ?attrs l in - Collector.send_metrics [rm] ~over:ignore ~ret:ignore + Collector.send_metrics [rm] ~ret:ignore end