mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-09 12:23:32 -04:00
feat batch: get rid of Mutex
this should result in lower overhead for single threaded situations such as lwt or eio.
This commit is contained in:
parent
8b06ed208b
commit
a602a5c714
1 changed files with 65 additions and 38 deletions
|
|
@ -1,14 +1,18 @@
|
||||||
open Opentelemetry_util
|
open Opentelemetry_util
|
||||||
|
module Otel = Opentelemetry
|
||||||
|
module A = Opentelemetry_atomic.Atomic
|
||||||
|
|
||||||
|
type 'a state = {
|
||||||
|
start: Mtime.t;
|
||||||
|
size: int;
|
||||||
|
q: 'a list; (** The queue is a FIFO represented as a list in reverse order *)
|
||||||
|
}
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutable size: int;
|
st: 'a state A.t;
|
||||||
mutable q: 'a list;
|
|
||||||
(** The queue is a FIFO represented as a list in reverse order *)
|
|
||||||
batch: int; (** Minimum size to batch before popping *)
|
batch: int; (** Minimum size to batch before popping *)
|
||||||
high_watermark: int; (** Size above which we start dropping signals *)
|
high_watermark: int; (** Size above which we start dropping signals *)
|
||||||
timeout: Mtime.span option;
|
timeout: Mtime.span option;
|
||||||
mutable start: Mtime.t;
|
|
||||||
mutex: Mutex.t;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let default_high_watermark batch_size =
|
let default_high_watermark batch_size =
|
||||||
|
|
@ -17,6 +21,8 @@ let default_high_watermark batch_size =
|
||||||
else
|
else
|
||||||
batch_size * 10
|
batch_size * 10
|
||||||
|
|
||||||
|
let _dummy_start = Mtime.min_stamp
|
||||||
|
|
||||||
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
||||||
let high_watermark =
|
let high_watermark =
|
||||||
match high_watermark with
|
match high_watermark with
|
||||||
|
|
@ -26,36 +32,48 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
||||||
let start =
|
let start =
|
||||||
match now with
|
match now with
|
||||||
| Some x -> x
|
| Some x -> x
|
||||||
| None -> Mtime_clock.now ()
|
| None -> _dummy_start
|
||||||
in
|
in
|
||||||
let mutex = Mutex.create () in
|
|
||||||
assert (batch > 0);
|
assert (batch > 0);
|
||||||
{ size = 0; q = []; start; batch; timeout; high_watermark; mutex }
|
{ st = A.make { size = 0; q = []; start }; batch; timeout; high_watermark }
|
||||||
|
|
||||||
let timeout_expired_ ~now self : bool =
|
let timeout_expired_ ~now ~timeout (self : _ state) : bool =
|
||||||
match self.timeout with
|
match timeout with
|
||||||
| Some t ->
|
| Some t ->
|
||||||
let elapsed = Mtime.span now self.start in
|
let elapsed = Mtime.span now self.start in
|
||||||
Mtime.Span.compare elapsed t >= 0
|
Mtime.Span.compare elapsed t >= 0
|
||||||
| None -> false
|
| None -> false
|
||||||
|
|
||||||
(* Big enough to send a batch *)
|
(* Big enough to send a batch *)
|
||||||
let is_full_ self : bool = self.size >= self.batch
|
let[@inline] is_full_ ~batch (self : _ state) : bool = self.size >= batch
|
||||||
|
|
||||||
let ready_to_pop ~force ~now self =
|
let[@inline] ready_to_pop_ ~force ~now ~batch ~timeout (self : _ state) =
|
||||||
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
|
self.size > 0
|
||||||
|
&& (force || is_full_ ~batch self || timeout_expired_ ~now ~timeout self)
|
||||||
|
|
||||||
|
let[@inline] atomic_update_loop_ (type res) (self : _ t)
|
||||||
|
(f : 'a state -> 'a state * res) : res =
|
||||||
|
let exception Return of res in
|
||||||
|
try
|
||||||
|
while true do
|
||||||
|
let st = A.get self.st in
|
||||||
|
let new_st, res = f st in
|
||||||
|
if A.compare_and_set self.st st new_st then raise_notrace (Return res)
|
||||||
|
done
|
||||||
|
with Return res -> res
|
||||||
|
|
||||||
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||||
let rev_batch_opt =
|
let rev_batch_opt =
|
||||||
Util_mutex.protect self.mutex @@ fun () ->
|
atomic_update_loop_ self @@ fun state ->
|
||||||
if ready_to_pop ~force ~now self then (
|
let timeout = self.timeout in
|
||||||
assert (self.q <> []);
|
let batch = self.batch in
|
||||||
let batch = self.q in
|
if ready_to_pop_ ~force ~now ~batch ~timeout state then (
|
||||||
self.q <- [];
|
assert (state.q <> []);
|
||||||
self.size <- 0;
|
let batch = state.q in
|
||||||
Some batch
|
let new_st = { q = []; size = 0; start = _dummy_start } in
|
||||||
|
new_st, Some batch
|
||||||
) else
|
) else
|
||||||
None
|
state, None
|
||||||
in
|
in
|
||||||
match rev_batch_opt with
|
match rev_batch_opt with
|
||||||
| None -> None
|
| None -> None
|
||||||
|
|
@ -63,27 +81,36 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||||
(* Reverse the list to retrieve the FIFO order. *)
|
(* Reverse the list to retrieve the FIFO order. *)
|
||||||
Some (List.rev batch)
|
Some (List.rev batch)
|
||||||
|
|
||||||
let rec push_unprotected (self : _ t) ~(elems : _ list) : unit =
|
let[@inline] push_unprotected_ (self : _ state) (elems : _ list) : _ state =
|
||||||
match elems with
|
{
|
||||||
| [] -> ()
|
self with
|
||||||
| x :: xs ->
|
size = self.size + List.length elems;
|
||||||
self.q <- x :: self.q;
|
q = List.rev_append elems self.q;
|
||||||
self.size <- 1 + self.size;
|
}
|
||||||
push_unprotected self ~elems:xs
|
|
||||||
|
|
||||||
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||||
Util_mutex.protect self.mutex @@ fun () ->
|
if elems = [] then
|
||||||
if self.size >= self.high_watermark then
|
`Ok `Ok
|
||||||
(* drop this to prevent queue from growing too fast *)
|
|
||||||
`Dropped
|
|
||||||
else (
|
else (
|
||||||
if self.size = 0 && Option.is_some self.timeout then
|
let now = lazy (Mtime_clock.now ()) in
|
||||||
(* current batch starts now *)
|
atomic_update_loop_ self @@ fun state ->
|
||||||
self.start <- Mtime_clock.now ();
|
if state.size >= self.high_watermark then
|
||||||
|
(* drop this to prevent queue from growing too fast *)
|
||||||
|
state, `Dropped
|
||||||
|
else (
|
||||||
|
let state =
|
||||||
|
if state.size = 0 && Option.is_some self.timeout then
|
||||||
|
(* current batch starts now *)
|
||||||
|
{ state with start = Lazy.force now }
|
||||||
|
else
|
||||||
|
state
|
||||||
|
in
|
||||||
|
|
||||||
(* add to queue *)
|
(* add to queue *)
|
||||||
push_unprotected self ~elems;
|
let state = push_unprotected_ state elems in
|
||||||
`Ok
|
|
||||||
|
state, `Ok
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
|
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue