Factor batching logic out of the cohttp-lwt client

This will allow reusing the batching logic in the Eio client.
As a followup, we should refactor the ocurl client to use the same
batcher.
This commit is contained in:
Shon Feder 2025-06-27 21:48:55 -04:00
parent 4ee29d8504
commit ca31707395
No known key found for this signature in database
4 changed files with 118 additions and 96 deletions

View file

@ -6,6 +6,7 @@
module OT = Opentelemetry
module Config = Config
module Signal = Opentelemetry_client.Signal
module Batch = Opentelemetry_client.Batch
open Opentelemetry
open Common_
@ -164,88 +165,6 @@ end = struct
)
end
(** Batch of resources to be pushed later.
This type is thread-safe. *)
module Batch : sig
type 'a t
val push' : 'a t -> 'a -> unit
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled, this is true as
soon as {!is_empty} is false. If a timeout is provided for this batch,
then it will be ready if an element has been in it for at least the
timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *)
end = struct
type 'a t = {
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
}
let make ?batch ?timeout () : _ t =
Option.iter (fun b -> assert (b > 0)) batch;
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
{
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let timeout_expired_ ~now self : bool =
match self.timeout with
| Some t ->
let elapsed = Mtime.span now self.start in
Mtime.Span.compare elapsed t >= 0
| None -> false
let is_full_ self : bool =
match self.batch with
| None -> self.size > 0
| Some b -> self.size >= b
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
then (
let l = self.q in
self.q <- [];
self.size <- 0;
assert (l <> []);
Some l
) else
None
let push (self : _ t) x : bool =
if self.size >= self.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if self.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
self.start <- Mtime_clock.now ();
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
let ready = is_full_ self in
ready
)
let push' self x = ignore (push self x : bool)
end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
from the program to whatever collector client we have. *)
module type EMITTER = sig
@ -280,13 +199,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
else
None
let batch_traces : Trace.resource_spans list Batch.t =
let batch_traces : Trace.resource_spans Batch.t =
Batch.make ?batch:config.batch_traces ?timeout ()
let batch_metrics : Metrics.resource_metrics list Batch.t =
let batch_metrics : Metrics.resource_metrics Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()
let batch_logs : Logs.resource_logs list Batch.t =
let batch_logs : Logs.resource_logs Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = Atomic.make (AList.make ())
@ -317,13 +236,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let send_logs_http client (l : Logs.resource_logs list) =
Conv.logs l |> send_http_ client ~url:config.url_logs
let maybe_pop ?force ~now batch =
Batch.pop_if_ready ?force ~now batch
|> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) [])
(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_metrics with
match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> Lwt.return false
| Some l ->
let batch = !gc_metrics @ l in
@ -332,14 +247,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
true
let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_traces with
match Batch.pop_if_ready ?force ~now batch_traces with
| None -> Lwt.return false
| Some l ->
let+ () = send_traces_http httpc l in
true
let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_logs with
match Batch.pop_if_ready ?force ~now batch_logs with
| None -> Lwt.return false
| Some l ->
let+ () = send_logs_http httpc l in
@ -381,9 +296,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
can also be several user threads that produce spans and call
the emit functions. *)
(* [push_to_batch b e] pushes [e] into the batch [b].

   When the batch refuses the push because it is over its high-water mark, the
   loss is tallied in [n_dropped] rather than [n_errors]: nothing failed, the
   data was deliberately shed, and [n_dropped] is the counter this file has
   used for that purpose. *)
let push_to_batch b e =
  match Batch.push b e with
  | `Ok -> ()
  | `Dropped -> Atomic.incr n_dropped
let push_trace e =
let@ () = guard_exn_ "push trace" in
Batch.push' batch_traces e;
push_to_batch batch_traces e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_traces_maybe ~now httpc in
@ -392,7 +312,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let push_metrics e =
let@ () = guard_exn_ "push metrics" in
sample_gc_metrics_if_needed ();
Batch.push' batch_metrics e;
push_to_batch batch_metrics e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_metrics_maybe ~now httpc in
@ -400,7 +320,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let push_logs e =
let@ () = guard_exn_ "push logs" in
Batch.push' batch_logs e;
push_to_batch batch_logs e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_logs_maybe ~now httpc in

64
src/client/batch.ml Normal file
View file

@ -0,0 +1,64 @@
(* State of a batch: the queued chunks, plus the thresholds that decide when
   the batch may be popped and when new pushes must be dropped. *)
type 'a t = {
(* Running count maintained by [push]; reset to 0 whenever the batch is
   popped. Compared against [batch] and [high_watermark]. *)
mutable size: int;
(* Chunks handed to [push], most recently pushed first. *)
mutable q: 'a list list;
batch: int; (* Minimum size to batch before popping *)
(* Once [size] has reached this value, [push] drops its argument. *)
high_watermark: int;
(* If set, the age after which a non-empty batch is ready to pop. *)
timeout: Mtime.span option;
(* Creation time at first; refreshed by [push] when it adds to an empty batch,
   but only if [timeout] is set. *)
