diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index fcdc611a..51b186b6 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -6,6 +6,7 @@ module OT = Opentelemetry module Config = Config module Signal = Opentelemetry_client.Signal +module Batch = Opentelemetry_client.Batch open Opentelemetry open Common_ @@ -164,88 +165,6 @@ end = struct ) end -(** Batch of resources to be pushed later. - - This type is thread-safe. *) -module Batch : sig - type 'a t - - val push' : 'a t -> 'a -> unit - - val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option - (** Is the batch ready to be emitted? If batching is disabled, this is true as - soon as {!is_empty} is false. If a timeout is provided for this batch, - then it will be ready if an element has been in it for at least the - timeout. - @param now passed to implement timeout *) - - val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t - (** Create a new batch *) -end = struct - type 'a t = { - mutable size: int; - mutable q: 'a list; - batch: int option; - high_watermark: int; - timeout: Mtime.span option; - mutable start: Mtime.t; - } - - let make ?batch ?timeout () : _ t = - Option.iter (fun b -> assert (b > 0)) batch; - let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in - { - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } - - let timeout_expired_ ~now self : bool = - match self.timeout with - | Some t -> - let elapsed = Mtime.span now self.start in - Mtime.Span.compare elapsed t >= 0 - | None -> false - - let is_full_ self : bool = - match self.batch with - | None -> self.size > 0 - | Some b -> self.size >= b - - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) - then ( - let l = self.q in - self.q <- []; - self.size <- 0; - assert (l <> []); - Some l - ) else - None - - let push (self : _ t) x : bool = - if self.size >= self.high_watermark then ( - (* drop this to prevent queue from growing too fast *) - Atomic.incr n_dropped; - true - ) else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); - - (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready - ) - - let push' self x = ignore (push self x : bool) -end - (** An emitter. This is used by {!Backend} below to forward traces/metrics/… from the program to whatever collector client we have. *) module type EMITTER = sig @@ -280,13 +199,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = else None - let batch_traces : Trace.resource_spans list Batch.t = + let batch_traces : Trace.resource_spans Batch.t = Batch.make ?batch:config.batch_traces ?timeout () - let batch_metrics : Metrics.resource_metrics list Batch.t = + let batch_metrics : Metrics.resource_metrics Batch.t = Batch.make ?batch:config.batch_metrics ?timeout () - let batch_logs : Logs.resource_logs list Batch.t = + let batch_logs : Logs.resource_logs Batch.t = Batch.make ?batch:config.batch_logs ?timeout () let on_tick_cbs_ = Atomic.make (AList.make ()) @@ -317,13 +236,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let send_logs_http client (l : Logs.resource_logs list) = Conv.logs l |> send_http_ client ~url:config.url_logs - let maybe_pop ?force ~now batch = - Batch.pop_if_ready ?force ~now batch - |> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) []) - (* emit metrics, if the batch is full or timeout lapsed *) let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_metrics with + match Batch.pop_if_ready ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> let batch = !gc_metrics @ l in @@ -332,14 +247,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = true let emit_traces_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_traces with + match Batch.pop_if_ready ?force ~now batch_traces with | None -> Lwt.return false | Some l -> let+ () = send_traces_http httpc l in true let emit_logs_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_logs with + match Batch.pop_if_ready ?force ~now batch_logs with | None -> Lwt.return false | Some l -> let+ () = send_logs_http httpc l in @@ -381,9 +296,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = can also be several user threads that produce spans and call the emit functions. *) + let push_to_batch b e = + match Batch.push b e with + | `Ok -> () + | `Dropped -> Atomic.incr n_errors + let push_trace e = let@ () = guard_exn_ "push trace" in - Batch.push' batch_traces e; + push_to_batch batch_traces e; let now = Mtime_clock.now () in Lwt.async (fun () -> let+ (_ : bool) = emit_traces_maybe ~now httpc in @@ -392,7 +312,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let push_metrics e = let@ () = guard_exn_ "push metrics" in sample_gc_metrics_if_needed (); - Batch.push' batch_metrics e; + push_to_batch batch_metrics e; let now = Mtime_clock.now () in Lwt.async (fun () -> let+ (_ : bool) = emit_metrics_maybe ~now httpc in @@ -400,7 +320,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let push_logs e = let@ () = guard_exn_ "push logs" in - Batch.push' batch_logs e; + push_to_batch batch_logs e; let now = Mtime_clock.now () in Lwt.async (fun () -> let+ (_ : bool) = emit_logs_maybe ~now httpc in diff --git a/src/client/batch.ml b/src/client/batch.ml new file mode 100644 index 00000000..600f1b43 --- /dev/null +++ b/src/client/batch.ml @@ -0,0 +1,64 @@ +type 'a t = { + mutable size: int; + mutable q: 'a list list; + batch: int; (* Minimum size to batch before popping *) + high_watermark: int; + timeout: Mtime.span option; + mutable start: Mtime.t; +} + +let make ?(batch = 1) ?timeout () : _ t = + assert (batch > 0); + let high_watermark = + if batch = 1 then + 100 + else + batch * 10 + in + { + size = 0; + start = Mtime_clock.now (); + q = []; + batch; + timeout; + high_watermark; + } + +let timeout_expired_ ~now self : bool = + match self.timeout with + | Some t -> + let elapsed = Mtime.span now self.start in + Mtime.Span.compare elapsed t >= 0 + | None -> false + +(* Big enough to send a batch *) +let is_full_ self : bool = self.size >= self.batch + +let ready_to_pop ~force ~now self = + self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) + +let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + if ready_to_pop ~force ~now self then ( + let l = self.q in + self.q <- []; + self.size <- 0; + assert (l <> []); + let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + Some ls + ) else + None + +let push (self : _ t) x : [ `Dropped | `Ok ] = + if self.size >= self.high_watermark then + (* drop this to prevent queue from growing too fast *) + `Dropped + else ( + if self.size = 0 && Option.is_some self.timeout then + (* current batch starts now *) + self.start <- Mtime_clock.now (); + + (* add to queue *) + self.size <- 1 + self.size; + self.q <- x :: self.q; + `Ok + ) diff --git a/src/client/batch.mli b/src/client/batch.mli new file mode 100644 index 00000000..37b12857 --- /dev/null +++ b/src/client/batch.mli @@ -0,0 +1,38 @@ +(** A thread-safe batch of resources to be popper when ready . *) + +type 'a t + +val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t +(** [make ()] is a new batch + + @param batch + the number of elements after which the batch will be considered {b full}, + and ready to pop. A "high water mark" is also derived form the batch as + [if batch = 1 then 100 else batch * 10]. This sets a limit after which new + elements will be [`Dropped] by {!push}. Set to [0] to disable batching. + Default [1]. + + @param timeout + the time span after which a batch is ready to pop, whether or not it is + {b full}. *) + +val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option +(** [pop_if_ready ~now b] is [Some xs], where is [xs] includes all the elements + {!push}ed since the last batch, if the batch ready to be emitted. + + A batch is ready to pop if it contains some elements and + + - batching is disabled, and any elements have been batched, or batching was + enabled and at least [batch] elements have been pushed, or + - a [timeout] was provided, and more than a [timeout] span has passed since + the last pop was ready, or + - the pop is [force]d, + + @param now the current time + + @param force override the other batch conditions *) + +val push : 'a t -> 'a list -> [ `Dropped | `Ok ] +(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch + [b], or [`Dropped] if the current size of the batch has exceeded the high water + mark determined by the [batch] argument to {!make}]. ) *) diff --git a/src/client/dune b/src/client/dune index da204f79..095f71fa 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,5 +1,5 @@ (library (name opentelemetry_client) (public_name opentelemetry.client) - (libraries opentelemetry pbrt) + (libraries opentelemetry pbrt mtime mtime.clock.os) (synopsis "Common types and logic shared between client implementations"))