ocurl: add ticker_interval_ms config

ticker interval will affect how often metrics are emitted,
this doesn't need be related to batch timeouts.
This commit is contained in:
Simon Cruanes 2023-12-20 15:57:53 -05:00
parent d4186f64f4
commit f0310530a3
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 22 additions and 11 deletions

View file

@ -7,6 +7,7 @@ type t = {
batch_timeout_ms: int; batch_timeout_ms: int;
bg_threads: int; bg_threads: int;
ticker_thread: bool; ticker_thread: bool;
ticker_interval_ms: int;
self_trace: bool; self_trace: bool;
} }
@ -20,19 +21,20 @@ let pp out self =
batch_timeout_ms; batch_timeout_ms;
bg_threads; bg_threads;
ticker_thread; ticker_thread;
ticker_interval_ms;
self_trace; self_trace;
} = } =
self self
in in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \ "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
ticker_thread=%B;@ self_trace=%B @]}" ticker_thread=%B;@ ticker_interval_ms=%d;@ self_trace=%B @]}"
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
self_trace ticker_interval_ms self_trace
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) ?(batch_timeout_ms = 2_000) ?(bg_threads = 4) ?(ticker_thread = true)
?(self_trace = true) () : t = ?(ticker_interval_ms = 500) ?(self_trace = true) () : t =
let bg_threads = max 2 (min bg_threads 32) in let bg_threads = max 2 (min bg_threads 32) in
{ {
debug; debug;
@ -41,5 +43,6 @@ let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
batch_timeout_ms; batch_timeout_ms;
bg_threads; bg_threads;
ticker_thread; ticker_thread;
ticker_interval_ms;
self_trace; self_trace;
} }

View file

@ -12,12 +12,18 @@ type t = private {
(** Number of milliseconds after which we will emit a batch, even (** Number of milliseconds after which we will emit a batch, even
incomplete. incomplete.
Note that the batch might take longer than that, because this is Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *) only checked when a new event occurs or when a tick
is emitted. Default 2_000. *)
bg_threads: int; bg_threads: int;
(** Are there background threads, and how many? Default [4] *) (** Are there background threads, and how many? Default [4] *)
ticker_thread: bool; ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should (** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *) be sent to the collector. Default [true] *)
ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is
only useful if [ticker_thread] is [true].
Default 500.
@since NEXT_RELEASE *)
self_trace: bool; self_trace: bool;
(** If true, the OTEL library will also emit its own spans. (** If true, the OTEL library will also emit its own spans.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
@ -34,6 +40,7 @@ val make :
?batch_timeout_ms:int -> ?batch_timeout_ms:int ->
?bg_threads:int -> ?bg_threads:int ->
?ticker_thread:bool -> ?ticker_thread:bool ->
?ticker_interval_ms:int ->
?self_trace:bool -> ?self_trace:bool ->
unit -> unit ->
t t

View file

@ -370,14 +370,14 @@ end = struct
| Event.E_metric m -> Batch.push batches.metrics m | Event.E_metric m -> Batch.push batches.metrics m
| Event.E_trace tr -> Batch.push batches.traces tr | Event.E_trace tr -> Batch.push batches.traces tr
| Event.E_logs logs -> Batch.push batches.logs logs | Event.E_logs logs -> Batch.push batches.logs logs
| Event.E_tick -> () | Event.E_tick ->
(* the only impact of "tick" is that it wakes us up regularly *)
()
| Event.E_flush_all -> must_flush_all := true | Event.E_flush_all -> must_flush_all := true
in in
while not (Queue.is_empty local_q) do Queue.iter process_ev local_q;
let ev = Queue.pop local_q in Queue.clear local_q;
process_ev ev
done;
if !must_flush_all then ( if !must_flush_all then (
if Batch.len batches.metrics > 0 then send_metrics (); if Batch.len batches.metrics > 0 then send_metrics ();
@ -545,7 +545,8 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
Atomic.set Self_trace.enabled config.self_trace; Atomic.set Self_trace.enabled config.self_trace;
if config.ticker_thread then ( if config.ticker_thread then (
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in (* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
); );