From 124ba09b2bc454b3118448333f022fbc3d7849f3 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:01:41 -0400 Subject: [PATCH 01/16] feat: implement background thread, collector config, batching --- src/client/dune | 2 +- src/client/opentelemetry_client_ocurl.ml | 372 ++++++++++++++++++---- src/client/opentelemetry_client_ocurl.mli | 48 ++- src/opentelemetry.ml | 14 +- 4 files changed, 363 insertions(+), 73 deletions(-) diff --git a/src/client/dune b/src/client/dune index c4f4685d..153744a8 100644 --- a/src/client/dune +++ b/src/client/dune @@ -2,5 +2,5 @@ (library (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) - (libraries opentelemetry curl ocaml-protoc)) + (libraries opentelemetry curl ocaml-protoc threads mtime mtime.clock.os)) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 4800c2d8..8a241b1f 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -23,42 +23,82 @@ let set_mutex ~lock ~unlock : unit = lock_ := lock; unlock_ := unlock +module Config = struct + type t = { + debug: bool; + url: string; + batch_traces: int option; + batch_metrics: int option; + thread: bool; + } + + let pp out self = + let ppiopt = Format.pp_print_option Format.pp_print_int in + let {debug; url; batch_traces; batch_metrics; thread} = self in + Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ + batch_traces=%a;@ batch_metrics=%a;@ thread=%B @]}" + debug url ppiopt batch_traces ppiopt batch_metrics + thread + + let make + ?(debug= !debug_) + ?(url= get_url()) + ?(batch_traces=Some 400) + ?(batch_metrics=None) + ?(thread=true) + () : t = + { debug; url; batch_traces; batch_metrics; thread; } +end + +(* critical section for [f()] *) let[@inline] with_lock_ f = !lock_(); Fun.protect ~finally:!unlock_ f -let _init = lazy ( +let[@inline] with_mutex_ m f = + Mutex.lock m; + Fun.protect ~finally:(fun () -> Mutex.unlock m) f + +let _init_curl = lazy ( Curl.global_init Curl.CURLINIT_GLOBALALL; at_exit Curl.global_cleanup; ) -module Backend() : Opentelemetry.Collector.BACKEND = struct - let() = Lazy.force _init +type error = [ + | `Status of int * Opentelemetry.Proto.Status.status + | `Failure of string +] - (* TODO: use Curl.Multi, etc. *) +let report_err_ = function + | `Failure msg -> + Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg + | `Status (code, status) -> + Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." + code Proto.Status.pp_status status + +module type CURL = sig + val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val cleanup : unit -> unit +end + +(* create a curl client *) +module Curl() : CURL = struct + open Opentelemetry.Proto + let() = Lazy.force _init_curl - let encoder = Pbrt.Encoder.create() let buf_res = Buffer.create 256 - let rand_ = Random.State.make_self_init() + (* TODO: use Curl.Multi, etc. instead? *) (* http client *) let curl : Curl.t = Curl.init () let cleanup () = Curl.cleanup curl - open Opentelemetry.Proto - open Opentelemetry.Collector - - type error = [ - | `Status of int * Status.status - | `Failure of string - ] - (* TODO: use Curl multi *) (* send the content to the remote endpoint/path *) - let send_ ~path ~decode (bod:string) : ('a, error) result = + let send ~path ~decode (bod:string) : ('a, error) result = Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); @@ -100,48 +140,52 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct ~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in Error(`Status (code, status)) with e -> Error (`Failure (Printexc.to_string e)) +end - let report_err_ = function - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, status) -> - Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." - code Status.pp_status status +module type PUSH = sig + type elt + val push : elt -> unit + val is_empty : unit -> bool + val is_big_enough : unit -> bool + val pop_iter_all : (elt -> unit) -> unit +end - let send_trace : Trace_service.export_trace_service_request sender = { - send=fun tr ~over ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr; - Pbrt.Encoder.reset encoder; - Trace_service.encode_export_trace_service_request tr encoder; - begin match - send_ ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> report_err_ err - end; - over(); - ret() +(* queue of fixed size *) +module FQueue : sig + type 'a t + val create : dummy:'a -> int -> 'a t + val size : _ t -> int + val push : 'a t -> 'a -> unit + val pop_iter_all : 'a t -> ('a -> unit) -> unit +end = struct + type 'a t = { + arr: 'a array; + mutable i: int; } - let send_metrics : Metrics_service.export_metrics_service_request sender = { - send=fun m ~over ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m; - Pbrt.Encoder.reset encoder; - Metrics_service.encode_export_metrics_service_request m encoder; - begin - match - send_ ~path:"/v1/metrics" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder); - with - | Ok () -> () - | Error err -> report_err_ err - end; - over(); - ret() - } + let create ~dummy n : _ t = + assert (n >= 1); + { arr=Array.make n dummy; + i=0; + } + + let[@inline] size self = self.i + + let push (self:_ t) x : unit = + assert (self.i < Array.length self.arr); + self.arr.(self.i) <- x; + self.i <- 1 + self.i + + let pop_iter_all (self: _ t) f = + for j=0 to self.i-1 do + f self.arr.(j) + done; + self.i <- 0 +end + +(* generate random IDs *) +module Gen_ids() = struct + let rand_ = Random.State.make_self_init() let rand_bytes_8 () : bytes = let@() = with_lock_ in @@ -171,15 +215,225 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct b end -let setup_ () = - let module B = Backend() in +(** Callback for when an event is properly sent to the collector *) +type over_cb = unit -> unit + +(** An emitter. This is used by {!Backend} below to forward traces/metrics/… + from the program to whatever collector client we have. *) +module type EMITTER = sig + open Opentelemetry.Proto + + val push_trace : Trace.resource_spans list -> over:over_cb -> unit + val push_metrics : Metrics.resource_metrics list -> over:over_cb -> unit + + val cleanup : unit -> unit +end + +type 'a push = (module PUSH with type elt = 'a) +type on_full_cb = (unit -> unit) + +(* make a "push" object, along with a setter for a callback to call when + it's ready to emit a batch *) +let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) = + let on_full: on_full_cb ref = ref ignore in + let push = + match batch with + | None -> + let r = ref None in + let module M = struct + type elt = a + let is_empty () = !r == None + let is_big_enough () = !r != None + let push x = + r := Some x; !on_full() + let pop_iter_all f = Option.iter f !r; r := None + end in + (module M : PUSH with type elt = a) + + | Some n -> + let q = FQueue.create ~dummy:(Obj.magic 0) (2 * n) in + let module M = struct + type elt = a + let is_empty () = FQueue.size q = 0 + let is_big_enough () = FQueue.size q >= n + let push x = + FQueue.push q x; + if FQueue.size q > n then ( + !on_full() + ) + let pop_iter_all f = FQueue.pop_iter_all q f + end in + (module M : PUSH with type elt = a) + + in + push, ((:=) on_full) + +let mk_emitter ~(config:Config.t) () : (module EMITTER) = + let open Proto in + + let continue = ref true in + + let ((module E_trace) : (Trace.resource_spans list * over_cb) push), on_trace_full = + mk_push ?batch:config.batch_traces () in + let ((module E_metrics) : (Metrics.resource_metrics list * over_cb) push), on_metrics_full = + mk_push ?batch:config.batch_metrics () in + + let encoder = Pbrt.Encoder.create() in + + let emit_metrics (module C:CURL) (l:(Metrics.resource_metrics list*over_cb) list) = + Pbrt.Encoder.reset encoder; + let resource_metrics = + List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + Metrics_service.encode_export_metrics_service_request + (Metrics_service.default_export_metrics_service_request + ~resource_metrics ()) + encoder; + begin match + C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> report_err_ err + end; + (* signal completion *) + List.iter (fun (_,over) -> over()) l; + in + + let emit_traces (module C: CURL) (l:(Trace.resource_spans list * over_cb) list) = + Pbrt.Encoder.reset encoder; + let resource_spans = + List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + Trace_service.encode_export_trace_service_request + (Trace_service.default_export_trace_service_request ~resource_spans ()) + encoder; + begin match + C.send ~path:"/v1/traces" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> report_err_ err + end; + (* signal completion *) + List.iter (fun (_,over) -> over()) l; + in + + if config.thread then ( + begin + let m = Mutex.create() in + set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); + end; + + let module C = Curl() in + + let m = Mutex.create() in + let cond = Condition.create() in + let last_wakeup = ref (Mtime_clock.now()) in + + (* TODO: move this into config *) + let batch_timeout() : bool = + let elapsed = Mtime.span (Mtime_clock.now()) !last_wakeup in + Mtime.Span.compare elapsed Mtime.Span.(200 * ms) >= 0 + in + + let emit_metrics ?(force=false) () : bool = + if (force && not (E_metrics.is_empty())) || + (not force && E_metrics.is_big_enough ()) then ( + let batch = ref [] in + E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + emit_metrics (module C) !batch; + true + ) else false + in + let emit_traces ?(force=false) () : bool = + if (force && not (E_trace.is_empty())) || + (not force && E_trace.is_big_enough ()) then ( + let batch = ref [] in + E_trace.pop_iter_all (fun l -> batch := l :: !batch); + emit_traces (module C) !batch; + true + ) else false + in + + let bg_thread () = + while !continue do + if emit_metrics () then () + else if emit_traces () then () + else ( + (* wait *) + let@ () = with_mutex_ m in + Condition.wait cond m; + ) + done; + (* flush remaining events *) + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + C.cleanup(); + in + + let _: Thread.t = Thread.create bg_thread () in + + let wakeup () = + last_wakeup := Mtime_clock.now(); + with_mutex_ m (fun () -> Condition.signal cond) + in + + (* wake up if a batch is full *) + on_metrics_full wakeup; + on_trace_full wakeup; + + let module M = struct + let push_trace e ~over = + E_trace.push (e,over); + if batch_timeout() then wakeup() + let push_metrics e ~over = + E_metrics.push (e,over); + if batch_timeout() then wakeup() + let cleanup () = + continue := false; + with_mutex_ m (fun () -> Condition.broadcast cond) + end in + (module M) + ) else ( + assert false + ) + +module Backend(Arg : sig val config : Config.t end)() + : Opentelemetry.Collector.BACKEND += struct + include Gen_ids() + + include (val mk_emitter ~config:Arg.config ()) + + open Opentelemetry.Proto + open Opentelemetry.Collector + + let send_trace : Trace.resource_spans list sender = { + send=fun l ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; + push_trace l ~over; + ret() + } + + let send_metrics : Metrics.resource_metrics list sender = { + send=fun m ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + push_metrics m ~over; + ret() + } +end + +let setup_ ~(config:Config.t) () = + debug_ := config.debug; + let module B = Backend(struct let config=config end)() in Opentelemetry.Collector.backend := Some (module B); B.cleanup -let setup() = - let cleanup = setup_() in +let setup ?(config=Config.make()) () = + let cleanup = setup_ ~config () in at_exit cleanup -let with_setup f = - let cleanup = setup_() in +let with_setup ?(config=Config.make()) f = + let cleanup = setup_ ~config () in Fun.protect ~finally:cleanup f diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index d933a982..7a631305 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -11,8 +11,50 @@ val set_url : string -> unit or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit +(** Set a lock/unlock pair to protect the critical sections + of {!Opentelemetry.Collector.BACKEND} *) -val setup : unit -> unit -(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. *) +module Config : sig + type t = { + debug: bool; -val with_setup : (unit -> 'a) -> 'a + url: string; + (** Url of the endpoint. Default is "http://localhost:4318", + or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) + + batch_traces: int option; + (** Batch traces? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [Some 400]. + *) + + batch_metrics: int option; + (** Batch metrics? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [None]. + *) + + thread: bool; + (** Is there a background thread? Default [true] *) + } + + val make : + ?debug:bool -> ?url:string -> + ?batch_traces:int option -> + ?batch_metrics:int option -> + ?thread:bool -> + unit -> t + (** Make a configuration *) + + val pp : Format.formatter -> t -> unit +end + +val setup : ?config:Config.t -> unit -> unit +(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. + @param config configuration to use *) + +val with_setup : ?config:Config.t -> (unit -> 'a) -> 'a diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 28cf5126..7a0bb4b1 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -79,9 +79,9 @@ module Collector = struct (** Collector client interface. *) module type BACKEND = sig - val send_trace : Trace_service.export_trace_service_request sender + val send_trace : Trace.resource_spans list sender - val send_metrics : Metrics_service.export_metrics_service_request sender + val send_metrics : Metrics.resource_metrics list sender val rand_bytes_16 : unit -> bytes (** Generate 16 bytes of random data *) @@ -102,18 +102,12 @@ module Collector = struct let send_trace (l:Trace.resource_spans list) ~over ~ret = match !backend with | None -> over(); ret() - | Some (module B) -> - let ev = Trace_service.default_export_trace_service_request - ~resource_spans:l () in - B.send_trace.send ev ~over ~ret + | Some (module B) -> B.send_trace.send l ~over ~ret let send_metrics (l:Metrics.resource_metrics list) ~over ~ret = match !backend with | None -> over(); ret() - | Some (module B) -> - let ev = Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () in - B.send_metrics.send ev ~over ~ret + | Some (module B) -> B.send_metrics.send l ~over ~ret let rand_bytes_16 () = match !backend with From 396ef4c36614633bc8df6150ac2f2d12c31cacaa Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:01:56 -0400 Subject: [PATCH 02/16] test(emit1): knobs to change `sleep`, batch size, collector config etc. --- tests/emit1.ml | 57 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/tests/emit1.ml b/tests/emit1.ml index cf63333d..4e65fd21 100644 --- a/tests/emit1.ml +++ b/tests/emit1.ml @@ -3,11 +3,30 @@ module T = Opentelemetry let spf = Printf.sprintf let (let@) f x = f x +let sleep_inner = ref 0.1 +let sleep_outer = ref 2.0 + let run () = Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url()); + + (* regularly emit some metrics *) + let emit_gc() = + let gc = Gc.stat() in + T.Metrics.( + emit [ + gauge ~name:"ocaml_opentracing.test.major_heap_words" [int gc.Gc.heap_words]; + sum ~name:"ocaml_opentracing.test.minor_allocated" [float gc.Gc.minor_words]; + ]); + in + let _al = Gc.create_alarm emit_gc in + + let@ scope = T.Trace.with_ "run" in + let i = ref 0 in while true do - let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_producer + let@ scope = + T.Trace.with_ ~trace_id:scope.trace_id ~parent:scope.span_id + ~kind:T.Span.Span_kind_producer "loop.outer" ~attrs:["i", `Int !i] in for j=0 to 4 do @@ -16,15 +35,7 @@ let run () = ~trace_id:scope.trace_id ~parent:scope.span_id ~attrs:["j", `Int j] "loop.inner" in - Unix.sleepf 2.; - - let gc = Gc.stat() in - T.Metrics.( - emit [ - gauge ~name:"ocaml_opentracing.test.i" [int !i]; - gauge ~name:"ocaml_opentracing.test.major_heap_words" [int gc.Gc.heap_words]; - sum ~name:"ocaml_opentracing.test.minor_allocated" [float gc.Gc.minor_words]; - ]); + Unix.sleepf !sleep_outer; incr i; @@ -36,7 +47,7 @@ let run () = (* allocate some stuff *) let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in ignore _arr; - Unix.sleepf 0.1; + Unix.sleepf !sleep_inner; if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); @@ -49,4 +60,26 @@ let () = Sys.catch_break true; T.Globals.service_name := "t1"; T.Globals.service_namespace := Some "ocaml-otel.test"; - Opentelemetry_client_ocurl.with_setup run + + let thread = ref true in + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let opts = [ + "--thread", Arg.Bool ((:=) thread), " use a background thread"; + "--batch-traces", Arg.Int ((:=) batch_traces), " size of traces batch"; + "--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; + "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; + "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + ] |> Arg.align in + + Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + + let some_if_nzero r = if !r > 0 then Some !r else None in + let config = Opentelemetry_client_ocurl.Config.make + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~thread:!thread () in + Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." + !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; + + Opentelemetry_client_ocurl.with_setup ~config run From a00d4d238370d8d9af4ce7e6d0e752c99f3c4094 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:06:38 -0400 Subject: [PATCH 03/16] collector: if queue is full, drop item, and wakeup thread --- src/client/opentelemetry_client_ocurl.ml | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 8a241b1f..a907735a 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -155,7 +155,7 @@ module FQueue : sig type 'a t val create : dummy:'a -> int -> 'a t val size : _ t -> int - val push : 'a t -> 'a -> unit + val push : 'a t -> 'a -> bool (* true iff it could write element *) val pop_iter_all : 'a t -> ('a -> unit) -> unit end = struct type 'a t = { @@ -170,11 +170,15 @@ end = struct } let[@inline] size self = self.i + let[@inline] is_full self = self.i = Array.length self.arr - let push (self:_ t) x : unit = - assert (self.i < Array.length self.arr); - self.arr.(self.i) <- x; - self.i <- 1 + self.i + let push (self:_ t) x : bool = + if is_full self then false + else ( + self.arr.(self.i) <- x; + self.i <- 1 + self.i; + true + ) let pop_iter_all (self: _ t) f = for j=0 to self.i-1 do @@ -251,15 +255,14 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb - (module M : PUSH with type elt = a) | Some n -> - let q = FQueue.create ~dummy:(Obj.magic 0) (2 * n) in + let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in let module M = struct type elt = a let is_empty () = FQueue.size q = 0 let is_big_enough () = FQueue.size q >= n let push x = - FQueue.push q x; - if FQueue.size q > n then ( - !on_full() + if not (FQueue.push q x) || FQueue.size q > n then ( + !on_full(); (* drop *) ) let pop_iter_all f = FQueue.pop_iter_all q f end in @@ -374,7 +377,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let wakeup () = last_wakeup := Mtime_clock.now(); - with_mutex_ m (fun () -> Condition.signal cond) + with_mutex_ m (fun () -> Condition.signal cond); + Thread.yield() in (* wake up if a batch is full *) From c030bf9c212ce640f660382fff6356d6debc1631 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:20:20 -0400 Subject: [PATCH 04/16] depend on atomic shims, vendor them --- .gitmodules | 3 +++ dune-project | 2 ++ opentelemetry-client-ocurl.opam | 2 ++ vendor/atomic | 1 + vendor/dune | 1 + 5 files changed, 9 insertions(+) create mode 160000 vendor/atomic create mode 100644 vendor/dune diff --git a/.gitmodules b/.gitmodules index 6a111cdb..1554e2c4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "vendor/opentelemetry-proto"] path = vendor/opentelemetry-proto url = https://github.com/open-telemetry/opentelemetry-proto +[submodule "vendor/atomic"] + path = vendor/atomic + url = https://github.com/c-cube/ocaml-atomic.git diff --git a/dune-project b/dune-project index d5a78af6..03b38470 100644 --- a/dune-project +++ b/dune-project @@ -38,6 +38,8 @@ (depends (ocaml (>= "4.08")) (dune (>= "2.3")) + mtime + atomic (opentelemetry (= :version)) (ocaml-protoc (>= 2.1)) (odoc :with-doc) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index 7c4924a1..52aca892 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -10,6 +10,8 @@ bug-reports: depends: [ "ocaml" {>= "4.08"} "dune" {>= "2.3"} + "mtime" + "atomic" "opentelemetry" {= version} "ocaml-protoc" {>= "2.1"} "odoc" {with-doc} diff --git a/vendor/atomic b/vendor/atomic new file mode 160000 index 00000000..7802992a --- /dev/null +++ b/vendor/atomic @@ -0,0 +1 @@ +Subproject commit 7802992a568bf86c940e1c08a208e85102a0df6b diff --git a/vendor/dune b/vendor/dune new file mode 100644 index 00000000..b4ba0b2b --- /dev/null +++ b/vendor/dune @@ -0,0 +1 @@ +(vendored_dirs atomic) From fb0778805d72c182d9b68da66d0345e420645d88 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:20:31 -0400 Subject: [PATCH 05/16] ocurl backend: implement non-threaded mode, add config for batch timeout emit batches, even if they're not full, after given timeout --- src/client/dune | 3 +- src/client/opentelemetry_client_ocurl.ml | 103 ++++++++++++++-------- src/client/opentelemetry_client_ocurl.mli | 7 ++ 3 files changed, 77 insertions(+), 36 deletions(-) diff --git a/src/client/dune b/src/client/dune index 153744a8..11696169 100644 --- a/src/client/dune +++ b/src/client/dune @@ -2,5 +2,6 @@ (library (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) - (libraries opentelemetry curl ocaml-protoc threads mtime mtime.clock.os)) + (libraries opentelemetry curl ocaml-protoc threads + atomic mtime mtime.clock.os)) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index a907735a..c3f61011 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -29,25 +29,30 @@ module Config = struct url: string; batch_traces: int option; batch_metrics: int option; + batch_timeout_ms: int; thread: bool; } let pp out self = let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; thread} = self in + let {debug; url; batch_traces; batch_metrics; + batch_timeout_ms; thread} = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ thread=%B @]}" + batch_traces=%a;@ batch_metrics=%a;@ + batch_timeout_ms=%d; thread=%B @]}" debug url ppiopt batch_traces ppiopt batch_metrics - thread + batch_timeout_ms thread let make ?(debug= !debug_) ?(url= get_url()) ?(batch_traces=Some 400) ?(batch_metrics=None) + ?(batch_timeout_ms=500) ?(thread=true) () : t = - { debug; url; batch_traces; batch_metrics; thread; } + { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + thread; } end (* critical section for [f()] *) @@ -283,7 +288,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let encoder = Pbrt.Encoder.create() in - let emit_metrics (module C:CURL) (l:(Metrics.resource_metrics list*over_cb) list) = + let ((module C) as curl) = (module Curl() : CURL) in + + let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) = Pbrt.Encoder.reset encoder; let resource_metrics = List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in @@ -302,7 +309,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = List.iter (fun (_,over) -> over()) l; in - let emit_traces (module C: CURL) (l:(Trace.resource_spans list * over_cb) list) = + let emit_traces (l:(Trace.resource_spans list * over_cb) list) = Pbrt.Encoder.reset encoder; let resource_spans = List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in @@ -320,42 +327,49 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = List.iter (fun (_,over) -> over()) l; in + let last_wakeup = Atomic.make (Mtime_clock.now()) in + let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in + let batch_timeout() : bool = + let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in + Mtime.Span.compare elapsed timeout >= 0 + in + + let emit_metrics ?(force=false) () : bool = + if (force && not (E_metrics.is_empty())) || + (not force && E_metrics.is_big_enough ()) then ( + let batch = ref [] in + E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + emit_metrics !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + let emit_traces ?(force=false) () : bool = + if (force && not (E_trace.is_empty())) || + (not force && E_trace.is_big_enough ()) then ( + let batch = ref [] in + E_trace.pop_iter_all (fun l -> batch := l :: !batch); + emit_traces !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + + let emit_all_force () = + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + in + if config.thread then ( begin let m = Mutex.create() in set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); end; - let module C = Curl() in + let ((module C) as curl) = (module Curl() : CURL) in let m = Mutex.create() in let cond = Condition.create() in - let last_wakeup = ref (Mtime_clock.now()) in - - (* TODO: move this into config *) - let batch_timeout() : bool = - let elapsed = Mtime.span (Mtime_clock.now()) !last_wakeup in - Mtime.Span.compare elapsed Mtime.Span.(200 * ms) >= 0 - in - - let emit_metrics ?(force=false) () : bool = - if (force && not (E_metrics.is_empty())) || - (not force && E_metrics.is_big_enough ()) then ( - let batch = ref [] in - E_metrics.pop_iter_all (fun l -> batch := l :: !batch); - emit_metrics (module C) !batch; - true - ) else false - in - let emit_traces ?(force=false) () : bool = - if (force && not (E_trace.is_empty())) || - (not force && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - emit_traces (module C) !batch; - true - ) else false - in let bg_thread () = while !continue do @@ -376,7 +390,6 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let _: Thread.t = Thread.create bg_thread () in let wakeup () = - last_wakeup := Mtime_clock.now(); with_mutex_ m (fun () -> Condition.signal cond); Thread.yield() in @@ -398,7 +411,27 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = end in (module M) ) else ( - assert false + + on_metrics_full (fun () -> + ignore (emit_metrics () : bool)); + on_trace_full (fun () -> + ignore (emit_traces () : bool)); + + let cleanup () = + emit_all_force(); + C.cleanup(); + in + + let module M = struct + let push_trace e ~over = + E_trace.push (e,over); + if batch_timeout() then emit_all_force() + let push_metrics e ~over = + E_metrics.push (e,over); + if batch_timeout() then emit_all_force() + let cleanup = cleanup + end in + (module M) ) module Backend(Arg : sig val config : Config.t end)() diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 7a631305..b8c9a902 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -38,6 +38,12 @@ module Config : sig Default [None]. *) + batch_timeout_ms: int; + (** Number of milliseconds after which we will emit a batch, even + incomplete. + Note that the batch might take longer than that, because this is + only checked when a new event occurs. Default 500. *) + thread: bool; (** Is there a background thread? Default [true] *) } @@ -46,6 +52,7 @@ module Config : sig ?debug:bool -> ?url:string -> ?batch_traces:int option -> ?batch_metrics:int option -> + ?batch_timeout_ms:int -> ?thread:bool -> unit -> t (** Make a configuration *) From 7ba310913106a623db07490d97efa1a837cc9e00 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 15:44:20 -0400 Subject: [PATCH 06/16] try to fix CI --- .github/workflows/main.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7d306492..bea829a0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -33,6 +33,8 @@ jobs: ocaml-compiler: ${{ matrix.ocaml-compiler }} opam-depext-flags: --with-test + - run: opam pin vendor/atomic/#HEAD -y -n + - run: opam install . --deps-only --with-test - run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl From 83cd095dcfc315d7716284f51747558395074883 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 16:42:40 -0400 Subject: [PATCH 07/16] try to fix CI --- .github/workflows/main.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bea829a0..d1396197 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,9 +32,13 @@ jobs: with: ocaml-compiler: ${{ matrix.ocaml-compiler }} opam-depext-flags: --with-test + opam-pin: false + opam-depext: false - run: opam pin vendor/atomic/#HEAD -y -n + - run: opam pin . -n + - run: opam install . --deps-only --with-test - run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl From 11aa0a32475c49caa8e1b9fe0290358441831b97 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 21 Mar 2022 16:58:55 -0400 Subject: [PATCH 08/16] try to fix CI --- .github/workflows/main.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d1396197..a5d6e086 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,12 +32,7 @@ jobs: with: ocaml-compiler: ${{ matrix.ocaml-compiler }} opam-depext-flags: --with-test - opam-pin: false - opam-depext: false - - - run: opam pin vendor/atomic/#HEAD -y -n - - - run: opam pin . -n + opam-pin: "*.opam,vendor/atomic/*.opam" - run: opam install . --deps-only --with-test From 3bb840c3636a752fd976baaeee23282cdb63463d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 09:58:53 -0400 Subject: [PATCH 09/16] wip ci --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a5d6e086..a8ced97d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,7 +32,7 @@ jobs: with: ocaml-compiler: ${{ matrix.ocaml-compiler }} opam-depext-flags: --with-test - opam-pin: "*.opam,vendor/atomic/*.opam" + opam-local-packages: ["*.opam", "vendor/atomic/*.opam"] - run: opam install . --deps-only --with-test From f84f16b34a0db23ad1c633f4a8d2bbedcc08094a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 10:01:19 -0400 Subject: [PATCH 10/16] update atomic, do not list it as opam dep --- .github/workflows/main.yml | 1 - dune-project | 2 +- opentelemetry-client-ocurl.opam | 1 - vendor/atomic | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a8ced97d..7d306492 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -32,7 +32,6 @@ jobs: with: ocaml-compiler: ${{ matrix.ocaml-compiler }} opam-depext-flags: --with-test - opam-local-packages: ["*.opam", "vendor/atomic/*.opam"] - run: opam install . --deps-only --with-test diff --git a/dune-project b/dune-project index 03b38470..75f525eb 100644 --- a/dune-project +++ b/dune-project @@ -39,7 +39,7 @@ (ocaml (>= "4.08")) (dune (>= "2.3")) mtime - atomic + ; atomic ; vendored (opentelemetry (= :version)) (ocaml-protoc (>= 2.1)) (odoc :with-doc) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index 52aca892..e6d4b97a 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -11,7 +11,6 @@ depends: [ "ocaml" {>= "4.08"} "dune" {>= "2.3"} "mtime" - "atomic" "opentelemetry" {= version} "ocaml-protoc" {>= "2.1"} "odoc" {with-doc} diff --git a/vendor/atomic b/vendor/atomic index 7802992a..12dc7c84 160000 --- a/vendor/atomic +++ b/vendor/atomic @@ -1 +1 @@ -Subproject commit 7802992a568bf86c940e1c08a208e85102a0df6b +Subproject commit 12dc7c84f79606a8a0026ba1eb7856b3cdf6cab6 From a768c55eace9632b92c54b2de16b6ca4622eea08 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 11:10:44 -0400 Subject: [PATCH 11/16] chore: put lower bound on mtime --- dune-project | 2 +- opentelemetry-client-ocurl.opam | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dune-project b/dune-project index 75f525eb..21e3f5a2 100644 --- a/dune-project +++ b/dune-project @@ -38,7 +38,7 @@ (depends (ocaml (>= "4.08")) (dune (>= "2.3")) - mtime + (mtime (>= "1.4")) ; for spans ; atomic ; vendored (opentelemetry (= :version)) (ocaml-protoc (>= 2.1)) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index e6d4b97a..7133660b 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -10,7 +10,7 @@ bug-reports: depends: [ "ocaml" {>= "4.08"} "dune" {>= "2.3"} - "mtime" + "mtime" {>= "1.4"} "opentelemetry" {= version} "ocaml-protoc" {>= "2.1"} "odoc" {with-doc} From b03ff26ba65882091928515c0b5346b84cf20877 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 11:18:35 -0400 Subject: [PATCH 12/16] change signature of with_setup --- src/client/opentelemetry_client_ocurl.ml | 2 +- src/client/opentelemetry_client_ocurl.mli | 4 +++- tests/emit1.ml | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index c3f61011..a7dc599f 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -471,6 +471,6 @@ let setup ?(config=Config.make()) () = let cleanup = setup_ ~config () in at_exit cleanup -let with_setup ?(config=Config.make()) f = +let with_setup ?(config=Config.make()) () f = let cleanup = setup_ ~config () in Fun.protect ~finally:cleanup f diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index b8c9a902..29a14538 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -64,4 +64,6 @@ val setup : ?config:Config.t -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param config configuration to use *) -val with_setup : ?config:Config.t -> (unit -> 'a) -> 'a +val with_setup : ?config:Config.t -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up + after [f()] returns. *) diff --git a/tests/emit1.ml b/tests/emit1.ml index 4e65fd21..38d06042 100644 --- a/tests/emit1.ml +++ b/tests/emit1.ml @@ -82,4 +82,4 @@ let () = Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; - Opentelemetry_client_ocurl.with_setup ~config run + Opentelemetry_client_ocurl.with_setup ~config () run From 000292cd171ceeedc8339047f697de5fcff0854a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 11:26:24 -0400 Subject: [PATCH 13/16] catch errors in backend --- src/client/opentelemetry_client_ocurl.ml | 28 +++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index a7dc599f..8ac17e9e 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -276,6 +276,11 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb - in push, ((:=) on_full) + +(* make an emitter. + + exceptions inside should be caught, see + https://opentelemetry.io/docs/reference/specification/error-handling/ *) let mk_emitter ~(config:Config.t) () : (module EMITTER) = let open Proto in @@ -355,11 +360,20 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = ) else false in + let[@inline] guard f = + try f() + with e -> + Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" + (Printexc.to_string e) + in + let emit_all_force () = + let@ () = guard in ignore (emit_traces ~force:true () : bool); ignore (emit_metrics ~force:true () : bool); in + if config.thread then ( begin let m = Mutex.create() in @@ -373,6 +387,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let bg_thread () = while !continue do + let@ () = guard in if emit_metrics () then () else if emit_traces () then () else ( @@ -382,9 +397,12 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = ) done; (* flush remaining events *) - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); - C.cleanup(); + begin + let@ () = guard in + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + C.cleanup(); + end in let _: Thread.t = Thread.create bg_thread () in @@ -424,11 +442,15 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let module M = struct let push_trace e ~over = + let@() = guard in E_trace.push (e,over); if batch_timeout() then emit_all_force() + let push_metrics e ~over = + let@() = guard in E_metrics.push (e,over); if batch_timeout() then emit_all_force() + let cleanup = cleanup end in (module M) From bcbb07027f3568129135c2760a4a187c70893b71 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 11:33:12 -0400 Subject: [PATCH 14/16] add `?enable` to the ocurl client --- src/client/opentelemetry_client_ocurl.ml | 16 ++++++++++------ src/client/opentelemetry_client_ocurl.mli | 7 +++++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 8ac17e9e..b7beb90e 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -489,10 +489,14 @@ let setup_ ~(config:Config.t) () = Opentelemetry.Collector.backend := Some (module B); B.cleanup -let setup ?(config=Config.make()) () = - let cleanup = setup_ ~config () in - at_exit cleanup +let setup ?(config=Config.make()) ?(enable=true) () = + if enable then ( + let cleanup = setup_ ~config () in + at_exit cleanup + ) -let with_setup ?(config=Config.make()) () f = - let cleanup = setup_ ~config () in - Fun.protect ~finally:cleanup f +let with_setup ?(config=Config.make()) ?(enable=true) () f = + if enable then ( + let cleanup = setup_ ~config () in + Fun.protect ~finally:cleanup f + ) else f() diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 29a14538..e8ac12d9 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -60,10 +60,13 @@ module Config : sig val pp : Format.formatter -> t -> unit end -val setup : ?config:Config.t -> unit -> unit +val setup : ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. + @param enable actually setup the backend (default true). This can + be used to enable/disable the setup depending on CLI arguments + or environment. @param config configuration to use *) -val with_setup : ?config:Config.t -> unit -> (unit -> 'a) -> 'a +val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns. *) From 3a72a73c15b74ff9ff7b670a2b67e8d9e1a8f47f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 12:38:29 -0400 Subject: [PATCH 15/16] self exported metrics --- src/client/opentelemetry_client_ocurl.ml | 48 ++++++++++++++++++++++-- src/opentelemetry.ml | 12 ++++-- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index b7beb90e..f90f5a30 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -6,6 +6,7 @@ (* TODO *) +module OT = Opentelemetry open Opentelemetry let[@inline] (let@) f x = f x @@ -74,6 +75,9 @@ type error = [ | `Failure of string ] +let n_errors = Atomic.make 0 +let n_dropped = Atomic.make 0 + let report_err_ = function | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg @@ -267,7 +271,10 @@ let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb - let is_big_enough () = FQueue.size q >= n let push x = if not (FQueue.push q x) || FQueue.size q > n then ( - !on_full(); (* drop *) + !on_full(); + if not (FQueue.push q x) then ( + Atomic.incr n_dropped; (* drop item *) + ) ) let pop_iter_all f = FQueue.pop_iter_all q f end in @@ -308,7 +315,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (Pbrt.Encoder.to_string encoder) with | Ok () -> () - | Error err -> report_err_ err + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err end; (* signal completion *) List.iter (fun (_,over) -> over()) l; @@ -326,7 +336,10 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (Pbrt.Encoder.to_string encoder) with | Ok () -> () - | Error err -> report_err_ err + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err end; (* signal completion *) List.iter (fun (_,over) -> over()) l; @@ -474,10 +487,39 @@ module Backend(Arg : sig val config : Config.t end)() ret() } + let last_sent_metrics = Atomic.make (Mtime_clock.now()) + let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) + + let additional_metrics () : _ list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now() in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + if add_own_metrics then ( + let open OT.Metrics in + Atomic.set last_sent_metrics now; + [make_resource_metrics [ + sum ~name:"otel-export.dropped" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]] + ) else [] + let send_metrics : Metrics.resource_metrics list sender = { send=fun m ~over ~ret -> let@() = with_lock_ in if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + + let m = List.rev_append (additional_metrics()) m in push_metrics m ~over; ret() } diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 7a0bb4b1..8ce4f77d 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -486,13 +486,17 @@ module Metrics = struct (* TODO: summary *) (* TODO: exemplar *) - (** Emit some metrics to the collector (sync). *) - let emit (l:t list) : unit = + (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) + let make_resource_metrics (l:t list) : resource_metrics = let lm = default_instrumentation_library_metrics ~instrumentation_library:(Some Globals.instrumentation_library) ~metrics:l () in - let rm = default_resource_metrics - ~instrumentation_library_metrics:[lm] () in + default_resource_metrics + ~instrumentation_library_metrics:[lm] () + + (** Emit some metrics to the collector (sync). *) + let emit (l:t list) : unit = + let rm = make_resource_metrics l in Collector.send_metrics [rm] ~over:ignore ~ret:ignore end From 03313e9121c513402ad970de434d768b86b058b3 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 22 Mar 2022 13:00:06 -0400 Subject: [PATCH 16/16] fix(ocurl): check for batch timeouts in exporter as well --- src/client/opentelemetry_client_ocurl.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index f90f5a30..d0ea0d31 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -401,8 +401,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let bg_thread () = while !continue do let@ () = guard in - if emit_metrics () then () - else if emit_traces () then () + let timeout = batch_timeout() in + if emit_metrics ~force:timeout () then () + else if emit_traces ~force:timeout () then () else ( (* wait *) let@ () = with_mutex_ m in