mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-09 12:23:32 -04:00
add high watermark to ocurl client
This commit is contained in:
parent
9c3e2a7076
commit
6179c97e99
1 changed files with 15 additions and 4 deletions
|
|
@ -170,12 +170,14 @@ end = struct
|
|||
mutable size: int;
|
||||
mutable q: 'a list;
|
||||
batch: int option;
|
||||
high_watermark: int;
|
||||
timeout: Mtime.span option;
|
||||
mutable start: Mtime.t;
|
||||
}
|
||||
|
||||
let make ?batch ?timeout () : _ t =
|
||||
Option.iter (fun b -> assert (b > 0)) batch;
|
||||
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
|
||||
{
|
||||
lock = Mutex.create ();
|
||||
size = 0;
|
||||
|
|
@ -183,6 +185,7 @@ end = struct
|
|||
q = [];
|
||||
batch;
|
||||
timeout;
|
||||
high_watermark;
|
||||
}
|
||||
|
||||
let is_empty_ self = self.size = 0
|
||||
|
|
@ -219,11 +222,19 @@ end = struct
|
|||
let push (self : _ t) x : bool =
|
||||
let@ () = with_mutex_ self.lock in
|
||||
if self.size = 0 && Option.is_some self.timeout then
|
||||
(* current batch starts now *)
|
||||
self.start <- Mtime_clock.now ();
|
||||
self.size <- 1 + self.size;
|
||||
self.q <- x :: self.q;
|
||||
let ready = is_full_ self in
|
||||
ready
|
||||
if self.size >= self.high_watermark then (
|
||||
(* drop this to prevent queue from growing too fast *)
|
||||
Atomic.incr n_dropped;
|
||||
true
|
||||
) else (
|
||||
(* add to queue *)
|
||||
self.size <- 1 + self.size;
|
||||
self.q <- x :: self.q;
|
||||
let ready = is_full_ self in
|
||||
ready
|
||||
)
|
||||
|
||||
let push' self x = ignore (push self x : bool)
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue