From ca31707395cb4f84a231af7f03518f0e3963d9c6 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Fri, 27 Jun 2025 21:48:55 -0400 Subject: [PATCH 1/4] Factor batching logic out of the cohttp-lwt client This will allow resuing the batching logic in the Eio client. As a followup, we should refactor the ocurl client to use the same batcher. --- .../opentelemetry_client_cohttp_lwt.ml | 110 +++--------------- src/client/batch.ml | 64 ++++++++++ src/client/batch.mli | 38 ++++++ src/client/dune | 2 +- 4 files changed, 118 insertions(+), 96 deletions(-) create mode 100644 src/client/batch.ml create mode 100644 src/client/batch.mli diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index fcdc611a..51b186b6 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -6,6 +6,7 @@ module OT = Opentelemetry module Config = Config module Signal = Opentelemetry_client.Signal +module Batch = Opentelemetry_client.Batch open Opentelemetry open Common_ @@ -164,88 +165,6 @@ end = struct ) end -(** Batch of resources to be pushed later. - - This type is thread-safe. *) -module Batch : sig - type 'a t - - val push' : 'a t -> 'a -> unit - - val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option - (** Is the batch ready to be emitted? If batching is disabled, this is true as - soon as {!is_empty} is false. If a timeout is provided for this batch, - then it will be ready if an element has been in it for at least the - timeout. 
- @param now passed to implement timeout *) - - val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t - (** Create a new batch *) -end = struct - type 'a t = { - mutable size: int; - mutable q: 'a list; - batch: int option; - high_watermark: int; - timeout: Mtime.span option; - mutable start: Mtime.t; - } - - let make ?batch ?timeout () : _ t = - Option.iter (fun b -> assert (b > 0)) batch; - let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in - { - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } - - let timeout_expired_ ~now self : bool = - match self.timeout with - | Some t -> - let elapsed = Mtime.span now self.start in - Mtime.Span.compare elapsed t >= 0 - | None -> false - - let is_full_ self : bool = - match self.batch with - | None -> self.size > 0 - | Some b -> self.size >= b - - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) - then ( - let l = self.q in - self.q <- []; - self.size <- 0; - assert (l <> []); - Some l - ) else - None - - let push (self : _ t) x : bool = - if self.size >= self.high_watermark then ( - (* drop this to prevent queue from growing too fast *) - Atomic.incr n_dropped; - true - ) else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); - - (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready - ) - - let push' self x = ignore (push self x : bool) -end - (** An emitter. This is used by {!Backend} below to forward traces/metrics/… from the program to whatever collector client we have. 
*) module type EMITTER = sig @@ -280,13 +199,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = else None - let batch_traces : Trace.resource_spans list Batch.t = + let batch_traces : Trace.resource_spans Batch.t = Batch.make ?batch:config.batch_traces ?timeout () - let batch_metrics : Metrics.resource_metrics list Batch.t = + let batch_metrics : Metrics.resource_metrics Batch.t = Batch.make ?batch:config.batch_metrics ?timeout () - let batch_logs : Logs.resource_logs list Batch.t = + let batch_logs : Logs.resource_logs Batch.t = Batch.make ?batch:config.batch_logs ?timeout () let on_tick_cbs_ = Atomic.make (AList.make ()) @@ -317,13 +236,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let send_logs_http client (l : Logs.resource_logs list) = Conv.logs l |> send_http_ client ~url:config.url_logs - let maybe_pop ?force ~now batch = - Batch.pop_if_ready ?force ~now batch - |> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) []) - (* emit metrics, if the batch is full or timeout lapsed *) let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_metrics with + match Batch.pop_if_ready ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> let batch = !gc_metrics @ l in @@ -332,14 +247,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = true let emit_traces_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_traces with + match Batch.pop_if_ready ?force ~now batch_traces with | None -> Lwt.return false | Some l -> let+ () = send_traces_http httpc l in true let emit_logs_maybe ~now ?force httpc : bool Lwt.t = - match maybe_pop ?force ~now batch_logs with + match Batch.pop_if_ready ?force ~now batch_logs with | None -> Lwt.return false | Some l -> let+ () = send_logs_http httpc l in @@ -381,9 +296,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = can also be several user threads that produce spans and 
call the emit functions. *)
 
+  let push_to_batch b e =
+    match Batch.push b e with
+    | `Ok -> ()
+    | `Dropped -> Atomic.incr n_dropped
+
   let push_trace e =
     let@ () = guard_exn_ "push trace" in
-    Batch.push' batch_traces e;
+    push_to_batch batch_traces e;
     let now = Mtime_clock.now () in
     Lwt.async (fun () ->
         let+ (_ : bool) = emit_traces_maybe ~now httpc in
@@ -392,7 +312,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
   let push_metrics e =
     let@ () = guard_exn_ "push metrics" in
     sample_gc_metrics_if_needed ();
-    Batch.push' batch_metrics e;
+    push_to_batch batch_metrics e;
     let now = Mtime_clock.now () in
     Lwt.async (fun () ->
         let+ (_ : bool) = emit_metrics_maybe ~now httpc in
@@ -400,7 +320,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
 
   let push_logs e =
     let@ () = guard_exn_ "push logs" in
-    Batch.push' batch_logs e;
+    push_to_batch batch_logs e;
     let now = Mtime_clock.now () in
     Lwt.async (fun () ->
         let+ (_ : bool) = emit_logs_maybe ~now httpc in
diff --git a/src/client/batch.ml b/src/client/batch.ml
new file mode 100644
index 00000000..600f1b43
--- /dev/null
+++ b/src/client/batch.ml
@@ -0,0 +1,64 @@
+type 'a t = {
+  mutable size: int;
+  mutable q: 'a list list;
+  batch: int; (* Minimum size to batch before popping *)
+  high_watermark: int;
+  timeout: Mtime.span option;
+  mutable start: Mtime.t;
+}
+
+let make ?(batch = 1) ?timeout () : _ t =
+  assert (batch > 0);
+  let high_watermark =
+    if batch = 1 then
+      100
+    else
+      batch * 10
+  in
+  {
+    size = 0;
+    start = Mtime_clock.now ();
+    q = [];
+    batch;
+    timeout;
+    high_watermark;
+  }
+
+let timeout_expired_ ~now self : bool =
+  match self.timeout with
+  | Some t ->
+    let elapsed = Mtime.span now self.start in
+    Mtime.Span.compare elapsed t >= 0
+  | None -> false
+
+(* Big enough to send a batch *)
+let is_full_ self : bool = self.size >= self.batch
+
+let ready_to_pop ~force ~now self =
+  self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
+
+let 
pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
+  if ready_to_pop ~force ~now self then (
+    let l = self.q in
+    self.q <- [];
+    self.size <- 0;
+    assert (l <> []);
+    let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
+    Some ls
+  ) else
+    None
+
+let push (self : _ t) x : [ `Dropped | `Ok ] =
+  if self.size >= self.high_watermark then
+    (* drop this to prevent queue from growing too fast *)
+    `Dropped
+  else (
+    if self.size = 0 && Option.is_some self.timeout then
+      (* current batch starts now *)
+      self.start <- Mtime_clock.now ();
+
+    (* add to queue *)
+    self.size <- 1 + self.size;
+    self.q <- x :: self.q;
+    `Ok
+  )
diff --git a/src/client/batch.mli b/src/client/batch.mli
new file mode 100644
index 00000000..37b12857
--- /dev/null
+++ b/src/client/batch.mli
@@ -0,0 +1,38 @@
+(** A thread-safe batch of resources to be popped when ready. *)
+
+type 'a t
+
+val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
+(** [make ()] is a new batch
+
+    @param batch
+      the number of elements after which the batch will be considered {b full},
+      and ready to pop. A "high water mark" is also derived form the batch as
+      [if batch = 1 then 100 else batch * 10]. This sets a limit after which new
+      elements will be [`Dropped] by {!push}. Set to [0] to disable batching.
+      Default [1].
+
+    @param timeout
+      the time span after which a batch is ready to pop, whether or not it is
+      {b full}. *)
+
+val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
+(** [pop_if_ready ~now b] is [Some xs], where [xs] includes all the elements
+    {!push}ed since the last batch, if the batch is ready to be emitted. 
+ + A batch is ready to pop if it contains some elements and + + - batching is disabled, and any elements have been batched, or batching was + enabled and at least [batch] elements have been pushed, or + - a [timeout] was provided, and more than a [timeout] span has passed since + the last pop was ready, or + - the pop is [force]d, + + @param now the current time + + @param force override the other batch conditions *) + +val push : 'a t -> 'a list -> [ `Dropped | `Ok ] +(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch + [b], or [`Dropped] if the current size of the batch has exceeded the high water + mark determined by the [batch] argument to {!make}]. ) *) diff --git a/src/client/dune b/src/client/dune index da204f79..095f71fa 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,5 +1,5 @@ (library (name opentelemetry_client) (public_name opentelemetry.client) - (libraries opentelemetry pbrt) + (libraries opentelemetry pbrt mtime mtime.clock.os) (synopsis "Common types and logic shared between client implementations")) From 18f58c3ac5d5cdff5a805c02f843f256a0a2b16c Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 30 Jun 2025 22:01:13 -0400 Subject: [PATCH 2/4] Allow configuring high_watermark --- src/client/batch.ml | 15 ++++++++------- src/client/batch.mli | 15 ++++++++++----- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 600f1b43..005644c2 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -7,14 +7,15 @@ type 'a t = { mutable start: Mtime.t; } -let make ?(batch = 1) ?timeout () : _ t = +let high_watermark batch_size = + if batch_size = 1 then + 100 + else + batch_size * 10 + +let make ?(batch = 1) ?(high_watermark = high_watermark batch) ?timeout () : _ t + = assert (batch > 0); - let high_watermark = - if batch = 1 then - 100 - else - batch * 10 - in { size = 0; start = Mtime_clock.now (); diff --git a/src/client/batch.mli 
b/src/client/batch.mli
index 37b12857..39fdd4d4 100644
--- a/src/client/batch.mli
+++ b/src/client/batch.mli
@@ -2,15 +2,20 @@
 
 type 'a t
 
-val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
+val make :
+  ?batch:int -> ?high_watermark:int -> ?timeout:Mtime.span -> unit -> 'a t
 (** [make ()] is a new batch
 
     @param batch
       the number of elements after which the batch will be considered {b full},
-      and ready to pop. A "high water mark" is also derived form the batch as
-      [if batch = 1 then 100 else batch * 10]. This sets a limit after which new
-      elements will be [`Dropped] by {!push}. Set to [0] to disable batching.
-      Default [1].
+      and ready to pop. Set to [1] to disable batching. It is required that
+      [batch > 0]. Default [1].
+
+    @param high_watermark
+      the batch size limit after which new elements will be [`Dropped] by
+      {!push}. This prevents the queue from growing too fast for effective
+      transmission in case of signal floods. Default
+      [if batch = 1 then 100 else batch * 10]. 
@param timeout the time span after which a batch is ready to pop, whether or not it is From 31a712dd30108673f55a21fba0b7ebb36ce50408 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 30 Jun 2025 22:28:30 -0400 Subject: [PATCH 3/4] Allowing configuring start time --- src/client/batch.ml | 24 +++++++++++++----------- src/client/batch.mli | 13 +++++++++++-- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 005644c2..be4d7c46 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -7,23 +7,25 @@ type 'a t = { mutable start: Mtime.t; } -let high_watermark batch_size = +let default_high_watermark batch_size = if batch_size = 1 then 100 else batch_size * 10 -let make ?(batch = 1) ?(high_watermark = high_watermark batch) ?timeout () : _ t - = +let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = + let high_watermark = + match high_watermark with + | Some x -> x + | None -> default_high_watermark batch + in + let start = + match now with + | Some x -> x + | None -> Mtime_clock.now () + in assert (batch > 0); - { - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } + { size = 0; q = []; start; batch; timeout; high_watermark } let timeout_expired_ ~now self : bool = match self.timeout with diff --git a/src/client/batch.mli b/src/client/batch.mli index 39fdd4d4..def675b1 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -3,7 +3,12 @@ type 'a t val make : - ?batch:int -> ?high_watermark:int -> ?timeout:Mtime.span -> unit -> 'a t + ?batch:int -> + ?high_watermark:int -> + ?now:Mtime.t -> + ?timeout:Mtime.span -> + unit -> + 'a t (** [make ()] is a new batch @param batch @@ -17,6 +22,8 @@ val make : transmission in case of signal floods. Default [if batch = 1 then 100 else batch * 10]. + @param now the current time. Default [Mtime_clock.now ()]. + @param timeout the time span after which a batch is ready to pop, whether or not it is {b full}. 
*)
@@ -35,7 +42,9 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
 
     @param now the current time
 
-    @param force override the other batch conditions *)
+    @param force
+      override the other batch conditions, for when we just want to emit
+      batches before exit or because the user asks for it *)
 
 val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
 (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch

From 8b488434592f37c2976d1fdeca58a42aa7dae5ab Mon Sep 17 00:00:00 2001
From: Shon Feder
Date: Mon, 30 Jun 2025 22:52:49 -0400
Subject: [PATCH 4/4] Correct size and optimize representation

Since we need to traverse the elements added to count up the new size, we
can use that pass to add the elements onto our FIFO queue, and then drain
the queue in one last pass to reverse. IIUC, this should give us linear
complexity of the batch retrieval.
---
 src/client/batch.ml | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/src/client/batch.ml b/src/client/batch.ml
index be4d7c46..ba17e090 100644
--- a/src/client/batch.ml
+++ b/src/client/batch.ml
@@ -1,6 +1,7 @@
 type 'a t = {
   mutable size: int;
-  mutable q: 'a list list;
+  mutable q: 'a list;
+      (* The queue is a FIFO represented as a list in reverse order *)
   batch: int; (* Minimum size to batch before popping *)
   high_watermark: int;
   timeout: Mtime.span option;
@@ -42,16 +43,23 @@ let ready_to_pop ~force ~now self =
 
 let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
   if ready_to_pop ~force ~now self then (
-    let l = self.q in
+    assert (self.q <> []);
+    let batch =
+      (* Reverse the list to retrieve the FIFO order. 
*) + List.rev self.q + in self.q <- []; self.size <- 0; - assert (l <> []); - let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Some ls + Some batch ) else None -let push (self : _ t) x : [ `Dropped | `Ok ] = +(* Helper so we can count new elements and prepend them onto the existing [q] in + one pass. *) +let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = + elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) + +let push (self : _ t) elems : [ `Dropped | `Ok ] = if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped @@ -60,8 +68,9 @@ let push (self : _ t) x : [ `Dropped | `Ok ] = (* current batch starts now *) self.start <- Mtime_clock.now (); + let count, q' = append_with_count ~elems ~q:self.q in (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; + self.size <- self.size + count; + self.q <- q'; `Ok )