mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
test binaries: updates
This commit is contained in:
parent
e4177c2843
commit
c9f5a27b22
17 changed files with 182 additions and 145 deletions
|
|
@ -7,6 +7,7 @@
|
|||
(libraries
|
||||
(re_export opentelemetry)
|
||||
(re_export opentelemetry-client)
|
||||
(re_export opentelemetry-client.sync)
|
||||
(re_export eio)
|
||||
(re_export eio.core)
|
||||
(re_export eio.unix)
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ let create_consumer ?(config = Config.make ()) ~sw ~env () :
|
|||
let create_exporter ?(config = Config.make ()) ~sw ~env () =
|
||||
let consumer = create_consumer ~config ~sw ~env () in
|
||||
let bq =
|
||||
Bounded_queue_sync.create
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@
|
|||
(re_export opentelemetry-lwt)
|
||||
(re_export opentelemetry-client)
|
||||
(re_export opentelemetry-client.lwt)
|
||||
(re_export opentelemetry-client.sync)
|
||||
(re_export lwt)
|
||||
(re_export lwt.unix)
|
||||
(re_export cohttp-lwt)
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ let create_consumer ?(config = Config.make ()) () =
|
|||
let create_exporter ?(config = Config.make ()) () =
|
||||
let consumer = create_consumer ~config () in
|
||||
let bq =
|
||||
Bounded_queue_sync.create
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
(re_export opentelemetry)
|
||||
opentelemetry.atomic
|
||||
(re_export opentelemetry-client)
|
||||
(re_export opentelemetry-client.sync)
|
||||
(re_export opentelemetry-client.lwt)
|
||||
threads
|
||||
pbrt
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ let create_consumer ?(config = Config.make ()) () =
|
|||
let create_exporter ?(config = Config.make ()) () =
|
||||
let consumer = create_consumer ~config () in
|
||||
let bq =
|
||||
Bounded_queue_sync.create
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~high_watermark:Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
Exporter_queued.create ~clock:Clock.ptime_clock ~q:bq ~consumer ()
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
(re_export opentelemetry)
|
||||
opentelemetry.atomic
|
||||
(re_export opentelemetry-client)
|
||||
(re_export opentelemetry-client.sync)
|
||||
(re_export curl)
|
||||
unix
|
||||
pbrt
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ let n_bytes_sent : int Atomic.t = Atomic.make 0
|
|||
type error = OTELC.Export_error.t
|
||||
|
||||
open struct
|
||||
module Notifier = OTELC.Notifier_sync
|
||||
module IO = OTELC.Io_sync
|
||||
module Notifier = Opentelemetry_client_sync.Notifier_sync
|
||||
module IO = Opentelemetry_client_sync.Io_sync
|
||||
end
|
||||
|
||||
module Httpc : OTELC.Generic_http_consumer.HTTPC with module IO = IO = struct
|
||||
|
|
@ -88,7 +88,7 @@ let consumer ?(config = Config.make ()) () :
|
|||
let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t =
|
||||
let consumer = consumer ~config () in
|
||||
let bq =
|
||||
OTELC.Bounded_queue_sync.create
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark ()
|
||||
in
|
||||
|
||||
|
|
@ -99,7 +99,7 @@ let create_backend = create_exporter
|
|||
|
||||
let shutdown_and_wait ?(after_shutdown = ignore) (self : OTEL.Exporter.t) : unit
|
||||
=
|
||||
let open Opentelemetry_client in
|
||||
let open Opentelemetry_client_sync in
|
||||
let sq = Sync_queue.create () in
|
||||
OTEL.Aswitch.on_turn_off (OTEL.Exporter.active self) (fun () ->
|
||||
Sync_queue.push sq ());
|
||||
|
|
@ -119,13 +119,14 @@ let setup_ ?(config : Config.t = Config.make ()) () : OTEL.Exporter.t =
|
|||
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
|
||||
let active = OTEL.Exporter.active exporter in
|
||||
ignore
|
||||
(OTELC.Util_thread.setup_ticker_thread ~active ~sleep_ms exporter ()
|
||||
(Opentelemetry_client_sync.Util_thread.setup_ticker_thread ~active
|
||||
~sleep_ms exporter ()
|
||||
: Thread.t)
|
||||
);
|
||||
exporter
|
||||
|
||||
let remove_exporter () : unit =
|
||||
let open Opentelemetry_client in
|
||||
let open Opentelemetry_client_sync in
|
||||
(* used to wait *)
|
||||
let sq = Sync_queue.create () in
|
||||
OTEL.Main_exporter.remove () ~on_done:(fun () -> Sync_queue.push sq ());
|
||||
|
|
|
|||
|
|
@ -18,21 +18,26 @@ module Tracer = struct
|
|||
include Tracer
|
||||
|
||||
(** Sync span guard *)
|
||||
let with_ ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state ?attrs
|
||||
?kind ?trace_id ?parent ?links name (cb : Span.t -> 'a Lwt.t) : 'a Lwt.t =
|
||||
let with_ (type a) ?(tracer = dynamic_main) ?force_new_trace_id ?trace_state
|
||||
?attrs ?kind ?trace_id ?parent ?links name (cb : Span.t -> a Lwt.t) :
|
||||
a Lwt.t =
|
||||
let thunk, finally =
|
||||
with_thunk_and_finally tracer ?force_new_trace_id ?trace_state ?attrs
|
||||
?kind ?trace_id ?parent ?links name cb
|
||||
in
|
||||
|
||||
try%lwt
|
||||
let* rv = thunk () in
|
||||
let () = finally (Ok ()) in
|
||||
Lwt.return rv
|
||||
with e ->
|
||||
match thunk () with
|
||||
| exception exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
let () = finally (Error (e, bt)) in
|
||||
reraise e
|
||||
finally (Error (exn, bt));
|
||||
Printexc.raise_with_backtrace exn bt
|
||||
| promise ->
|
||||
Lwt.on_any promise
|
||||
(fun _ -> finally (Ok ()))
|
||||
(fun exn ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
finally (Error (exn, bt)));
|
||||
promise
|
||||
end
|
||||
|
||||
module Trace = Tracer [@@deprecated "use Tracer"]
|
||||
|
|
|
|||
|
|
@ -14,10 +14,9 @@ let mk_client ~scope =
|
|||
|
||||
let run () =
|
||||
let open Lwt.Syntax in
|
||||
let tracer = OT.Tracer.get_main () in
|
||||
let rec go () =
|
||||
let@ scope =
|
||||
Otel_lwt.Tracer.with_ tracer ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
Otel_lwt.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
in
|
||||
let* () = Lwt_unix.sleep !sleep_outer in
|
||||
let module C = (val mk_client ~scope) in
|
||||
|
|
|
|||
|
|
@ -25,7 +25,12 @@
|
|||
(executable
|
||||
(name emit1_stdout)
|
||||
(modules emit1_stdout)
|
||||
(libraries unix threads opentelemetry opentelemetry-client))
|
||||
(libraries
|
||||
unix
|
||||
threads
|
||||
opentelemetry
|
||||
opentelemetry-client
|
||||
opentelemetry-client.sync))
|
||||
|
||||
(executable
|
||||
(name emit1_cohttp)
|
||||
|
|
@ -78,7 +83,10 @@
|
|||
(modules cohttp_client)
|
||||
(libraries
|
||||
cohttp-lwt-unix
|
||||
lwt
|
||||
lwt.unix
|
||||
unix
|
||||
uri
|
||||
opentelemetry
|
||||
opentelemetry-client-cohttp-lwt
|
||||
opentelemetry-cohttp-lwt))
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ let sleep_outer = ref 2.0
|
|||
|
||||
let n_jobs = ref 1
|
||||
|
||||
let iterations = ref 4
|
||||
|
||||
let n = ref max_int
|
||||
|
||||
let num_sleep = Atomic.make 0
|
||||
|
|
@ -21,25 +23,24 @@ let num_tr = Atomic.make 0
|
|||
|
||||
let run_job () =
|
||||
let active = OT.Main_exporter.active () in
|
||||
let tracer = OT.Tracer.get_main () in
|
||||
let i = ref 0 in
|
||||
let cnt = ref 0 in
|
||||
|
||||
while OT.Aswitch.is_on active && !cnt < !n do
|
||||
let@ _scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i ]
|
||||
in
|
||||
|
||||
(* Printf.printf "cnt=%d\n%!" !cnt; *)
|
||||
incr cnt;
|
||||
|
||||
for j = 0 to 4 do
|
||||
for j = 1 to !iterations do
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
|
|
@ -59,8 +60,8 @@ let run_job () =
|
|||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let@ _ =
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal
|
||||
~parent:scope "alloc"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope
|
||||
"alloc"
|
||||
in
|
||||
Atomic.incr num_tr;
|
||||
|
||||
|
|
@ -92,12 +93,13 @@ let run () =
|
|||
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
|
||||
OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics;
|
||||
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
|
||||
let now = OT.Clock.now_main () in
|
||||
OT.Metrics.
|
||||
[
|
||||
sum ~name:"num-sleep" ~is_monotonic:true
|
||||
[ int (Atomic.get num_sleep) ];
|
||||
[ int ~now (Atomic.get num_sleep) ];
|
||||
sum ~name:"otel.bytes-sent" ~is_monotonic:true ~unit_:"B"
|
||||
[ int (Opentelemetry_client_ocurl.n_bytes_sent ()) ];
|
||||
[ int ~now (Opentelemetry_client_ocurl.n_bytes_sent ()) ];
|
||||
]));
|
||||
|
||||
let n_jobs = max 1 !n_jobs in
|
||||
|
|
@ -124,6 +126,8 @@ let () =
|
|||
let final_stats = ref false in
|
||||
|
||||
let n_bg_threads = ref 0 in
|
||||
let url = ref None in
|
||||
let n_procs = ref 1 in
|
||||
let opts =
|
||||
[
|
||||
"--debug", Arg.Bool (( := ) debug), " enable debug output";
|
||||
|
|
@ -140,14 +144,26 @@ let () =
|
|||
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
|
||||
"--bg-threads", Arg.Set_int n_bg_threads, " number of background threads";
|
||||
"--no-self-trace", Arg.Clear self_trace, " disable self tracing";
|
||||
"-n", Arg.Set_int n, " number of iterations (default ∞)";
|
||||
"-n", Arg.Set_int n, " number of outer iterations (default ∞)";
|
||||
( "--iterations",
|
||||
Arg.Set_int iterations,
|
||||
" the number of inner iterations to run" );
|
||||
( "--url",
|
||||
Arg.String (fun s -> url := Some s),
|
||||
" set the url for the OTel collector" );
|
||||
"--final-stats", Arg.Set final_stats, " display some metrics at the end";
|
||||
"--procs", Arg.Set_int n_procs, " number of processes (stub)";
|
||||
]
|
||||
|> Arg.align
|
||||
in
|
||||
|
||||
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
|
||||
|
||||
if !n_procs > 1 then
|
||||
failwith
|
||||
"TODO: add support for running multiple processes to the lwt-cohttp \
|
||||
emitter";
|
||||
|
||||
let some_if_nzero r =
|
||||
if !r > 0 then
|
||||
Some !r
|
||||
|
|
@ -156,6 +172,7 @@ let () =
|
|||
in
|
||||
let config =
|
||||
Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:!self_trace
|
||||
?url:!url
|
||||
?bg_threads:(some_if_nzero n_bg_threads)
|
||||
~batch_traces:(some_if_nzero batch_traces)
|
||||
~batch_metrics:(some_if_nzero batch_metrics)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@ let sleep_outer = ref 2.0
|
|||
|
||||
let n_jobs = ref 1
|
||||
|
||||
let n = ref max_int
|
||||
|
||||
let iterations = ref 1
|
||||
|
||||
let num_sleep = Atomic.make 0
|
||||
|
|
@ -20,61 +22,53 @@ let stress_alloc_ = ref true
|
|||
|
||||
let num_tr = Atomic.make 0
|
||||
|
||||
(* Counter used to mark simulated failures *)
|
||||
let i = ref 0
|
||||
|
||||
let run_job job_id : unit Lwt.t =
|
||||
let switch = T.Main_exporter.active () in
|
||||
while%lwt T.Aswitch.is_on switch do
|
||||
let tracer = T.Tracer.get_main () in
|
||||
let i = ref 0 in
|
||||
while%lwt T.Aswitch.is_on switch && !i < !n do
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int job_id ]
|
||||
T.Tracer.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i; "job_id", `Int job_id ]
|
||||
in
|
||||
|
||||
for%lwt j = 0 to !iterations do
|
||||
if j >= !iterations then
|
||||
(* Terminate program, having reached our max iterations *)
|
||||
T.Main_exporter.remove ()
|
||||
else
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ span =
|
||||
Atomic.incr num_tr;
|
||||
T.Tracer.with_ ~tracer ~parent:scope ~kind:T.Span.Span_kind_internal
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
incr i;
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_outer in
|
||||
for%lwt j = 1 to !iterations do
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ span =
|
||||
Atomic.incr num_tr;
|
||||
T.Tracer.with_ ~parent:scope ~kind:T.Span.Span_kind_internal
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_outer in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
T.Logger.logf ~trace_id:(T.Span.trace_id span) ~span_id:(T.Span.id span)
|
||||
~severity:Severity_number_info (fun k -> k "inner at %d" j);
|
||||
|
||||
try%lwt
|
||||
Atomic.incr num_tr;
|
||||
let@ scope =
|
||||
T.Tracer.with_ ~kind:T.Span.Span_kind_internal ~parent:span "alloc"
|
||||
in
|
||||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
|
||||
ignore _arr
|
||||
);
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_inner in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
T.Logger.logf ~trace_id:(T.Span.trace_id span) ~span_id:(T.Span.id span)
|
||||
~severity:Severity_number_info (fun k -> k "inner at %d" j);
|
||||
(* simulate a failure *)
|
||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
||||
|
||||
incr i;
|
||||
|
||||
try%lwt
|
||||
Atomic.incr num_tr;
|
||||
let@ scope =
|
||||
T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_internal ~parent:span
|
||||
"alloc"
|
||||
in
|
||||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
|
||||
ignore _arr
|
||||
);
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_inner in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
(* simulate a failure *)
|
||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
||||
|
||||
T.Span.add_event scope (T.Event.make "done with alloc");
|
||||
Lwt.return ()
|
||||
with Failure _ -> Lwt.return ()
|
||||
T.Span.add_event scope (T.Event.make "done with alloc");
|
||||
Lwt.return ()
|
||||
with Failure _ -> Lwt.return ()
|
||||
done
|
||||
done
|
||||
|
||||
|
|
@ -84,10 +78,11 @@ let run () : unit Lwt.t =
|
|||
T.Metrics_callbacks.(
|
||||
with_set_added_to_main_exporter (fun set ->
|
||||
add_metrics_cb set (fun () ->
|
||||
let now = T.Clock.now_main () in
|
||||
T.Metrics.
|
||||
[
|
||||
sum ~name:"num-sleep" ~is_monotonic:true
|
||||
[ int (Atomic.get num_sleep) ];
|
||||
[ int ~now (Atomic.get num_sleep) ];
|
||||
])));
|
||||
|
||||
let n_jobs = max 1 !n_jobs in
|
||||
|
|
@ -124,9 +119,12 @@ let () =
|
|||
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
|
||||
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
|
||||
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
|
||||
"--iterations", Arg.Set_int iterations, " the number of iterations to run";
|
||||
( "--iterations",
|
||||
Arg.Set_int iterations,
|
||||
" the number of inner iterations to run" );
|
||||
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
|
||||
"--procs", Arg.Set_int n_procs, " number of processes";
|
||||
"-n", Arg.Set_int n, " number of outer iterations (default ∞)";
|
||||
"--procs", Arg.Set_int n_procs, " number of processes (stub)";
|
||||
]
|
||||
|> Arg.align
|
||||
in
|
||||
|
|
|
|||
|
|
@ -15,25 +15,22 @@ let stress_alloc_ = ref true
|
|||
|
||||
let num_sleep = Atomic.make 0
|
||||
|
||||
let stop = Atomic.make false
|
||||
|
||||
let num_tr = Atomic.make 0
|
||||
|
||||
(* Counter used to mark simulated failures *)
|
||||
let i = Atomic.make 0
|
||||
let n = ref max_int
|
||||
|
||||
let run_job clock _job_id iterations : unit =
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int (Atomic.get i) ]
|
||||
in
|
||||
let i = ref 0 in
|
||||
while OT.Aswitch.is_on (OT.Main_exporter.active ()) && !i < !n do
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i ]
|
||||
in
|
||||
|
||||
for j = 0 to iterations do
|
||||
if j >= iterations then
|
||||
(* Terminate program, having reached our max iterations *)
|
||||
Atomic.set stop true
|
||||
else
|
||||
incr i;
|
||||
|
||||
for j = 1 to iterations do
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
|
|
@ -49,8 +46,6 @@ let run_job clock _job_id iterations : unit =
|
|||
~span_id:(OT.Span.id scope) ~severity:Severity_number_info (fun k ->
|
||||
k "inner at %d" j);
|
||||
|
||||
Atomic.incr i;
|
||||
|
||||
try
|
||||
Atomic.incr num_tr;
|
||||
let@ scope =
|
||||
|
|
@ -65,18 +60,21 @@ let run_job clock _job_id iterations : unit =
|
|||
let () = Eio.Time.sleep clock !sleep_inner in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
if j = 4 && Atomic.get i mod 13 = 0 then failwith "oh no";
|
||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
||||
|
||||
(* simulate a failure *)
|
||||
OT.Span.add_event scope (OT.Event.make "done with alloc")
|
||||
with Failure _ -> ()
|
||||
done
|
||||
done
|
||||
|
||||
let run env proc iterations () : unit =
|
||||
OT.Gc_metrics.setup_on_main_exporter ();
|
||||
|
||||
OT.Metrics_callbacks.(
|
||||
with_set_added_to_main_exporter (fun set ->
|
||||
with_set_added_to_main_exporter
|
||||
~min_interval:Mtime.Span.(10 * ms)
|
||||
(fun set ->
|
||||
add_metrics_cb set (fun () ->
|
||||
let now = OT.Clock.now_main () in
|
||||
OT.Metrics.
|
||||
|
|
@ -126,6 +124,7 @@ let () =
|
|||
Arg.Set_int n_iterations,
|
||||
" the number of iterations to run" );
|
||||
"-j", Arg.Set_int n_jobs, " number of jobs per processes";
|
||||
"-n", Arg.Set_int n, " number of iterations (default ∞)";
|
||||
"--procs", Arg.Set_int n_procs, " number of processes";
|
||||
]
|
||||
|> Arg.align
|
||||
|
|
@ -156,16 +155,14 @@ let () =
|
|||
(Atomic.get num_tr) elapsed n_per_sec)
|
||||
in
|
||||
Eio_main.run @@ fun env ->
|
||||
(if !n_procs < 2 then
|
||||
Opentelemetry_client_cohttp_eio.with_setup ~config
|
||||
(run env 0 !n_iterations) env
|
||||
else
|
||||
Eio.Switch.run @@ fun sw ->
|
||||
Opentelemetry_client_cohttp_eio.setup ~config ~sw env;
|
||||
let dm = Eio.Stdenv.domain_mgr env in
|
||||
Eio.Switch.run (fun sw ->
|
||||
for proc = 1 to !n_procs do
|
||||
Eio.Fiber.fork ~sw @@ fun () ->
|
||||
Eio.Domain_manager.run dm (run env proc !n_iterations)
|
||||
done));
|
||||
OT.Main_exporter.remove () ~on_done:ignore
|
||||
if !n_procs < 2 then
|
||||
Opentelemetry_client_cohttp_eio.with_setup ~config env
|
||||
(run env 0 !n_iterations)
|
||||
else
|
||||
Opentelemetry_client_cohttp_eio.with_setup ~config env @@ fun () ->
|
||||
let dm = Eio.Stdenv.domain_mgr env in
|
||||
Eio.Switch.run (fun sw ->
|
||||
for proc = 1 to !n_procs do
|
||||
Eio.Fiber.fork ~sw @@ fun () ->
|
||||
Eio.Domain_manager.run dm (run env proc !n_iterations)
|
||||
done)
|
||||
|
|
|
|||
|
|
@ -21,14 +21,13 @@ let num_tr = Atomic.make 0
|
|||
|
||||
let run_job () : unit Lwt.t =
|
||||
let active = OT.Main_exporter.active () in
|
||||
let tracer = OT.Tracer.get_main () in
|
||||
let i = ref 0 in
|
||||
let cnt = ref 0 in
|
||||
|
||||
while%lwt OT.Aswitch.is_on active && !cnt < !n do
|
||||
let@ _scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i ]
|
||||
in
|
||||
|
||||
|
|
@ -39,7 +38,7 @@ let run_job () : unit Lwt.t =
|
|||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
|
|
@ -49,13 +48,9 @@ let run_job () : unit Lwt.t =
|
|||
Atomic.incr num_sleep
|
||||
);
|
||||
|
||||
let logger = OT.Logger.get_main () in
|
||||
OT.Emitter.emit logger
|
||||
[
|
||||
OT.Log_record.make_strf ~trace_id:(OT.Span.trace_id scope)
|
||||
~span_id:(OT.Span.id scope) ~severity:Severity_number_info
|
||||
"inner at %d" j;
|
||||
];
|
||||
OT.Logger.logf ~trace_id:(OT.Span.trace_id scope)
|
||||
~span_id:(OT.Span.id scope) ~severity:Severity_number_info (fun k ->
|
||||
k "inner at %d" j);
|
||||
|
||||
incr i;
|
||||
|
||||
|
|
@ -63,8 +58,8 @@ let run_job () : unit Lwt.t =
|
|||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let@ _ =
|
||||
OT.Tracer.with_ tracer ~kind:OT.Span.Span_kind_internal
|
||||
~parent:scope "alloc"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope
|
||||
"alloc"
|
||||
in
|
||||
Atomic.incr num_tr;
|
||||
|
||||
|
|
@ -95,10 +90,11 @@ let run () : unit Lwt.t =
|
|||
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
|
||||
OT.Metrics_callbacks.add_metrics_cb set OT.Main_exporter.self_metrics;
|
||||
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
|
||||
let now = OT.Clock.now_main () in
|
||||
OT.Metrics.
|
||||
[
|
||||
sum ~name:"num-sleep" ~is_monotonic:true
|
||||
[ int (Atomic.get num_sleep) ];
|
||||
[ int ~now (Atomic.get num_sleep) ];
|
||||
]));
|
||||
|
||||
let n_jobs = max 1 !n_jobs in
|
||||
|
|
@ -180,8 +176,7 @@ let () =
|
|||
)
|
||||
in
|
||||
|
||||
Lwt_main.run
|
||||
@@
|
||||
let@ () = Fun.protect ~finally in
|
||||
Opentelemetry_client_ocurl_lwt.with_setup ~config () run
|
||||
~after_shutdown:after_exp_shutdown
|
||||
Lwt_main.run
|
||||
(Opentelemetry_client_ocurl_lwt.with_setup ~config () run
|
||||
~after_shutdown:after_exp_shutdown)
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ let sleep_outer = ref 2.0
|
|||
|
||||
let n_jobs = ref 1
|
||||
|
||||
let iterations = ref 4
|
||||
|
||||
let n = ref max_int
|
||||
|
||||
let num_sleep = Atomic.make 0
|
||||
|
|
@ -21,25 +23,24 @@ let num_tr = Atomic.make 0
|
|||
|
||||
let run_job () =
|
||||
let active = OT.Main_exporter.active () in
|
||||
let tracer = OT.Tracer.get_main () in
|
||||
let i = ref 0 in
|
||||
let cnt = ref 0 in
|
||||
|
||||
while OT.Aswitch.is_on active && !cnt < !n do
|
||||
let@ _scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i ]
|
||||
in
|
||||
|
||||
(* Printf.printf "cnt=%d\n%!" !cnt; *)
|
||||
incr cnt;
|
||||
|
||||
for j = 0 to 4 do
|
||||
for j = 1 to !iterations do
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:_scope
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
|
|
@ -60,8 +61,8 @@ let run_job () =
|
|||
(* allocate some stuff *)
|
||||
(if !stress_alloc_ then
|
||||
let@ _ =
|
||||
OT.Tracer.with_ ~tracer ~kind:OT.Span.Span_kind_internal
|
||||
~parent:scope "alloc"
|
||||
OT.Tracer.with_ ~kind:OT.Span.Span_kind_internal ~parent:scope
|
||||
"alloc"
|
||||
in
|
||||
let _arr : _ array =
|
||||
Sys.opaque_identity @@ Array.make (25 * 25551) 42.0
|
||||
|
|
@ -88,10 +89,11 @@ let run () =
|
|||
|
||||
OT.Metrics_callbacks.with_set_added_to_main_exporter (fun set ->
|
||||
OT.Metrics_callbacks.add_metrics_cb set (fun () ->
|
||||
let now = OT.Clock.now_main () in
|
||||
OT.Metrics.
|
||||
[
|
||||
sum ~name:"num-sleep" ~is_monotonic:true
|
||||
[ int (Atomic.get num_sleep) ];
|
||||
[ int ~now (Atomic.get num_sleep) ];
|
||||
]));
|
||||
|
||||
let n_jobs = max 1 !n_jobs in
|
||||
|
|
@ -105,7 +107,9 @@ let run () =
|
|||
Array.iter Thread.join jobs
|
||||
|
||||
module Consumer_exporter =
|
||||
OTC.Generic_consumer_exporter.Make (OTC.Io_sync) (OTC.Notifier_sync)
|
||||
OTC.Generic_consumer_exporter.Make
|
||||
(Opentelemetry_client_sync.Io_sync)
|
||||
(Opentelemetry_client_sync.Notifier_sync)
|
||||
|
||||
let () =
|
||||
OT.Globals.service_name := "t1";
|
||||
|
|
@ -137,7 +141,10 @@ let () =
|
|||
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
|
||||
"--bg-threads", Arg.Set_int n_bg_threads, " number of background threads";
|
||||
"--no-self-trace", Arg.Clear self_trace, " disable self tracing";
|
||||
"-n", Arg.Set_int n, " number of iterations (default ∞)";
|
||||
"-n", Arg.Set_int n, " number of outer iterations (default ∞)";
|
||||
( "--iterations",
|
||||
Arg.Set_int iterations,
|
||||
" the number of inner iterations to run" );
|
||||
"--queued", Arg.Set queued, " queue exporter";
|
||||
]
|
||||
|> Arg.align
|
||||
|
|
@ -145,21 +152,29 @@ let () =
|
|||
|
||||
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
|
||||
|
||||
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ queued: %ba@]@."
|
||||
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ queued: %b@]@."
|
||||
!sleep_outer !sleep_inner !queued;
|
||||
|
||||
let exporter =
|
||||
let exporter, finally =
|
||||
let exp = OTC.Exporter_stdout.stdout () in
|
||||
if !queued then (
|
||||
let q = OTC.Bounded_queue_sync.create ~high_watermark:20_000 () in
|
||||
OTC.Exporter_queued.create ~clock:exp.clock ~q
|
||||
~consumer:(Consumer_exporter.consumer exp)
|
||||
()
|
||||
let q =
|
||||
Opentelemetry_client_sync.Bounded_queue_sync.create
|
||||
~high_watermark:20_000 ()
|
||||
in
|
||||
let exp =
|
||||
OTC.Exporter_queued.create ~clock:exp.clock ~q
|
||||
~consumer:(Consumer_exporter.consumer exp)
|
||||
()
|
||||
in
|
||||
let finally () = Opentelemetry_client_sync.Shutdown_sync.shutdown exp in
|
||||
exp, finally
|
||||
) else
|
||||
exp
|
||||
exp, ignore
|
||||
in
|
||||
|
||||
OT.Main_exporter.set exporter;
|
||||
let@ () = Fun.protect ~finally in
|
||||
|
||||
if !self_trace then Opentelemetry_client.Self_trace.set_enabled true;
|
||||
|
||||
|
|
|
|||
|
|
@ -36,9 +36,7 @@ let run () =
|
|||
Logs.err (fun m -> m "emit_logs: error log");
|
||||
Logs.app (fun m -> m "emit_logs: app log");
|
||||
let%lwt () =
|
||||
let tracer = T.Tracer.get_main () in
|
||||
T.Tracer.with_ ~tracer ~kind:T.Span.Span_kind_producer "my_scope"
|
||||
(fun _scope ->
|
||||
T.Tracer.with_ ~kind:T.Span.Span_kind_producer "my_scope" (fun _scope ->
|
||||
Logs.info (fun m ->
|
||||
m ~tags:varied_tag_set
|
||||
"emit_logs: this log is emitted with varied tags from a span");
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue