mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 20:07:55 -04:00
Merge pull request #97 from shonfeder/factor-out-lwt-batching
Factor batching logic out of the cohttp-lwt client
This commit is contained in:
commit
841d223ed2
4 changed files with 144 additions and 96 deletions
|
|
@ -6,6 +6,7 @@
|
||||||
module OT = Opentelemetry
|
module OT = Opentelemetry
|
||||||
module Config = Config
|
module Config = Config
|
||||||
module Signal = Opentelemetry_client.Signal
|
module Signal = Opentelemetry_client.Signal
|
||||||
|
module Batch = Opentelemetry_client.Batch
|
||||||
open Opentelemetry
|
open Opentelemetry
|
||||||
open Common_
|
open Common_
|
||||||
|
|
||||||
|
|
@ -164,88 +165,6 @@ end = struct
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
(** Batch of resources to be pushed later.
|
|
||||||
|
|
||||||
This type is thread-safe. *)
|
|
||||||
module Batch : sig
|
|
||||||
type 'a t
|
|
||||||
|
|
||||||
val push' : 'a t -> 'a -> unit
|
|
||||||
|
|
||||||
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
|
|
||||||
(** Is the batch ready to be emitted? If batching is disabled, this is true as
|
|
||||||
soon as {!is_empty} is false. If a timeout is provided for this batch,
|
|
||||||
then it will be ready if an element has been in it for at least the
|
|
||||||
timeout.
|
|
||||||
@param now passed to implement timeout *)
|
|
||||||
|
|
||||||
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
|
|
||||||
(** Create a new batch *)
|
|
||||||
end = struct
|
|
||||||
type 'a t = {
|
|
||||||
mutable size: int;
|
|
||||||
mutable q: 'a list;
|
|
||||||
batch: int option;
|
|
||||||
high_watermark: int;
|
|
||||||
timeout: Mtime.span option;
|
|
||||||
mutable start: Mtime.t;
|
|
||||||
}
|
|
||||||
|
|
||||||
let make ?batch ?timeout () : _ t =
|
|
||||||
Option.iter (fun b -> assert (b > 0)) batch;
|
|
||||||
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
|
|
||||||
{
|
|
||||||
size = 0;
|
|
||||||
start = Mtime_clock.now ();
|
|
||||||
q = [];
|
|
||||||
batch;
|
|
||||||
timeout;
|
|
||||||
high_watermark;
|
|
||||||
}
|
|
||||||
|
|
||||||
let timeout_expired_ ~now self : bool =
|
|
||||||
match self.timeout with
|
|
||||||
| Some t ->
|
|
||||||
let elapsed = Mtime.span now self.start in
|
|
||||||
Mtime.Span.compare elapsed t >= 0
|
|
||||||
| None -> false
|
|
||||||
|
|
||||||
let is_full_ self : bool =
|
|
||||||
match self.batch with
|
|
||||||
| None -> self.size > 0
|
|
||||||
| Some b -> self.size >= b
|
|
||||||
|
|
||||||
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
|
||||||
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
|
|
||||||
then (
|
|
||||||
let l = self.q in
|
|
||||||
self.q <- [];
|
|
||||||
self.size <- 0;
|
|
||||||
assert (l <> []);
|
|
||||||
Some l
|
|
||||||
) else
|
|
||||||
None
|
|
||||||
|
|
||||||
let push (self : _ t) x : bool =
|
|
||||||
if self.size >= self.high_watermark then (
|
|
||||||
(* drop this to prevent queue from growing too fast *)
|
|
||||||
Atomic.incr n_dropped;
|
|
||||||
true
|
|
||||||
) else (
|
|
||||||
if self.size = 0 && Option.is_some self.timeout then
|
|
||||||
(* current batch starts now *)
|
|
||||||
self.start <- Mtime_clock.now ();
|
|
||||||
|
|
||||||
(* add to queue *)
|
|
||||||
self.size <- 1 + self.size;
|
|
||||||
self.q <- x :: self.q;
|
|
||||||
let ready = is_full_ self in
|
|
||||||
ready
|
|
||||||
)
|
|
||||||
|
|
||||||
let push' self x = ignore (push self x : bool)
|
|
||||||
end
|
|
||||||
|
|
||||||
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
|
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
|
||||||
from the program to whatever collector client we have. *)
|
from the program to whatever collector client we have. *)
|
||||||
module type EMITTER = sig
|
module type EMITTER = sig
|
||||||
|
|
@ -280,13 +199,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
else
|
else
|
||||||
None
|
None
|
||||||
|
|
||||||
let batch_traces : Trace.resource_spans list Batch.t =
|
let batch_traces : Trace.resource_spans Batch.t =
|
||||||
Batch.make ?batch:config.batch_traces ?timeout ()
|
Batch.make ?batch:config.batch_traces ?timeout ()
|
||||||
|
|
||||||
let batch_metrics : Metrics.resource_metrics list Batch.t =
|
let batch_metrics : Metrics.resource_metrics Batch.t =
|
||||||
Batch.make ?batch:config.batch_metrics ?timeout ()
|
Batch.make ?batch:config.batch_metrics ?timeout ()
|
||||||
|
|
||||||
let batch_logs : Logs.resource_logs list Batch.t =
|
let batch_logs : Logs.resource_logs Batch.t =
|
||||||
Batch.make ?batch:config.batch_logs ?timeout ()
|
Batch.make ?batch:config.batch_logs ?timeout ()
|
||||||
|
|
||||||
let on_tick_cbs_ = Atomic.make (AList.make ())
|
let on_tick_cbs_ = Atomic.make (AList.make ())
|
||||||
|
|
@ -317,13 +236,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
let send_logs_http client (l : Logs.resource_logs list) =
|
let send_logs_http client (l : Logs.resource_logs list) =
|
||||||
Conv.logs l |> send_http_ client ~url:config.url_logs
|
Conv.logs l |> send_http_ client ~url:config.url_logs
|
||||||
|
|
||||||
let maybe_pop ?force ~now batch =
|
|
||||||
Batch.pop_if_ready ?force ~now batch
|
|
||||||
|> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) [])
|
|
||||||
|
|
||||||
(* emit metrics, if the batch is full or timeout lapsed *)
|
(* emit metrics, if the batch is full or timeout lapsed *)
|
||||||
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
|
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
|
||||||
match maybe_pop ?force ~now batch_metrics with
|
match Batch.pop_if_ready ?force ~now batch_metrics with
|
||||||
| None -> Lwt.return false
|
| None -> Lwt.return false
|
||||||
| Some l ->
|
| Some l ->
|
||||||
let batch = !gc_metrics @ l in
|
let batch = !gc_metrics @ l in
|
||||||
|
|
@ -332,14 +247,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
true
|
true
|
||||||
|
|
||||||
let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
|
let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
|
||||||
match maybe_pop ?force ~now batch_traces with
|
match Batch.pop_if_ready ?force ~now batch_traces with
|
||||||
| None -> Lwt.return false
|
| None -> Lwt.return false
|
||||||
| Some l ->
|
| Some l ->
|
||||||
let+ () = send_traces_http httpc l in
|
let+ () = send_traces_http httpc l in
|
||||||
true
|
true
|
||||||
|
|
||||||
let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
|
let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
|
||||||
match maybe_pop ?force ~now batch_logs with
|
match Batch.pop_if_ready ?force ~now batch_logs with
|
||||||
| None -> Lwt.return false
|
| None -> Lwt.return false
|
||||||
| Some l ->
|
| Some l ->
|
||||||
let+ () = send_logs_http httpc l in
|
let+ () = send_logs_http httpc l in
|
||||||
|
|
@ -381,9 +296,14 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
can also be several user threads that produce spans and call
|
can also be several user threads that produce spans and call
|
||||||
the emit functions. *)
|
the emit functions. *)
|
||||||
|
|
||||||
|
let push_to_batch b e =
|
||||||
|
match Batch.push b e with
|
||||||
|
| `Ok -> ()
|
||||||
|
| `Dropped -> Atomic.incr n_errors
|
||||||
|
|
||||||
let push_trace e =
|
let push_trace e =
|
||||||
let@ () = guard_exn_ "push trace" in
|
let@ () = guard_exn_ "push trace" in
|
||||||
Batch.push' batch_traces e;
|
push_to_batch batch_traces e;
|
||||||
let now = Mtime_clock.now () in
|
let now = Mtime_clock.now () in
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let+ (_ : bool) = emit_traces_maybe ~now httpc in
|
let+ (_ : bool) = emit_traces_maybe ~now httpc in
|
||||||
|
|
@ -392,7 +312,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
let push_metrics e =
|
let push_metrics e =
|
||||||
let@ () = guard_exn_ "push metrics" in
|
let@ () = guard_exn_ "push metrics" in
|
||||||
sample_gc_metrics_if_needed ();
|
sample_gc_metrics_if_needed ();
|
||||||
Batch.push' batch_metrics e;
|
push_to_batch batch_metrics e;
|
||||||
let now = Mtime_clock.now () in
|
let now = Mtime_clock.now () in
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let+ (_ : bool) = emit_metrics_maybe ~now httpc in
|
let+ (_ : bool) = emit_metrics_maybe ~now httpc in
|
||||||
|
|
@ -400,7 +320,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
|
||||||
|
|
||||||
let push_logs e =
|
let push_logs e =
|
||||||
let@ () = guard_exn_ "push logs" in
|
let@ () = guard_exn_ "push logs" in
|
||||||
Batch.push' batch_logs e;
|
push_to_batch batch_logs e;
|
||||||
let now = Mtime_clock.now () in
|
let now = Mtime_clock.now () in
|
||||||
Lwt.async (fun () ->
|
Lwt.async (fun () ->
|
||||||
let+ (_ : bool) = emit_logs_maybe ~now httpc in
|
let+ (_ : bool) = emit_logs_maybe ~now httpc in
|
||||||
|
|
|
||||||
76
src/client/batch.ml
Normal file
76
src/client/batch.ml
Normal file
|
|
@ -0,0 +1,76 @@
|
||||||
|
type 'a t = {
|
||||||
|
mutable size: int;
|
||||||
|
mutable q: 'a list;
|
||||||
|
(* The queue is a FIFO represented as a list in reverse order *)
|
||||||
|
batch: int; (* Minimum size to batch before popping *)
|
||||||
|
high_watermark: int;
|
||||||
|
timeout: Mtime.span option;
|
||||||
|
mutable start: Mtime.t;
|
||||||
|
}
|
||||||
|
|
||||||
|
let default_high_watermark batch_size =
|
||||||
|
if batch_size = 1 then
|
||||||
|
100
|
||||||
|
else
|
||||||
|
batch_size * 10
|
||||||
|
|
||||||
|
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
||||||
|
let high_watermark =
|
||||||
|
match high_watermark with
|
||||||
|
| Some x -> x
|
||||||
|
| None -> default_high_watermark batch
|
||||||
|
in
|
||||||
|
let start =
|
||||||
|
match now with
|
||||||
|
| Some x -> x
|
||||||
|
| None -> Mtime_clock.now ()
|
||||||
|
in
|
||||||
|
assert (batch > 0);
|
||||||
|
{ size = 0; q = []; start; batch; timeout; high_watermark }
|
||||||
|
|
||||||
|
let timeout_expired_ ~now self : bool =
|
||||||
|
match self.timeout with
|
||||||
|
| Some t ->
|
||||||
|
let elapsed = Mtime.span now self.start in
|
||||||
|
Mtime.Span.compare elapsed t >= 0
|
||||||
|
| None -> false
|
||||||
|
|
||||||
|
(* Big enough to send a batch *)
|
||||||
|
let is_full_ self : bool = self.size >= self.batch
|
||||||
|
|
||||||
|
let ready_to_pop ~force ~now self =
|
||||||
|
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
|
||||||
|
|
||||||
|
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||||
|
if ready_to_pop ~force ~now self then (
|
||||||
|
assert (self.q <> []);
|
||||||
|
let batch =
|
||||||
|
(* Reverse the list to retrieve the FIFO order. *)
|
||||||
|
List.rev self.q
|
||||||
|
in
|
||||||
|
self.q <- [];
|
||||||
|
self.size <- 0;
|
||||||
|
Some batch
|
||||||
|
) else
|
||||||
|
None
|
||||||
|
|
||||||
|
(* Helper so we can count new elements and prepend them onto the existing [q] in
|
||||||
|
one pass. *)
|
||||||
|
let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
|
||||||
|
elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q)
|
||||||
|
|
||||||
|
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||||
|
if self.size >= self.high_watermark then
|
||||||
|
(* drop this to prevent queue from growing too fast *)
|
||||||
|
`Dropped
|
||||||
|
else (
|
||||||
|
if self.size = 0 && Option.is_some self.timeout then
|
||||||
|
(* current batch starts now *)
|
||||||
|
self.start <- Mtime_clock.now ();
|
||||||
|
|
||||||
|
let count, q' = append_with_count ~elems ~q:self.q in
|
||||||
|
(* add to queue *)
|
||||||
|
self.size <- self.size + count;
|
||||||
|
self.q <- q';
|
||||||
|
`Ok
|
||||||
|
)
|
||||||
52
src/client/batch.mli
Normal file
52
src/client/batch.mli
Normal file
|
|
@ -0,0 +1,52 @@
|
||||||
|
(** A thread-safe batch of resources to be popper when ready . *)
|
||||||
|
|
||||||
|
type 'a t
|
||||||
|
|
||||||
|
val make :
|
||||||
|
?batch:int ->
|
||||||
|
?high_watermark:int ->
|
||||||
|
?now:Mtime.t ->
|
||||||
|
?timeout:Mtime.span ->
|
||||||
|
unit ->
|
||||||
|
'a t
|
||||||
|
(** [make ()] is a new batch
|
||||||
|
|
||||||
|
@param batch
|
||||||
|
the number of elements after which the batch will be considered {b full},
|
||||||
|
and ready to pop. Set to [0] to disable batching. It is required that
|
||||||
|
[batch >= 0]. Default [1].
|
||||||
|
|
||||||
|
@param high_watermark
|
||||||
|
the batch size limit after which new elements will be [`Dropped] by
|
||||||
|
{!push}. This prevents the queue from growing too fast for effective
|
||||||
|
transmission in case of signal floods. Default
|
||||||
|
[if batch = 1 then 100 else batch * 10].
|
||||||
|
|
||||||
|
@param now the current time. Default [Mtime_clock.now ()].
|
||||||
|
|
||||||
|
@param timeout
|
||||||
|
the time span after which a batch is ready to pop, whether or not it is
|
||||||
|
{b full}. *)
|
||||||
|
|
||||||
|
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
|
||||||
|
(** [pop_if_ready ~now b] is [Some xs], where is [xs] includes all the elements
|
||||||
|
{!push}ed since the last batch, if the batch ready to be emitted.
|
||||||
|
|
||||||
|
A batch is ready to pop if it contains some elements and
|
||||||
|
|
||||||
|
- batching is disabled, and any elements have been batched, or batching was
|
||||||
|
enabled and at least [batch] elements have been pushed, or
|
||||||
|
- a [timeout] was provided, and more than a [timeout] span has passed since
|
||||||
|
the last pop was ready, or
|
||||||
|
- the pop is [force]d,
|
||||||
|
|
||||||
|
@param now the current time
|
||||||
|
|
||||||
|
@param force
|
||||||
|
override the other batch conditions, for when when we just want to emit
|
||||||
|
batches before exit or because the user asks for it *)
|
||||||
|
|
||||||
|
val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
|
||||||
|
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch
|
||||||
|
[b], or [`Dropped] if the current size of the batch has exceeded the high water
|
||||||
|
mark determined by the [batch] argument to {!make}]. ) *)
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
(library
|
(library
|
||||||
(name opentelemetry_client)
|
(name opentelemetry_client)
|
||||||
(public_name opentelemetry.client)
|
(public_name opentelemetry.client)
|
||||||
(libraries opentelemetry pbrt)
|
(libraries opentelemetry pbrt mtime mtime.clock.os)
|
||||||
(synopsis "Common types and logic shared between client implementations"))
|
(synopsis "Common types and logic shared between client implementations"))
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue