wip: fix the e2e tests

not clear exactly why there is a discrepancy currently whenever -j is
higher than 1
This commit is contained in:
Simon Cruanes 2025-12-17 16:14:32 -05:00
parent bf7eaa97bd
commit 5596552379
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
10 changed files with 122 additions and 29 deletions

View file

@ -11,7 +11,7 @@ let test_config_printing () =
\ self_trace=false; url_traces=\"http://localhost:4318/v1/traces\";\n\ \ self_trace=false; url_traces=\"http://localhost:4318/v1/traces\";\n\
\ url_metrics=\"http://localhost:4318/v1/metrics\";\n\ \ url_metrics=\"http://localhost:4318/v1/metrics\";\n\
\ url_logs=\"http://localhost:4318/v1/logs\"; headers=[]; batch_traces=400;\n\ \ url_logs=\"http://localhost:4318/v1/logs\"; headers=[]; batch_traces=400;\n\
\ batch_metrics=20; batch_logs=400; batch_timeout_ms=2000;\n\ \ batch_metrics=200; batch_logs=400; batch_timeout_ms=2000;\n\
\ http_concurrency_level=None }" \ http_concurrency_level=None }"
in in
check' string ~msg:"is rendered correctly" ~actual ~expected check' string ~msg:"is rendered correctly" ~actual ~expected

View file

@ -110,13 +110,14 @@ type params = {
url: string; url: string;
jobs: int; jobs: int;
procs: int; procs: int;
n_outer: int;
batch_traces: int; batch_traces: int;
batch_metrics: int; batch_metrics: int;
batch_logs: int; batch_logs: int;
iterations: int; iterations: int;
} }
let cmd exec params = let cmd exec params : string list =
[ [
exec; exec;
"-j"; "-j";
@ -125,6 +126,8 @@ let cmd exec params =
string_of_int params.procs; string_of_int params.procs;
"--url"; "--url";
params.url; params.url;
"-n";
string_of_int params.n_outer;
"--iterations"; "--iterations";
string_of_int params.iterations; string_of_int params.iterations;
"--batch-traces"; "--batch-traces";
@ -151,21 +154,23 @@ let tests params signal_batches =
~msg: ~msg:
"number of occurrences should equal the configured jobs * the \ "number of occurrences should equal the configured jobs * the \
configured processes" configured processes"
~expected:(params.jobs * params.procs) ~expected:(params.jobs * params.procs * params.n_outer)
~actual:(count_spans_with_name "loop.outer" signals)); ~actual:(count_spans_with_name "loop.outer" signals));
test "loop.inner spans" (fun () -> test "loop.inner spans" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg: ~msg:
"number of occurrences should equal the configured jobs * the \ "number of occurrences should equal the configured jobs * the \
configured iterations * configured processes" configured iterations * configured processes"
~expected:(params.jobs * params.iterations * params.procs) ~expected:
(params.jobs * params.iterations * params.procs * params.n_outer)
~actual:(count_spans_with_name "loop.inner" signals)); ~actual:(count_spans_with_name "loop.inner" signals));
test "alloc spans" (fun () -> test "alloc spans" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg: ~msg:
"number of occurrences should equal the configured jobs * the \ "number of occurrences should equal the configured jobs * the \
configured iterations * configured processes" configured iterations * configured processes"
~expected:(params.jobs * params.iterations * params.procs) ~expected:
(params.jobs * params.iterations * params.procs * params.n_outer)
~actual:(count_spans_with_name "alloc" signals); ~actual:(count_spans_with_name "alloc" signals);
Alcotest.(check' bool) Alcotest.(check' bool)
~msg:"should have 'done with alloc' event" ~expected:true ~msg:"should have 'done with alloc' event" ~expected:true
@ -193,9 +198,10 @@ let tests params signal_batches =
test "logs" (fun () -> test "logs" (fun () ->
Alcotest.(check' int) Alcotest.(check' int)
~msg: ~msg:
"should record jobs * iterations occurrences * configured \ "should record jobs * iterations occurrences * configured * n \
processes of 'inner at n'" processes of 'inner at n'"
~expected:(params.jobs * params.iterations * params.procs) ~expected:
(params.jobs * params.iterations * params.procs * params.n_outer)
~actual: ~actual:
(signals (signals
|> count_logs_with_body (function |> count_logs_with_body (function
@ -205,16 +211,19 @@ let tests params signal_batches =
| _ -> false))); | _ -> false)));
] ]
let run_tests ~port cmds = let run_tests ~port (cmds : _ list) : unit =
let suites = let suites =
cmds let open Lwt.Syntax in
|> List.map (fun (exec, params) -> Lwt_main.run
@@ Lwt_list.map_s
(fun (exec, params) ->
let cmd = cmd exec params in let cmd = cmd exec params in
let name = cmd |> String.concat " " in let name = Printf.sprintf "'test: %s'" (String.concat " " cmd) in
let signal_batches = Signal_gatherer.gather_signals ~port cmd in let* signal_batches = Signal_gatherer.gather_signals ~port cmd in
(* Let server reset *) (* Let server reset *)
Unix.sleep 1; let* () = Lwt_unix.sleep 1. in
name, tests params signal_batches) Lwt.return (name, tests params signal_batches))
cmds
in in
let open Alcotest in let open Alcotest in
run "Collector integration tests" suites run "Collector integration tests" suites