mutable start: Mtime.t;
}
(* [make ?batch ?timeout ()] builds an empty batch whose clock starts now.

   [batch] (default 1) must be strictly positive; this is enforced by an
   assertion. The high-water mark, past which pushes are dropped, is derived
   from it. *)
let make ?(batch = 1) ?timeout () : _ t =
  assert (batch > 0);
  (* A batch size of 1 still gets room for 100 entries before dropping; any
     other size gets ten times itself. *)
  let high_watermark =
    match batch with
    | 1 -> 100
    | n -> n * 10
  in
  { size = 0; q = []; batch; high_watermark; timeout; start = Mtime_clock.now () }
(* True iff a timeout is configured and the span between [self.start] and [now]
   is at least that long. A batch without a timeout never expires. *)
let timeout_expired_ ~now self : bool =
  let expired limit =
    Mtime.Span.compare (Mtime.span now self.start) limit >= 0
  in
  Option.fold ~none:false ~some:expired self.timeout
(* The batch has reached its configured size, so it is worth sending. *)
let is_full_ self : bool = self.batch <= self.size
(* Whether the batch should be popped at time [now]. An empty batch is never
   ready, not even when [force] is set; otherwise any one of [force], fullness
   or an expired timeout is enough. *)
let ready_to_pop ~force ~now self =
  if self.size <= 0 then
    false
  else
    force || is_full_ self || timeout_expired_ ~now self
(* [pop_if_ready ?force ~now self] empties the batch and returns its contents
   as one flat list when the batch is ready (see [ready_to_pop]); otherwise it
   returns [None] and leaves the batch untouched.

   [force] defaults to [false]. *)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
  if not (ready_to_pop ~force ~now self) then
    None
  else (
    let chunks = self.q in
    (* Reset the state before doing anything else with the contents. *)
    self.q <- [];
    self.size <- 0;
    assert (chunks <> []);
    (* [chunks] is newest-first. Folding with [rev_append] puts the oldest
       chunk first in the result, with the elements inside each chunk in
       reverse order. *)
    let flatten acc chunk = List.rev_append chunk acc in
    Some (List.fold_left flatten [] chunks)
  )
(* [push self x] adds the elements of the chunk [x] to the batch.

   Returns [`Dropped], leaving the batch unchanged, when the batch has already
   reached its high-water mark; returns [`Ok] otherwise.

   [size] is counted in elements, not in calls to [push], so that the [batch]
   threshold and the high-water mark mean what the interface documents. As a
   consequence an empty chunk is accepted without touching the batch: it can
   neither make the batch look non-empty nor cause a pop of zero elements. *)
let push (self : _ t) x : [ `Dropped | `Ok ] =
  match x with
  | [] ->
    (* nothing to enqueue, and nothing that could be dropped *)
    `Ok
  | _ :: _ ->
    if self.size >= self.high_watermark then
      (* drop this to prevent queue from growing too fast *)
      `Dropped
    else (
      if self.size = 0 && Option.is_some self.timeout then
        (* current batch starts now *)
        self.start <- Mtime_clock.now ();
      (* add to queue, accounting for every element of the chunk *)
      self.size <- List.length x + self.size;
      self.q <- x :: self.q;
      `Ok
    )

38
src/client/batch.mli Normal file
View file

@ -0,0 +1,38 @@
(** A batch of resources to be popped when ready.

    NOTE(review): this type was described as thread-safe, but the
    implementation in [batch.ml] mutates plain record fields without any
    locking. Confirm that callers serialize access before relying on it. *)
type 'a t
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** [make ()] is a new, empty batch.

    @param batch
      the size at which the batch is considered {b full}, and ready to pop.
      Must be strictly positive (this is checked with an assertion). A "high
      water mark" is also derived from it as
      [if batch = 1 then 100 else batch * 10]; once the batch has grown to
      that size, new elements are [`Dropped] by {!push}. With the default of
      [1], every non-empty batch is full, which effectively disables batching.
    @param timeout
      the time span after which a non-empty batch is ready to pop, whether or
      not it is {b full}. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** [pop_if_ready ~now b] is [Some xs], where [xs] includes all the elements
    {!push}ed since the last pop, if the batch is ready to be emitted, and
    [None] otherwise. Popping empties the batch.

    A batch is ready to pop if it is not empty and
    - it is {b full}, i.e. its size has reached the [batch] given to {!make}
      (with the default [batch] of [1], every non-empty batch is full), or
    - a [timeout] was provided, and at least a [timeout] span separates [now]
      from the first push into the then-empty batch, or
    - the pop is [force]d.

    @param now the current time
    @param force
      pop even if the batch is neither full nor timed out; an empty batch
      still yields [None]. Default [false]. *)
val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the
    batch [b], or [`Dropped] if the current size of the batch has already
    reached the high water mark determined by the [batch] argument to {!make},
    in which case [xs] is discarded and [b] is left unchanged. *)

View file

@ -1,5 +1,5 @@
(library
(name opentelemetry_client)
(public_name opentelemetry.client)
(libraries opentelemetry pbrt)
(libraries opentelemetry pbrt mtime mtime.clock.os)
(synopsis "Common types and logic shared between client implementations"))