feat: pass ?stop atomic; allow for multiple background threads

This commit is contained in:
Simon Cruanes 2022-07-06 12:53:00 -04:00
parent 331ae94547
commit 9c3e2a7076
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 160 additions and 98 deletions

View file

@ -8,7 +8,7 @@ type t = {
batch_metrics: int option; batch_metrics: int option;
batch_logs: int option; batch_logs: int option;
batch_timeout_ms: int; batch_timeout_ms: int;
thread: bool; bg_threads: int;
ticker_thread: bool; ticker_thread: bool;
} }
@ -24,20 +24,30 @@ let pp out self =
batch_metrics; batch_metrics;
batch_logs; batch_logs;
batch_timeout_ms; batch_timeout_ms;
thread; bg_threads;
ticker_thread; ticker_thread;
} = } =
self self
in in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms thread ticker_thread batch_logs batch_timeout_ms bg_threads ticker_thread
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) ?(thread = true) ?(ticker_thread = true) () : t = ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads
?(ticker_thread = true) () : t =
let bg_threads =
match bg_threads with
| Some n -> max n 0
| None ->
if thread then
4
else
0
in
{ {
debug; debug;
url; url;
@ -46,6 +56,6 @@ let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
batch_metrics; batch_metrics;
batch_timeout_ms; batch_timeout_ms;
batch_logs; batch_logs;
thread; bg_threads;
ticker_thread; ticker_thread;
} }

View file

