diff --git a/src/client/dune b/src/client/dune index 153744a8..11696169 100644 --- a/src/client/dune +++ b/src/client/dune @@ -2,5 +2,6 @@ (library (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) - (libraries opentelemetry curl ocaml-protoc threads mtime mtime.clock.os)) + (libraries opentelemetry curl ocaml-protoc threads + atomic mtime mtime.clock.os)) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index a907735a..c3f61011 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -29,25 +29,30 @@ module Config = struct url: string; batch_traces: int option; batch_metrics: int option; + batch_timeout_ms: int; thread: bool; } let pp out self = let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; thread} = self in + let {debug; url; batch_traces; batch_metrics; + batch_timeout_ms; thread} = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ thread=%B @]}" + batch_traces=%a;@ batch_metrics=%a;@ + batch_timeout_ms=%d; thread=%B @]}" debug url ppiopt batch_traces ppiopt batch_metrics - thread + batch_timeout_ms thread let make ?(debug= !debug_) ?(url= get_url()) ?(batch_traces=Some 400) ?(batch_metrics=None) + ?(batch_timeout_ms=500) ?(thread=true) () : t = - { debug; url; batch_traces; batch_metrics; thread; } + { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + thread; } end (* critical section for [f()] *) @@ -283,7 +288,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let encoder = Pbrt.Encoder.create() in - let emit_metrics (module C:CURL) (l:(Metrics.resource_metrics list*over_cb) list) = + let ((module C) as curl) = (module Curl() : CURL) in + + let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) = Pbrt.Encoder.reset encoder; let resource_metrics = List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in @@ -302,7 +309,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = List.iter (fun (_,over) -> over()) l; in - let emit_traces (module C: CURL) (l:(Trace.resource_spans list * over_cb) list) = + let emit_traces (l:(Trace.resource_spans list * over_cb) list) = Pbrt.Encoder.reset encoder; let resource_spans = List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in @@ -320,42 +327,49 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = List.iter (fun (_,over) -> over()) l; in + let last_wakeup = Atomic.make (Mtime_clock.now()) in + let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in + let batch_timeout() : bool = + let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in + Mtime.Span.compare elapsed timeout >= 0 + in + + let emit_metrics ?(force=false) () : bool = + if (force && not (E_metrics.is_empty())) || + (not force && E_metrics.is_big_enough ()) then ( + let batch = ref [] in + E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + emit_metrics !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + let emit_traces ?(force=false) () : bool = + if (force && not (E_trace.is_empty())) || + (not force && E_trace.is_big_enough ()) then ( + let batch = ref [] in + E_trace.pop_iter_all (fun l -> batch := l :: !batch); + emit_traces !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + + let emit_all_force () = + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + in + if config.thread then ( begin let m = Mutex.create() in set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); end; - let module C = Curl() in + let ((module C) as curl) = (module Curl() : CURL) in let m = Mutex.create() in let cond = Condition.create() in - let last_wakeup = ref (Mtime_clock.now()) in - - (* TODO: move this into config *) - let batch_timeout() : bool = - let elapsed = Mtime.span (Mtime_clock.now()) !last_wakeup in - Mtime.Span.compare elapsed Mtime.Span.(200 * ms) >= 0 - in - - let emit_metrics ?(force=false) () : bool = - if (force && not (E_metrics.is_empty())) || - (not force && E_metrics.is_big_enough ()) then ( - let batch = ref [] in - E_metrics.pop_iter_all (fun l -> batch := l :: !batch); - emit_metrics (module C) !batch; - true - ) else false - in - let emit_traces ?(force=false) () : bool = - if (force && not (E_trace.is_empty())) || - (not force && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - emit_traces (module C) !batch; - true - ) else false - in let bg_thread () = while !continue do @@ -376,7 +390,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let _: Thread.t = Thread.create bg_thread () in let wakeup () = - last_wakeup := Mtime_clock.now(); with_mutex_ m (fun () -> Condition.signal cond); Thread.yield() in @@ -398,7 +411,27 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = end in (module M) ) else ( - assert false + + on_metrics_full (fun () -> + ignore (emit_metrics () : bool)); + on_trace_full (fun () -> + ignore (emit_traces () : bool)); + + let cleanup () = + emit_all_force(); + C.cleanup(); + in + + let module M = struct + let push_trace e ~over = + E_trace.push (e,over); + if batch_timeout() then emit_all_force() + let push_metrics e ~over = + E_metrics.push (e,over); + if batch_timeout() then emit_all_force() + let cleanup = cleanup + end in + (module M) ) module Backend(Arg : sig val config : Config.t end)() diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 7a631305..b8c9a902 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -38,6 +38,12 @@ module Config : sig Default [None]. *) + batch_timeout_ms: int; + (** Number of milliseconds after which we will emit a batch, even + incomplete. + Note that the batch might take longer than that, because this is + only checked when a new event occurs. Default 500. *) + thread: bool; (** Is there a background thread? Default [true] *) } @@ -46,6 +52,7 @@ module Config : sig ?debug:bool -> ?url:string -> ?batch_traces:int option -> ?batch_metrics:int option -> + ?batch_timeout_ms:int -> ?thread:bool -> unit -> t (** Make a configuration *)