ocurl backend: implement non-threaded mode, add config for batch timeout

emit batches, even if they're not full, after given timeout
This commit is contained in:
Simon Cruanes 2022-03-21 15:20:31 -04:00
parent c030bf9c21
commit fb0778805d
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 77 additions and 36 deletions

View file

@ -2,5 +2,6 @@
(library
(name opentelemetry_client_ocurl)
(public_name opentelemetry-client-ocurl)
(libraries opentelemetry curl ocaml-protoc threads mtime mtime.clock.os))
(libraries opentelemetry curl ocaml-protoc threads
atomic mtime mtime.clock.os))

View file

@ -29,25 +29,30 @@ module Config = struct
url: string;
batch_traces: int option;
batch_metrics: int option;
batch_timeout_ms: int;
thread: bool;
}
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics; thread} = self in
let {debug; url; batch_traces; batch_metrics;
batch_timeout_ms; thread} = self in
Format.fprintf out "{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@ thread=%B @]}"
batch_traces=%a;@ batch_metrics=%a;@
batch_timeout_ms=%d; thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics
thread
batch_timeout_ms thread
let make
?(debug= !debug_)
?(url= get_url())
?(batch_traces=Some 400)
?(batch_metrics=None)
?(batch_timeout_ms=500)
?(thread=true)
() : t =
{ debug; url; batch_traces; batch_metrics; thread; }
{ debug; url; batch_traces; batch_metrics; batch_timeout_ms;
thread; }
end
(* critical section for [f()] *)
@ -283,7 +288,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let encoder = Pbrt.Encoder.create() in
let emit_metrics (module C:CURL) (l:(Metrics.resource_metrics list*over_cb) list) =
let ((module C) as curl) = (module Curl() : CURL) in
let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) =
Pbrt.Encoder.reset encoder;
let resource_metrics =
List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in
@ -302,7 +309,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
List.iter (fun (_,over) -> over()) l;
in
let emit_traces (module C: CURL) (l:(Trace.resource_spans list * over_cb) list) =
let emit_traces (l:(Trace.resource_spans list * over_cb) list) =
Pbrt.Encoder.reset encoder;
let resource_spans =
List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in
@ -320,42 +327,49 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
List.iter (fun (_,over) -> over()) l;
in
let last_wakeup = Atomic.make (Mtime_clock.now()) in
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in
let batch_timeout() : bool =
let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in
Mtime.Span.compare elapsed timeout >= 0
in
let emit_metrics ?(force=false) () : bool =
if (force && not (E_metrics.is_empty())) ||
(not force && E_metrics.is_big_enough ()) then (
let batch = ref [] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
emit_metrics !batch;
Atomic.set last_wakeup (Mtime_clock.now());
true
) else false
in
let emit_traces ?(force=false) () : bool =
if (force && not (E_trace.is_empty())) ||
(not force && E_trace.is_big_enough ()) then (
let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
emit_traces !batch;
Atomic.set last_wakeup (Mtime_clock.now());
true
) else false
in
let emit_all_force () =
ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool);
in
if config.thread then (
begin
let m = Mutex.create() in
set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m);
end;
let module C = Curl() in
let ((module C) as curl) = (module Curl() : CURL) in
let m = Mutex.create() in
let cond = Condition.create() in
let last_wakeup = ref (Mtime_clock.now()) in
(* TODO: move this into config *)
let batch_timeout() : bool =
let elapsed = Mtime.span (Mtime_clock.now()) !last_wakeup in
Mtime.Span.compare elapsed Mtime.Span.(200 * ms) >= 0
in
let emit_metrics ?(force=false) () : bool =
if (force && not (E_metrics.is_empty())) ||
(not force && E_metrics.is_big_enough ()) then (
let batch = ref [] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
emit_metrics (module C) !batch;
true
) else false
in
let emit_traces ?(force=false) () : bool =
if (force && not (E_trace.is_empty())) ||
(not force && E_trace.is_big_enough ()) then (
let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
emit_traces (module C) !batch;
true
) else false
in
let bg_thread () =
while !continue do
@ -376,7 +390,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let _: Thread.t = Thread.create bg_thread () in
let wakeup () =
last_wakeup := Mtime_clock.now();
with_mutex_ m (fun () -> Condition.signal cond);
Thread.yield()
in
@ -398,7 +411,27 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
end in
(module M)
) else (
assert false
on_metrics_full (fun () ->
ignore (emit_metrics () : bool));
on_trace_full (fun () ->
ignore (emit_traces () : bool));
let cleanup () =
emit_all_force();
C.cleanup();
in
let module M = struct
let push_trace e ~over =
E_trace.push (e,over);
if batch_timeout() then emit_all_force()
let push_metrics e ~over =
E_metrics.push (e,over);
if batch_timeout() then emit_all_force()
let cleanup = cleanup
end in
(module M)
)
module Backend(Arg : sig val config : Config.t end)()

View file

@ -38,6 +38,12 @@ module Config : sig
Default [None].
*)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
thread: bool;
(** Is there a background thread? Default [true] *)
}
@ -46,6 +52,7 @@ module Config : sig
?debug:bool -> ?url:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
unit -> t
(** Make a configuration *)