View file

@ -24,15 +24,15 @@
(re_export containers) (re_export containers)
logs.fmt logs.fmt
logs.threaded logs.threaded
lwt (re_export lwt)
lwt.unix (re_export lwt.unix)
(re_export opentelemetry) (re_export opentelemetry)
(re_export opentelemetry-client))) (re_export opentelemetry-client)))
(library (library
(name clients_e2e_lib) (name clients_e2e_lib)
(modules clients_e2e_lib) (modules clients_e2e_lib)
(libraries alcotest signal_gatherer)) (libraries alcotest lwt lwt.unix signal_gatherer))
(tests (tests
(names test_cottp_lwt_client_e2e) (names test_cottp_lwt_client_e2e)
@ -52,7 +52,16 @@
(>= %{ocaml_version} 5.0)) (>= %{ocaml_version} 5.0))
(libraries clients_e2e_lib alcotest opentelemetry opentelemetry-client)) (libraries clients_e2e_lib alcotest opentelemetry opentelemetry-client))
(tests
(names test_ocurl_client_e2e)
(modules test_ocurl_client_e2e)
(package opentelemetry-client-ocurl)
(deps %{bin:emit1})
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries clients_e2e_lib alcotest opentelemetry opentelemetry-client))
(executable (executable
(name signal_reporter_server) (name signal_reporter_server)
(modules signal_reporter_server) (modules signal_reporter_server)
(libraries signal_gatherer)) (libraries signal_gatherer lwt lwt.unix))

View file

@ -116,9 +116,7 @@ let default_port =
| [ _; _; port ] -> int_of_string port | [ _; _; port ] -> int_of_string port
| _ -> failwith "unexpected format in Client.Config.default_url" | _ -> failwith "unexpected format in Client.Config.default_url"
let gather_signals ?(port = default_port) program_to_test = let gather_signals ?(port = default_port) program_to_test : _ Lwt.t =
Lwt_main.run
@@
let stream, push = Lwt_stream.create () in let stream, push = Lwt_stream.create () in
let* () = let* () =
Lwt.pick [ Server.run port push; Tested_program.run program_to_test ] Lwt.pick [ Server.run port push; Tested_program.run program_to_test ]
@ -128,14 +126,15 @@ let gather_signals ?(port = default_port) program_to_test =
Lwt_stream.to_list stream Lwt_stream.to_list stream
(* Just run the server, and print the signals gathered. *) (* Just run the server, and print the signals gathered. *)
let run ?(port = default_port) () = let run ?(port = default_port) () : _ Lwt.t =
Lwt_main.run
@@
let stream, push = Lwt_stream.create () in let stream, push = Lwt_stream.create () in
Lwt.join Lwt.join
[ [
Server.run port push; Server.run port push;
Lwt_stream.iter_s Lwt_stream.iter_s
(fun s -> Format.asprintf "%a" Signal.Pp.pp s |> Lwt_io.printl) (fun s ->
let open Lwt.Syntax in
let* () = Lwt_io.printl (Format.asprintf "%a" Signal.Pp.pp s) in
Lwt_io.flush Lwt_io.stdout)
stream; stream;
] ]

View file

@ -2,7 +2,7 @@
server that can receive the signals make them available for inspection. *) server that can receive the signals make them available for inspection. *)
val gather_signals : val gather_signals :
?port:int -> string list -> Opentelemetry_client.Signal.t list ?port:int -> string list -> Opentelemetry_client.Signal.t list Lwt.t
(** [gather_signals program_to_test] is a list of all the signals emitted by the (** [gather_signals program_to_test] is a list of all the signals emitted by the
[program_to_test], which the server was able to record. This function [program_to_test], which the server was able to record. This function
assumes that the program to test will be sending its signals to the assumes that the program to test will be sending its signals to the
@ -12,7 +12,7 @@ val gather_signals :
the port where signals will be received. Default is port set in the port where signals will be received. Default is port set in
{!Opentelemetry_client.Config.default_url}. *) {!Opentelemetry_client.Config.default_url}. *)
val run : ?port:int -> unit -> unit val run : ?port:int -> unit -> unit Lwt.t
(** [run ()] runs a signal gathering server and prints all batches of signals (** [run ()] runs a signal gathering server and prints all batches of signals
received to stdout. received to stdout.

View file

@ -1,4 +1,4 @@
(** Runs a signal gatherer server, and prints out every batch of signals (** Runs a signal gatherer server, and prints out every batch of signals
received to stdout. This can be used to monitor the signals sent by an received to stdout. This can be used to monitor the signals sent by an
application, e.g., the test executables defined in /tests/bin/emit1*.ml *) application, e.g., the test executables defined in /tests/bin/emit1*.ml *)
let () = Signal_gatherer.run () let () = Lwt_main.run @@ Signal_gatherer.run ()

View file

@ -16,6 +16,7 @@ let () =
url; url;
jobs = 1; jobs = 1;
procs = 1; procs = 1;
n_outer = 1;
iterations = 1; iterations = 1;
batch_traces = 2; batch_traces = 2;
batch_metrics = 2; batch_metrics = 2;
@ -26,16 +27,29 @@ let () =
url; url;
jobs = 3; jobs = 3;
procs = 1; procs = 1;
n_outer = 1;
iterations = 1; iterations = 1;
batch_traces = 400; batch_traces = 400;
batch_metrics = 3; batch_metrics = 3;
batch_logs = 400; batch_logs = 400;
} ); } );
( "emit1_eio",
{
url;
jobs = 3;
procs = 1;
n_outer = 5;
iterations = 1;
batch_traces = 100;
batch_metrics = 20;
batch_logs = 400;
} );
( "emit1_eio", ( "emit1_eio",
{ {
url; url;
jobs = 3; jobs = 3;
procs = 3; procs = 3;
n_outer = 1;
iterations = 1; iterations = 1;
batch_traces = 400; batch_traces = 400;
batch_metrics = 3; batch_metrics = 3;

View file

@ -26,6 +26,7 @@ let () =
url; url;
jobs = 1; jobs = 1;
procs = 1; procs = 1;
n_outer = 1;
iterations = 1; iterations = 1;
batch_traces = 2; batch_traces = 2;
batch_metrics = 2; batch_metrics = 2;
@ -36,6 +37,18 @@ let () =
url; url;
jobs = 3; jobs = 3;
procs = 1; procs = 1;
n_outer = 1;
iterations = 1;
batch_traces = 400;
batch_metrics = 3;
batch_logs = 400;
} );
( "emit1_cohttp",
{
url;
jobs = 3;
procs = 1;
n_outer = 5;
iterations = 1; iterations = 1;
batch_traces = 400; batch_traces = 400;
batch_metrics = 3; batch_metrics = 3;

View file

@ -0,0 +1,47 @@
module Client = Opentelemetry_client
module Proto = Opentelemetry.Proto
open Clients_e2e_lib
(* NOTE: This port must be different from that used by other integration tests,
to prevent socket binding clashes. *)
let port = 4359
let url = Printf.sprintf "http://localhost:%d" port
let () =
Clients_e2e_lib.run_tests ~port
[
( "emit1",
{
url;
jobs = 1;
procs = 1;
n_outer = 1;
iterations = 1;
batch_traces = 2;
batch_metrics = 2;
batch_logs = 2;
} );
( "emit1",
{
url;
jobs = 3;
procs = 1;
n_outer = 1;
iterations = 1;
batch_traces = 400;
batch_metrics = 3;
batch_logs = 400;
} );
( "emit1",
{
url;
jobs = 3;
procs = 1;
n_outer = 5;
iterations = 1;
batch_traces = 400;
batch_metrics = 3;
batch_logs = 400;
} );
]

View file

@ -46,5 +46,7 @@ let tests (signal_batches : Client.Signal.t list) =
signal_batches signal_batches
let () = let () =
let signal_batches = Signal_gatherer.gather_signals ~port cmd in let signal_batches =
Lwt_main.run (Signal_gatherer.gather_signals ~port cmd)
in
tests signal_batches tests signal_batches