Use multiple system threads in integration tests

This commit is contained in:
Shon Feder 2025-07-29 23:41:16 -04:00
parent ddbdc80d57
commit 0890a1a5cd
No known key found for this signature in database
5 changed files with 75 additions and 21 deletions

View file

@ -109,6 +109,7 @@ let () =
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let batch_logs = ref 400 in let batch_logs = ref 400 in
let url = ref None in let url = ref None in
let n_procs = ref 1 in
let opts = let opts =
[ [
"--debug", Arg.Bool (( := ) debug), " enable debug output"; "--debug", Arg.Bool (( := ) debug), " enable debug output";
@ -127,12 +128,18 @@ let () =
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"--iterations", Arg.Set_int iterations, " the number of iterations to run"; "--iterations", Arg.Set_int iterations, " the number of iterations to run";
"-j", Arg.Set_int n_jobs, " number of parallel jobs"; "-j", Arg.Set_int n_jobs, " number of parallel jobs";
"--procs", Arg.Set_int n_procs, " number of processes";
] ]
|> Arg.align |> Arg.align
in in
Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
if !n_procs > 1 then
failwith
"TODO: add support for running multiple processes to the lwt-cohttp \
emitter";
let some_if_nzero r = let some_if_nzero r =
if !r > 0 then if !r > 0 then
Some !r Some !r

View file

@ -11,7 +11,7 @@ let sleep_outer = ref 2.0
let n_jobs = ref 1 let n_jobs = ref 1
let iterations = ref 1 let iterations = Atomic.make 1
let num_sleep = Atomic.make 0 let num_sleep = Atomic.make 0
@ -32,8 +32,8 @@ let run_job clock _job_id : unit =
~attrs:[ "i", `Int !i ] ~attrs:[ "i", `Int !i ]
in in
for j = 0 to !iterations do for j = 0 to Atomic.get iterations do
if j >= !iterations then if j >= Atomic.get iterations then
(* Terminate program, having reached our max iterations *) (* Terminate program, having reached our max iterations *)
Atomic.set stop true Atomic.set stop true
else else
@ -80,7 +80,7 @@ let run_job clock _job_id : unit =
done done
done done
let run env : unit = let run env proc () : unit =
OT.GC_metrics.basic_setup (); OT.GC_metrics.basic_setup ();
OT.Metrics_callbacks.register (fun () -> OT.Metrics_callbacks.register (fun () ->
@ -91,7 +91,7 @@ let run env : unit =
]); ]);
let n_jobs = max 1 !n_jobs in let n_jobs = max 1 !n_jobs in
Printf.printf "run %d jobs\n%!" n_jobs; Printf.printf "run %d jobs in proc %d\n%!" n_jobs proc;
Eio.Switch.run (fun sw -> Eio.Switch.run (fun sw ->
for j = 1 to n_jobs do for j = 1 to n_jobs do
@ -109,6 +109,7 @@ let () =
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let batch_logs = ref 400 in let batch_logs = ref 400 in
let url = ref None in let url = ref None in
let n_procs = ref 1 in
let opts = let opts =
[ [
"--debug", Arg.Bool (( := ) debug), " enable debug output"; "--debug", Arg.Bool (( := ) debug), " enable debug output";
@ -125,8 +126,11 @@ let () =
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"--iterations", Arg.Set_int iterations, " the number of iterations to run"; ( "--iterations",
"-j", Arg.Set_int n_jobs, " number of parallel jobs"; Arg.Int (Atomic.set iterations),
" the number of iterations to run" );
"-j", Arg.Set_int n_jobs, " number of jobs per processes";
"--procs", Arg.Set_int n_procs, " number of processes";
] ]
|> Arg.align |> Arg.align
in in
@ -155,4 +159,16 @@ let () =
Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!"
(Atomic.get num_tr) elapsed n_per_sec) (Atomic.get num_tr) elapsed n_per_sec)
in in
Opentelemetry_client_cohttp_eio.with_setup ~stop ~config run |> Eio_main.run Eio_main.run @@ fun env ->
(if !n_procs < 2 then
Opentelemetry_client_cohttp_eio.with_setup ~stop ~config (run env 0) env
else
Eio.Switch.run @@ fun sw ->
Opentelemetry_client_cohttp_eio.setup ~stop ~config ~sw env;
let dm = Eio.Stdenv.domain_mgr env in
Eio.Switch.run (fun sw ->
for proc = 1 to !n_procs do
Eio.Fiber.fork ~sw @@ fun () ->
Eio.Domain_manager.run dm (run env proc)
done));
Opentelemetry.Collector.remove_backend () ~on_done:ignore

View file

@ -56,6 +56,15 @@ let filter_map_metrics f signals =
|> List.find_map (fun ss -> |> List.find_map (fun ss ->
ss.Proto.Metrics.metrics |> List.find_map f)) ss.Proto.Metrics.metrics |> List.find_map f))
let count_metrics_with_name name signals =
signals
|> filter_map_metrics (fun s ->
if String.equal s.Proto.Metrics.name name then
Some s
else
None)
|> List.length
let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float
= function = function
| Proto.Metrics.As_double f -> f | Proto.Metrics.As_double f -> f
@ -98,6 +107,7 @@ let count_logs_with_body p signals =
type params = { type params = {
url: string; url: string;
jobs: int; jobs: int;
procs: int;
batch_traces: int; batch_traces: int;
batch_metrics: int; batch_metrics: int;
batch_logs: int; batch_logs: int;
@ -109,6 +119,8 @@ let cmd exec params =
exec; exec;
"-j"; "-j";
string_of_int params.jobs; string_of_int params.jobs;
"--procs";
string_of_int params.procs;
"--url"; "--url";
params.url; params.url;
"--iterations"; "--iterations";
@ -134,22 +146,24 @@ let tests params signal_batches =
(* TODO: What properties of batch sizes does it make sense to test? *) (* TODO: What properties of batch sizes does it make sense to test? *)
test "loop.outer spans" (fun () -> test "loop.outer spans" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg:"number of occurrences should equal the configured jobs" ~msg:
~expected:params.jobs "number of occurrences should equal the configured jobs * the \
configured processes"
~expected:(params.jobs * params.procs)
~actual:(count_spans_with_name "loop.outer" signals)); ~actual:(count_spans_with_name "loop.outer" signals));
test "loop.inner spans" (fun () -> test "loop.inner spans" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg: ~msg:
"number of occurrences should equal the configured jobs * the \ "number of occurrences should equal the configured jobs * the \
configured iterations" configured iterations * configured processes"
~expected:(params.jobs * params.iterations) ~expected:(params.jobs * params.iterations * params.procs)
~actual:(count_spans_with_name "loop.inner" signals)); ~actual:(count_spans_with_name "loop.inner" signals));
test "alloc spans" (fun () -> test "alloc spans" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg: ~msg:
"number of occurrences should equal the configured jobs * the \ "number of occurrences should equal the configured jobs * the \
configured iterations" configured iterations * configured processes"
~expected:(params.jobs * params.iterations) ~expected:(params.jobs * params.iterations * params.procs)
~actual:(count_spans_with_name "alloc" signals); ~actual:(count_spans_with_name "alloc" signals);
Alcotest.(check' bool) Alcotest.(check' bool)
~msg:"should have 'done with alloc' event" ~expected:true ~msg:"should have 'done with alloc' event" ~expected:true
@ -167,16 +181,19 @@ let tests params signal_batches =
|> List.for_all (fun (e : Proto.Trace.span_event) -> |> List.for_all (fun (e : Proto.Trace.span_event) ->
String.equal e.name "done with alloc"))); String.equal e.name "done with alloc")));
test "num-sleep metrics" (fun () -> test "num-sleep metrics" (fun () ->
Alcotest.(check' (float 0.)) Alcotest.(check' bool)
~msg:"should record jobs * iterations sleeps" ~msg:
~expected:(params.jobs * params.iterations |> float_of_int) "should record at lest as many sleep metrics as there are \
iterations configured"
~expected:true
~actual: ~actual:
(get_metric_values "num-sleep" signals (count_metrics_with_name "num-sleep" signals >= params.iterations));
|> List.sort Float.compare |> List.rev |> List.hd));
test "logs" (fun () -> test "logs" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg:"should record jobs * iterations occurrences of 'inner at n'" ~msg:
~expected:(params.jobs * params.iterations) "should record jobs * iterations occurrences * configured \
processes of 'inner at n'"
~expected:(params.jobs * params.iterations * params.procs)
~actual: ~actual:
(signals (signals
|> count_logs_with_body (function |> count_logs_with_body (function

View file

@ -15,6 +15,7 @@ let () =
{ {
url; url;
jobs = 1; jobs = 1;
procs = 1;
iterations = 1; iterations = 1;
batch_traces = 2; batch_traces = 2;
batch_metrics = 2; batch_metrics = 2;
@ -24,6 +25,17 @@ let () =
{ {
url; url;
jobs = 3; jobs = 3;
procs = 1;
iterations = 1;
batch_traces = 400;
batch_metrics = 3;
batch_logs = 400;
} );
( "emit1_eio",
{
url;
jobs = 3;
procs = 3;
iterations = 1; iterations = 1;
batch_traces = 400; batch_traces = 400;
batch_metrics = 3; batch_metrics = 3;

View file

@ -25,6 +25,7 @@ let () =
{ {
url; url;
jobs = 1; jobs = 1;
procs = 1;
iterations = 1; iterations = 1;
batch_traces = 2; batch_traces = 2;
batch_metrics = 2; batch_metrics = 2;
@ -34,6 +35,7 @@ let () =
{ {
url; url;
jobs = 3; jobs = 3;
procs = 1;
iterations = 1; iterations = 1;
batch_traces = 400; batch_traces = 400;
batch_metrics = 3; batch_metrics = 3;