ocurl backend: ticker thread

This commit is contained in:
Simon Cruanes 2022-03-25 11:06:34 -04:00
parent 078e8b416d
commit 2d220b20af
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 31 additions and 8 deletions

View file

@ -33,17 +33,19 @@ module Config = struct
batch_metrics: int option;
batch_timeout_ms: int;
thread: bool;
ticker_thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics;
batch_timeout_ms; thread} = self in
Format.fprintf out "{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@
batch_timeout_ms=%d; thread=%B @]}"
batch_timeout_ms; thread; ticker_thread} = self in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@ \
batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics
batch_timeout_ms thread
batch_timeout_ms thread ticker_thread
let make
?(debug= !debug_)
@ -52,9 +54,10 @@ module Config = struct
?(batch_metrics=None)
?(batch_timeout_ms=500)
?(thread=true)
?(ticker_thread=true)
() : t =
{ debug; url; batch_traces; batch_metrics; batch_timeout_ms;
thread; }
thread; ticker_thread; }
end
(* critical section for [f()] *)
@ -393,6 +396,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let m = Mutex.create() in
let cond = Condition.create() in
(* loop for the thread that processes events and sends them to collector *)
let bg_thread () =
while !continue do
let@ () = guard in
@ -413,8 +417,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
C.cleanup();
end
in
let _: Thread.t = Thread.create bg_thread () in
let _th_process_batches: Thread.t = Thread.create bg_thread () in
let wakeup () =
with_mutex_ m (fun () -> Condition.signal cond);
@ -429,6 +432,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
if batch_timeout() then wakeup()
in
if config.ticker_thread then (
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () =
while true do
Thread.delay 0.5;
tick();
done
in
let _th_ticker : Thread.t = Thread.create tick_thread () in
()
);
let module M = struct
let push_trace e =
E_trace.push e;

View file

@ -46,6 +46,12 @@ module Config : sig
thread: bool;
(** Is there a background thread? Default [true] *)
ticker_thread: bool;
(** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector.
This option is ignored if [thread=false]. *)
}
val make :
@ -54,6 +60,7 @@ module Config : sig
?batch_metrics:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?ticker_thread:bool ->
unit -> t
(** Make a configuration *)