diff --git a/.gitmodules b/.gitmodules index 6a111cdb..1554e2c4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "vendor/opentelemetry-proto"] path = vendor/opentelemetry-proto url = https://github.com/open-telemetry/opentelemetry-proto +[submodule "vendor/atomic"] + path = vendor/atomic + url = https://github.com/c-cube/ocaml-atomic.git diff --git a/dune-project b/dune-project index d5a78af6..21e3f5a2 100644 --- a/dune-project +++ b/dune-project @@ -38,6 +38,8 @@ (depends (ocaml (>= "4.08")) (dune (>= "2.3")) + (mtime (>= "1.4")) ; for spans + ; atomic ; vendored (opentelemetry (= :version)) (ocaml-protoc (>= 2.1)) (odoc :with-doc) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index 7c4924a1..7133660b 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -10,6 +10,7 @@ bug-reports: depends: [ "ocaml" {>= "4.08"} "dune" {>= "2.3"} + "mtime" {>= "1.4"} "opentelemetry" {= version} "ocaml-protoc" {>= "2.1"} "odoc" {with-doc} diff --git a/src/client/dune b/src/client/dune index c4f4685d..11696169 100644 --- a/src/client/dune +++ b/src/client/dune @@ -2,5 +2,6 @@ (library (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) - (libraries opentelemetry curl ocaml-protoc)) + (libraries opentelemetry curl ocaml-protoc threads + atomic mtime mtime.clock.os)) diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 4800c2d8..d0ea0d31 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -6,6 +6,7 @@ (* TODO *) +module OT = Opentelemetry open Opentelemetry let[@inline] (let@) f x = f x @@ -23,42 +24,90 @@ let set_mutex ~lock ~unlock : unit = lock_ := lock; unlock_ := unlock +module Config = struct + type t = { + debug: bool; + url: string; + batch_traces: int option; + batch_metrics: int option; + batch_timeout_ms: int; + thread: bool; + } + + let pp out self = + let ppiopt = Format.pp_print_option Format.pp_print_int in + let {debug; url; batch_traces; batch_metrics; + batch_timeout_ms; thread} = self in + Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ + batch_traces=%a;@ batch_metrics=%a;@ + batch_timeout_ms=%d; thread=%B @]}" + debug url ppiopt batch_traces ppiopt batch_metrics + batch_timeout_ms thread + + let make + ?(debug= !debug_) + ?(url= get_url()) + ?(batch_traces=Some 400) + ?(batch_metrics=None) + ?(batch_timeout_ms=500) + ?(thread=true) + () : t = + { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + thread; } +end + +(* critical section for [f()] *) let[@inline] with_lock_ f = !lock_(); Fun.protect ~finally:!unlock_ f -let _init = lazy ( +let[@inline] with_mutex_ m f = + Mutex.lock m; + Fun.protect ~finally:(fun () -> Mutex.unlock m) f + +let _init_curl = lazy ( Curl.global_init Curl.CURLINIT_GLOBALALL; at_exit Curl.global_cleanup; ) -module Backend() : Opentelemetry.Collector.BACKEND = struct - let() = Lazy.force _init +type error = [ + | `Status of int * Opentelemetry.Proto.Status.status + | `Failure of string +] - (* TODO: use Curl.Multi, etc. *) +let n_errors = Atomic.make 0 +let n_dropped = Atomic.make 0 + +let report_err_ = function + | `Failure msg -> + Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg + | `Status (code, status) -> + Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." + code Proto.Status.pp_status status + +module type CURL = sig + val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val cleanup : unit -> unit +end + +(* create a curl client *) +module Curl() : CURL = struct + open Opentelemetry.Proto + let() = Lazy.force _init_curl - let encoder = Pbrt.Encoder.create() let buf_res = Buffer.create 256 - let rand_ = Random.State.make_self_init() + (* TODO: use Curl.Multi, etc. instead? *) (* http client *) let curl : Curl.t = Curl.init () let cleanup () = Curl.cleanup curl - open Opentelemetry.Proto - open Opentelemetry.Collector - - type error = [ - | `Status of int * Status.status - | `Failure of string - ] - (* TODO: use Curl multi *) (* send the content to the remote endpoint/path *) - let send_ ~path ~decode (bod:string) : ('a, error) result = + let send ~path ~decode (bod:string) : ('a, error) result = Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); @@ -100,48 +149,56 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct ~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in Error(`Status (code, status)) with e -> Error (`Failure (Printexc.to_string e)) +end - let report_err_ = function - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, status) -> - Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." - code Status.pp_status status +module type PUSH = sig + type elt + val push : elt -> unit + val is_empty : unit -> bool + val is_big_enough : unit -> bool + val pop_iter_all : (elt -> unit) -> unit +end - let send_trace : Trace_service.export_trace_service_request sender = { - send=fun tr ~over ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr; - Pbrt.Encoder.reset encoder; - Trace_service.encode_export_trace_service_request tr encoder; - begin match - send_ ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> report_err_ err - end; - over(); - ret() +(* queue of fixed size *) +module FQueue : sig + type 'a t + val create : dummy:'a -> int -> 'a t + val size : _ t -> int + val push : 'a t -> 'a -> bool (* true iff it could write element *) + val pop_iter_all : 'a t -> ('a -> unit) -> unit +end = struct + type 'a t = { + arr: 'a array; + mutable i: int; } - let send_metrics : Metrics_service.export_metrics_service_request sender = { - send=fun m ~over ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m; - Pbrt.Encoder.reset encoder; - Metrics_service.encode_export_metrics_service_request m encoder; - begin - match - send_ ~path:"/v1/metrics" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder); - with - | Ok () -> () - | Error err -> report_err_ err - end; - over(); - ret() - } + let create ~dummy n : _ t = + assert (n >= 1); + { arr=Array.make n dummy; + i=0; + } + + let[@inline] size self = self.i + let[@inline] is_full self = self.i = Array.length self.arr + + let push (self:_ t) x : bool = + if is_full self then false + else ( + self.arr.(self.i) <- x; + self.i <- 1 + self.i; + true + ) + + let pop_iter_all (self: _ t) f = + for j=0 to self.i-1 do + f self.arr.(j) + done; + self.i <- 0 +end + +(* generate random IDs *) +module Gen_ids() = struct + let rand_ = Random.State.make_self_init() let rand_bytes_8 () : bytes = let@() = with_lock_ in @@ -171,15 +228,318 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct b end -let setup_ () = - let module B = Backend() in +(** Callback for when an event is properly sent to the collector *) +type over_cb = unit -> unit + +(** An emitter. This is used by {!Backend} below to forward traces/metrics/… + from the program to whatever collector client we have. *) +module type EMITTER = sig + open Opentelemetry.Proto + + val push_trace : Trace.resource_spans list -> over:over_cb -> unit + val push_metrics : Metrics.resource_metrics list -> over:over_cb -> unit + + val cleanup : unit -> unit +end + +type 'a push = (module PUSH with type elt = 'a) +type on_full_cb = (unit -> unit) + +(* make a "push" object, along with a setter for a callback to call when + it's ready to emit a batch *) +let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) = + let on_full: on_full_cb ref = ref ignore in + let push = + match batch with + | None -> + let r = ref None in + let module M = struct + type elt = a + let is_empty () = !r == None + let is_big_enough () = !r != None + let push x = + r := Some x; !on_full() + let pop_iter_all f = Option.iter f !r; r := None + end in + (module M : PUSH with type elt = a) + + | Some n -> + let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in + let module M = struct + type elt = a + let is_empty () = FQueue.size q = 0 + let is_big_enough () = FQueue.size q >= n + let push x = + if not (FQueue.push q x) || FQueue.size q > n then ( + !on_full(); + if not (FQueue.push q x) then ( + Atomic.incr n_dropped; (* drop item *) + ) + ) + let pop_iter_all f = FQueue.pop_iter_all q f + end in + (module M : PUSH with type elt = a) + + in + push, ((:=) on_full) + + +(* make an emitter. + + exceptions inside should be caught, see + https://opentelemetry.io/docs/reference/specification/error-handling/ *) +let mk_emitter ~(config:Config.t) () : (module EMITTER) = + let open Proto in + + let continue = ref true in + + let ((module E_trace) : (Trace.resource_spans list * over_cb) push), on_trace_full = + mk_push ?batch:config.batch_traces () in + let ((module E_metrics) : (Metrics.resource_metrics list * over_cb) push), on_metrics_full = + mk_push ?batch:config.batch_metrics () in + + let encoder = Pbrt.Encoder.create() in + + let ((module C) as curl) = (module Curl() : CURL) in + + let emit_metrics (l:(Metrics.resource_metrics list*over_cb) list) = + Pbrt.Encoder.reset encoder; + let resource_metrics = + List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + Metrics_service.encode_export_metrics_service_request + (Metrics_service.default_export_metrics_service_request + ~resource_metrics ()) + encoder; + begin match + C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err + end; + (* signal completion *) + List.iter (fun (_,over) -> over()) l; + in + + let emit_traces (l:(Trace.resource_spans list * over_cb) list) = + Pbrt.Encoder.reset encoder; + let resource_spans = + List.fold_left (fun acc (l,_) -> List.rev_append l acc) [] l in + Trace_service.encode_export_trace_service_request + (Trace_service.default_export_trace_service_request ~resource_spans ()) + encoder; + begin match + C.send ~path:"/v1/traces" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err + end; + (* signal completion *) + List.iter (fun (_,over) -> over()) l; + in + + let last_wakeup = Atomic.make (Mtime_clock.now()) in + let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in + let batch_timeout() : bool = + let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in + Mtime.Span.compare elapsed timeout >= 0 + in + + let emit_metrics ?(force=false) () : bool = + if (force && not (E_metrics.is_empty())) || + (not force && E_metrics.is_big_enough ()) then ( + let batch = ref [] in + E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + emit_metrics !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + let emit_traces ?(force=false) () : bool = + if (force && not (E_trace.is_empty())) || + (not force && E_trace.is_big_enough ()) then ( + let batch = ref [] in + E_trace.pop_iter_all (fun l -> batch := l :: !batch); + emit_traces !batch; + Atomic.set last_wakeup (Mtime_clock.now()); + true + ) else false + in + + let[@inline] guard f = + try f() + with e -> + Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" + (Printexc.to_string e) + in + + let emit_all_force () = + let@ () = guard in + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + in + + + if config.thread then ( + begin + let m = Mutex.create() in + set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); + end; + + let ((module C) as curl) = (module Curl() : CURL) in + + let m = Mutex.create() in + let cond = Condition.create() in + + let bg_thread () = + while !continue do + let@ () = guard in + let timeout = batch_timeout() in + if emit_metrics ~force:timeout () then () + else if emit_traces ~force:timeout () then () + else ( + (* wait *) + let@ () = with_mutex_ m in + Condition.wait cond m; + ) + done; + (* flush remaining events *) + begin + let@ () = guard in + ignore (emit_traces ~force:true () : bool); + ignore (emit_metrics ~force:true () : bool); + C.cleanup(); + end + in + + let _: Thread.t = Thread.create bg_thread () in + + let wakeup () = + with_mutex_ m (fun () -> Condition.signal cond); + Thread.yield() + in + + (* wake up if a batch is full *) + on_metrics_full wakeup; + on_trace_full wakeup; + + let module M = struct + let push_trace e ~over = + E_trace.push (e,over); + if batch_timeout() then wakeup() + let push_metrics e ~over = + E_metrics.push (e,over); + if batch_timeout() then wakeup() + let cleanup () = + continue := false; + with_mutex_ m (fun () -> Condition.broadcast cond) + end in + (module M) + ) else ( + + on_metrics_full (fun () -> + ignore (emit_metrics () : bool)); + on_trace_full (fun () -> + ignore (emit_traces () : bool)); + + let cleanup () = + emit_all_force(); + C.cleanup(); + in + + let module M = struct + let push_trace e ~over = + let@() = guard in + E_trace.push (e,over); + if batch_timeout() then emit_all_force() + + let push_metrics e ~over = + let@() = guard in + E_metrics.push (e,over); + if batch_timeout() then emit_all_force() + + let cleanup = cleanup + end in + (module M) + ) + +module Backend(Arg : sig val config : Config.t end)() + : Opentelemetry.Collector.BACKEND += struct + include Gen_ids() + + include (val mk_emitter ~config:Arg.config ()) + + open Opentelemetry.Proto + open Opentelemetry.Collector + + let send_trace : Trace.resource_spans list sender = { + send=fun l ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; + push_trace l ~over; + ret() + } + + let last_sent_metrics = Atomic.make (Mtime_clock.now()) + let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) + + let additional_metrics () : _ list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now() in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + if add_own_metrics then ( + let open OT.Metrics in + Atomic.set last_sent_metrics now; + [make_resource_metrics [ + sum ~name:"otel-export.dropped" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true [ + int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]] + ) else [] + + let send_metrics : Metrics.resource_metrics list sender = { + send=fun m ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + + let m = List.rev_append (additional_metrics()) m in + push_metrics m ~over; + ret() + } +end + +let setup_ ~(config:Config.t) () = + debug_ := config.debug; + let module B = Backend(struct let config=config end)() in Opentelemetry.Collector.backend := Some (module B); B.cleanup -let setup() = - let cleanup = setup_() in - at_exit cleanup +let setup ?(config=Config.make()) ?(enable=true) () = + if enable then ( + let cleanup = setup_ ~config () in + at_exit cleanup + ) -let with_setup f = - let cleanup = setup_() in - Fun.protect ~finally:cleanup f +let with_setup ?(config=Config.make()) ?(enable=true) () f = + if enable then ( + let cleanup = setup_ ~config () in + Fun.protect ~finally:cleanup f + ) else f() diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index d933a982..e8ac12d9 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -11,8 +11,62 @@ val set_url : string -> unit or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit +(** Set a lock/unlock pair to protect the critical sections + of {!Opentelemetry.Collector.BACKEND} *) -val setup : unit -> unit -(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. *) +module Config : sig + type t = { + debug: bool; -val with_setup : (unit -> 'a) -> 'a + url: string; + (** Url of the endpoint. Default is "http://localhost:4318", + or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) + + batch_traces: int option; + (** Batch traces? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [Some 400]. + *) + + batch_metrics: int option; + (** Batch metrics? If [Some i], then this produces batches of (at most) + [i] items. If [None], there is no batching. + + Note that traces and metrics are batched separately. + Default [None]. + *) + + batch_timeout_ms: int; + (** Number of milliseconds after which we will emit a batch, even + incomplete. + Note that the batch might take longer than that, because this is + only checked when a new event occurs. Default 500. *) + + thread: bool; + (** Is there a background thread? Default [true] *) + } + + val make : + ?debug:bool -> ?url:string -> + ?batch_traces:int option -> + ?batch_metrics:int option -> + ?batch_timeout_ms:int -> + ?thread:bool -> + unit -> t + (** Make a configuration *) + + val pp : Format.formatter -> t -> unit +end + +val setup : ?config:Config.t -> ?enable:bool -> unit -> unit +(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. + @param enable actually setup the backend (default true). This can + be used to enable/disable the setup depending on CLI arguments + or environment. + @param config configuration to use *) + +val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up + after [f()] returns. *) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 28cf5126..8ce4f77d 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -79,9 +79,9 @@ module Collector = struct (** Collector client interface. *) module type BACKEND = sig - val send_trace : Trace_service.export_trace_service_request sender + val send_trace : Trace.resource_spans list sender - val send_metrics : Metrics_service.export_metrics_service_request sender + val send_metrics : Metrics.resource_metrics list sender val rand_bytes_16 : unit -> bytes (** Generate 16 bytes of random data *) @@ -102,18 +102,12 @@ module Collector = struct let send_trace (l:Trace.resource_spans list) ~over ~ret = match !backend with | None -> over(); ret() - | Some (module B) -> - let ev = Trace_service.default_export_trace_service_request - ~resource_spans:l () in - B.send_trace.send ev ~over ~ret + | Some (module B) -> B.send_trace.send l ~over ~ret let send_metrics (l:Metrics.resource_metrics list) ~over ~ret = match !backend with | None -> over(); ret() - | Some (module B) -> - let ev = Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () in - B.send_metrics.send ev ~over ~ret + | Some (module B) -> B.send_metrics.send l ~over ~ret let rand_bytes_16 () = match !backend with @@ -492,13 +486,17 @@ module Metrics = struct (* TODO: summary *) (* TODO: exemplar *) - (** Emit some metrics to the collector (sync). *) - let emit (l:t list) : unit = + (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) + let make_resource_metrics (l:t list) : resource_metrics = let lm = default_instrumentation_library_metrics ~instrumentation_library:(Some Globals.instrumentation_library) ~metrics:l () in - let rm = default_resource_metrics - ~instrumentation_library_metrics:[lm] () in + default_resource_metrics + ~instrumentation_library_metrics:[lm] () + + (** Emit some metrics to the collector (sync). *) + let emit (l:t list) : unit = + let rm = make_resource_metrics l in Collector.send_metrics [rm] ~over:ignore ~ret:ignore end diff --git a/tests/emit1.ml b/tests/emit1.ml index cf63333d..38d06042 100644 --- a/tests/emit1.ml +++ b/tests/emit1.ml @@ -3,11 +3,30 @@ module T = Opentelemetry let spf = Printf.sprintf let (let@) f x = f x +let sleep_inner = ref 0.1 +let sleep_outer = ref 2.0 + let run () = Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url()); + + (* regularly emit some metrics *) + let emit_gc() = + let gc = Gc.stat() in + T.Metrics.( + emit [ + gauge ~name:"ocaml_opentracing.test.major_heap_words" [int gc.Gc.heap_words]; + sum ~name:"ocaml_opentracing.test.minor_allocated" [float gc.Gc.minor_words]; + ]); + in + let _al = Gc.create_alarm emit_gc in + + let@ scope = T.Trace.with_ "run" in + let i = ref 0 in while true do - let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_producer + let@ scope = + T.Trace.with_ ~trace_id:scope.trace_id ~parent:scope.span_id + ~kind:T.Span.Span_kind_producer "loop.outer" ~attrs:["i", `Int !i] in for j=0 to 4 do @@ -16,15 +35,7 @@ let run () = ~trace_id:scope.trace_id ~parent:scope.span_id ~attrs:["j", `Int j] "loop.inner" in - Unix.sleepf 2.; - - let gc = Gc.stat() in - T.Metrics.( - emit [ - gauge ~name:"ocaml_opentracing.test.i" [int !i]; - gauge ~name:"ocaml_opentracing.test.major_heap_words" [int gc.Gc.heap_words]; - sum ~name:"ocaml_opentracing.test.minor_allocated" [float gc.Gc.minor_words]; - ]); + Unix.sleepf !sleep_outer; incr i; @@ -36,7 +47,7 @@ let run () = (* allocate some stuff *) let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in ignore _arr; - Unix.sleepf 0.1; + Unix.sleepf !sleep_inner; if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); @@ -49,4 +60,26 @@ let () = Sys.catch_break true; T.Globals.service_name := "t1"; T.Globals.service_namespace := Some "ocaml-otel.test"; - Opentelemetry_client_ocurl.with_setup run + + let thread = ref true in + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let opts = [ + "--thread", Arg.Bool ((:=) thread), " use a background thread"; + "--batch-traces", Arg.Int ((:=) batch_traces), " size of traces batch"; + "--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; + "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; + "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + ] |> Arg.align in + + Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + + let some_if_nzero r = if !r > 0 then Some !r else None in + let config = Opentelemetry_client_ocurl.Config.make + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~thread:!thread () in + Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." + !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; + + Opentelemetry_client_ocurl.with_setup ~config () run diff --git a/vendor/atomic b/vendor/atomic new file mode 160000 index 00000000..12dc7c84 --- /dev/null +++ b/vendor/atomic @@ -0,0 +1 @@ +Subproject commit 12dc7c84f79606a8a0026ba1eb7856b3cdf6cab6 diff --git a/vendor/dune b/vendor/dune new file mode 100644 index 00000000..b4ba0b2b --- /dev/null +++ b/vendor/dune @@ -0,0 +1 @@ +(vendored_dirs atomic)