@ -28,12 +28,13 @@ type t = private {
incomplete. incomplete.
Note that the batch might take longer than that, because this is Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *) only checked when a new event occurs. Default 500. *)
thread: bool; (** Is there a background thread? Default [true] *) bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
ticker_thread: bool; ticker_thread: bool;
(** Is there a ticker thread? Default [true]. (** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector. sure it makes progress, and regularly send events to the collector.
This option is ignored if [thread=false]. *) This option is ignored if [bg_threads=0]. *)
} }
(** Configuration. (** Configuration.
@ -49,9 +50,15 @@ val make :
?batch_logs:int option -> ?batch_logs:int option ->
?batch_timeout_ms:int -> ?batch_timeout_ms:int ->
?thread:bool -> ?thread:bool ->
?bg_threads:int ->
?ticker_thread:bool -> ?ticker_thread:bool ->
unit -> unit ->
t t
(** Make a configuration *) (** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit

View file

@ -44,37 +44,51 @@ let report_err_ = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg -> | `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, status) -> | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details })
->
let pp_details out l =
List.iter
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
l
in
Format.eprintf Format.eprintf
"@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code "@[<2>opentelemetry: export failed with@ http code=%d@ status \
Proto.Status.pp_status status {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
code scode
(Bytes.unsafe_to_string message)
pp_details details
module Httpc : sig
type t
val create : unit -> t
module type CURL = sig
val send : val send :
path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result t ->
path:string ->
decode:(Pbrt.Decoder.t -> 'a) ->
string ->
('a, error) result
val cleanup : unit -> unit val cleanup : t -> unit
end end = struct
(* create a curl client *)
module Curl () : CURL = struct
open Opentelemetry.Proto open Opentelemetry.Proto
let () = Lazy.force _init_curl let () = Lazy.force _init_curl
let buf_res = Buffer.create 256
(* TODO: use Curl.Multi, etc. instead? *) (* TODO: use Curl.Multi, etc. instead? *)
type t = {
buf_res: Buffer.t;
curl: Curl.t;
}
(* http client *) let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () }
let curl : Curl.t = Curl.init ()
let cleanup () = Curl.cleanup curl let cleanup self = Curl.cleanup self.curl
(* TODO: use Curl multi *)
(* send the content to the remote endpoint/path *) (* send the content to the remote endpoint/path *)
let send ~path ~decode (bod : string) : ('a, error) result = let send (self : t) ~path ~decode (bod : string) : ('a, error) result =
let { curl; buf_res } = self in
Curl.reset curl; Curl.reset curl;
if !debug_ then Curl.set_verbose curl true; if !debug_ then Curl.set_verbose curl true;
Curl.set_url curl (!url ^ path); Curl.set_url curl (!url ^ path);
@ -251,12 +265,10 @@ let batch_is_empty = List.for_all l_is_empty
exceptions inside should be caught, see exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *) https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(config : Config.t) () : (module EMITTER) = let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in let open Proto in
(* local helpers *) (* local helpers *)
let open struct let open struct
let continue = Atomic.make true
let timeout = let timeout =
if config.batch_timeout_ms > 0 then if config.batch_timeout_ms > 0 then
Some Mtime.Span.(config.batch_timeout_ms * ms) Some Mtime.Span.(config.batch_timeout_ms * ms)
@ -272,76 +284,71 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
let batch_logs : Logs.resource_logs list Batch.t = let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout () Batch.make ?batch:config.batch_logs ?timeout ()
let encoder = Pbrt.Encoder.create ()
let curl = (module Curl () : CURL)
module C = (val curl)
let on_tick_cbs_ = ref (ref []) let on_tick_cbs_ = ref (ref [])
let set_on_tick_callbacks = ( := ) on_tick_cbs_ let set_on_tick_callbacks = ( := ) on_tick_cbs_
let send_http_ ~path ~encode x : unit = let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit =
Pbrt.Encoder.reset encoder; Pbrt.Encoder.reset encoder;
encode x encoder; encode x encoder;
let data = Pbrt.Encoder.to_string encoder in let data = Pbrt.Encoder.to_string encoder in
match C.send ~path ~decode:(fun _ -> ()) data with match Httpc.send httpc ~path ~decode:(fun _ -> ()) data with
| Ok () -> () | Ok () -> ()
| Error `Sysbreak -> | Error `Sysbreak ->
Printf.eprintf "ctrl-c captured, stopping\n%!"; Printf.eprintf "ctrl-c captured, stopping\n%!";
Atomic.set continue false Atomic.set stop true
| Error err -> | Error err ->
(* TODO: log error _via_ otel? *) (* TODO: log error _via_ otel? *)
Atomic.incr n_errors; Atomic.incr n_errors;
report_err_ err report_err_ err
let send_metrics_http (l : Metrics.resource_metrics list list) = let send_metrics_http curl encoder (l : Metrics.resource_metrics list list)
=
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Metrics_service.default_export_metrics_service_request Metrics_service.default_export_metrics_service_request
~resource_metrics:l () ~resource_metrics:l ()
in in
send_http_ ~path:"/v1/metrics" send_http_ curl encoder ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x ~encode:Metrics_service.encode_export_metrics_service_request x
let send_traces_http (l : Trace.resource_spans list list) = let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Trace_service.default_export_trace_service_request ~resource_spans:l () Trace_service.default_export_trace_service_request ~resource_spans:l ()
in in
send_http_ ~path:"/v1/traces" send_http_ curl encoder ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x ~encode:Trace_service.encode_export_trace_service_request x
let send_logs_http (l : Logs.resource_logs list list) = let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Logs_service.default_export_logs_service_request ~resource_logs:l () Logs_service.default_export_logs_service_request ~resource_logs:l ()
in in
send_http_ ~path:"/v1/logs" send_http_ curl encoder ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x ~encode:Logs_service.encode_export_logs_service_request x
(* emit metrics, if the batch is full or timeout lapsed *) (* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force () : bool = let emit_metrics_maybe ~now ?force httpc encoder : bool =
match Batch.pop_if_ready ?force ~now batch_metrics with match Batch.pop_if_ready ?force ~now batch_metrics with
| None -> false | None -> false
| Some l -> | Some l ->
let batch = AList.pop_all gc_metrics :: l in let batch = AList.pop_all gc_metrics :: l in
send_metrics_http batch; send_metrics_http httpc encoder batch;
true true
let emit_traces_maybe ~now ?force () : bool = let emit_traces_maybe ~now ?force httpc encoder : bool =
match Batch.pop_if_ready ?force ~now batch_traces with match Batch.pop_if_ready ?force ~now batch_traces with
| None -> false | None -> false
| Some l -> | Some l ->
send_traces_http l; send_traces_http httpc encoder l;
true true
let emit_logs_maybe ~now ?force () : bool = let emit_logs_maybe ~now ?force httpc encoder : bool =
match Batch.pop_if_ready ?force ~now batch_logs with match Batch.pop_if_ready ?force ~now batch_logs with
| None -> false | None -> false
| Some l -> | Some l ->
send_logs_http l; send_logs_http httpc encoder l;
true true
let[@inline] guard_exn_ f = let[@inline] guard_exn_ f =
@ -350,13 +357,13 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!"
(Printexc.to_string e) (Printexc.to_string e)
let emit_all_force () = let emit_all_force (httpc : Httpc.t) encoder =
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now ~force:true () : bool); ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_logs_maybe ~now ~force:true () : bool); ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_metrics_maybe ~now ~force:true () : bool) ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool)
end in end in
if config.thread then ( if config.bg_threads > 0 then (
(let m = Mutex.create () in (let m = Mutex.create () in
Lock.set_mutex Lock.set_mutex
~lock:(fun () -> Mutex.lock m) ~lock:(fun () -> Mutex.lock m)
@ -367,13 +374,15 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
(* loop for the thread that processes events and sends them to collector *) (* loop for the thread that processes events and sends them to collector *)
let bg_thread () = let bg_thread () =
while Atomic.get continue do let httpc = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
while not @@ Atomic.get stop do
let@ () = guard_exn_ in let@ () = guard_exn_ in
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
let do_metrics = emit_metrics_maybe ~now () in let do_metrics = emit_metrics_maybe ~now httpc encoder in
let do_traces = emit_traces_maybe ~now () in let do_traces = emit_traces_maybe ~now httpc encoder in
let do_logs = emit_logs_maybe ~now () in let do_logs = emit_logs_maybe ~now httpc encoder in
if (not do_metrics) && (not do_traces) && not do_logs then if (not do_metrics) && (not do_traces) && not do_logs then
(* wait for something to happen *) (* wait for something to happen *)
let@ () = with_mutex_ m in let@ () = with_mutex_ m in
@ -381,14 +390,21 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
done; done;
(* flush remaining events once we exit *) (* flush remaining events once we exit *)
let@ () = guard_exn_ in let@ () = guard_exn_ in
emit_all_force (); emit_all_force httpc encoder;
C.cleanup () Httpc.cleanup httpc
in in
start_bg_thread bg_thread;
for _i = 1 to config.bg_threads do
start_bg_thread bg_thread
done;
(* if the bg thread waits, this will wake it up so it can send batches *) (* if the bg thread waits, this will wake it up so it can send batches *)
let wakeup () = let wakeup ~all () =
with_mutex_ m (fun () -> Condition.broadcast cond); with_mutex_ m (fun () ->
if all then
Condition.broadcast cond
else
Condition.signal cond);
Thread.yield () Thread.yield ()
in in
@ -403,90 +419,97 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
!(!on_tick_cbs_); !(!on_tick_cbs_);
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
if if Atomic.get stop then
(not (Atomic.get continue)) wakeup ~all:true ()
|| Batch.is_ready ~now batch_metrics else if
Batch.is_ready ~now batch_metrics
|| Batch.is_ready ~now batch_traces || Batch.is_ready ~now batch_traces
|| Batch.is_ready ~now batch_logs || Batch.is_ready ~now batch_logs
then then
wakeup () wakeup ~all:false ()
in in
if config.ticker_thread then ( if config.ticker_thread then (
(* thread that calls [tick()] regularly, to help enforce timeouts *) (* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () = let tick_thread () =
while Atomic.get continue do while not @@ Atomic.get stop do
Thread.delay 0.5; Thread.delay 0.5;
tick () tick ()
done; done;
wakeup () wakeup ~all:true ()
in in
start_bg_thread tick_thread start_bg_thread tick_thread
); );
let module M = struct let module M = struct
let push_trace e = if Batch.push batch_traces e then wakeup () let push_trace e = if Batch.push batch_traces e then wakeup ~all:false ()
let push_metrics e = if Batch.push batch_metrics e then wakeup () let push_metrics e =
if Batch.push batch_metrics e then wakeup ~all:false ()
let push_logs e = if Batch.push batch_logs e then wakeup () let push_logs e = if Batch.push batch_logs e then wakeup ~all:false ()
let set_on_tick_callbacks = set_on_tick_callbacks let set_on_tick_callbacks = set_on_tick_callbacks
let tick = tick let tick = tick
let cleanup () = let cleanup () =
Atomic.set continue false; Atomic.set stop true;
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
(* wakeup everyone *)
with_mutex_ m (fun () -> Condition.broadcast cond) with_mutex_ m (fun () -> Condition.broadcast cond)
end in end in
(module M) (module M)
) else ( ) else (
let cleanup () = let httpc = Httpc.create () in
emit_all_force (); let encoder = Pbrt.Encoder.create () in
C.cleanup ()
in
let module M = struct let module M = struct
let push_trace e = let push_trace e =
let@ () = guard_exn_ in let@ () = guard_exn_ in
Batch.push' batch_traces e; Batch.push' batch_traces e;
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now () : bool) ignore (emit_traces_maybe ~now httpc encoder : bool)
let push_metrics e = let push_metrics e =
let@ () = guard_exn_ in let@ () = guard_exn_ in
if Atomic.get needs_gc_metrics then sample_gc_metrics (); if Atomic.get needs_gc_metrics then sample_gc_metrics ();
Batch.push' batch_metrics e; Batch.push' batch_metrics e;
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
ignore (emit_metrics_maybe ~now () : bool) ignore (emit_metrics_maybe ~now httpc encoder : bool)
let push_logs e = let push_logs e =
let@ () = guard_exn_ in let@ () = guard_exn_ in
Batch.push' batch_logs e; Batch.push' batch_logs e;
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
ignore (emit_logs_maybe ~now () : bool) ignore (emit_logs_maybe ~now httpc encoder : bool)
let set_on_tick_callbacks = set_on_tick_callbacks let set_on_tick_callbacks = set_on_tick_callbacks
let tick () = let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics (); if Atomic.get needs_gc_metrics then sample_gc_metrics ();
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now () : bool); ignore (emit_traces_maybe ~now httpc encoder : bool);
ignore (emit_metrics_maybe ~now () : bool); ignore (emit_metrics_maybe ~now httpc encoder : bool);
ignore (emit_logs_maybe ~now () : bool); ignore (emit_logs_maybe ~now httpc encoder : bool);
() ()
let cleanup = cleanup let cleanup () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
emit_all_force httpc encoder;
Httpc.cleanup httpc
end in end in
(module M) (module M)
) )
module Backend (Arg : sig module Backend (Arg : sig
val stop : bool Atomic.t
val config : Config.t val config : Config.t
end) end)
() : Opentelemetry.Collector.BACKEND = struct () : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~config:Arg.config ()) include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
open Opentelemetry.Proto open Opentelemetry.Proto
open Opentelemetry.Collector open Opentelemetry.Collector
@ -574,11 +597,13 @@ end)
} }
end end
let setup_ ~(config : Config.t) () = let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
debug_ := config.debug; debug_ := config.debug;
let module B = let module B =
Backend Backend
(struct (struct
let stop = stop
let config = config let config = config
end) end)
() ()
@ -586,15 +611,15 @@ let setup_ ~(config : Config.t) () =
Opentelemetry.Collector.set_backend (module B); Opentelemetry.Collector.set_backend (module B);
B.cleanup B.cleanup
let setup ?(config = Config.make ()) ?(enable = true) () = let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
if enable then ( if enable then (
let cleanup = setup_ ~config () in let cleanup = setup_ ?stop ~config () in
at_exit cleanup at_exit cleanup
) )
let with_setup ?(config = Config.make ()) ?(enable = true) () f = let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
if enable then ( if enable then (
let cleanup = setup_ ~config () in let cleanup = setup_ ?stop ~config () in
Fun.protect ~finally:cleanup f Fun.protect ~finally:cleanup f
) else ) else
f () f ()

View file

@ -20,13 +20,24 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
module Config = Config module Config = Config
val setup : ?config:Config.t -> ?enable:bool -> unit -> unit val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can @param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments be used to enable/disable the setup depending on CLI arguments
or environment. or environment.
@param config configuration to use *) @param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a val with_setup :
?stop:bool Atomic.t ->
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns. *) after [f()] returns
See {!setup} for more details. *)

View file

@ -98,6 +98,7 @@ let () =
let debug = ref false in let debug = ref false in
let thread = ref true in let thread = ref true in
let n_bg_threads = ref 0 in
let batch_traces = ref 400 in let batch_traces = ref 400 in
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let opts = let opts =
@ -114,6 +115,7 @@ let () =
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"-j", Arg.Set_int n_jobs, " number of parallel jobs"; "-j", Arg.Set_int n_jobs, " number of parallel jobs";
"--bg-threads", Arg.Set_int n_bg_threads, " number of background threads";
] ]
|> Arg.align |> Arg.align
in in
@ -130,7 +132,14 @@ let () =
Opentelemetry_client_ocurl.Config.make ~debug:!debug Opentelemetry_client_ocurl.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces) ~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics) ~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread () ~thread:!thread
?bg_threads:
(let n = !n_bg_threads in
if n = 0 then
None
else
Some n)
()
in in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
@ -142,4 +151,4 @@ let () =
Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!"
(Atomic.get num_tr) elapsed n_per_sec) (Atomic.get num_tr) elapsed n_per_sec)
in in
Opentelemetry_client_ocurl.with_setup ~config () run Opentelemetry_client_ocurl.with_setup ~stop ~config () run