diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 0611c754..14f657cd 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -109,6 +109,7 @@ let () = let batch_metrics = ref 3 in let batch_logs = ref 400 in let url = ref None in + let n_procs = ref 1 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; @@ -127,12 +128,18 @@ let () = "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--iterations", Arg.Set_int iterations, " the number of iterations to run"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; + "--procs", Arg.Set_int n_procs, " number of processes"; ] |> Arg.align in Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + if !n_procs > 1 then + failwith + "TODO: add support for running multiple processes to the lwt-cohttp \ + emitter"; + let some_if_nzero r = if !r > 0 then Some !r diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index dc5198ae..debb9e75 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -11,7 +11,7 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 -let iterations = ref 1 +let iterations = Atomic.make 1 let num_sleep = Atomic.make 0 @@ -32,8 +32,8 @@ let run_job clock _job_id : unit = ~attrs:[ "i", `Int !i ] in - for j = 0 to !iterations do - if j >= !iterations then + for j = 0 to Atomic.get iterations do + if j >= Atomic.get iterations then (* Terminate program, having reached our max iterations *) Atomic.set stop true else @@ -80,7 +80,7 @@ let run_job clock _job_id : unit = done done -let run env : unit = +let run env proc () : unit = OT.GC_metrics.basic_setup (); OT.Metrics_callbacks.register (fun () -> @@ -91,7 +91,7 @@ let run env : unit = ]); let n_jobs = max 1 !n_jobs in - Printf.printf "run %d jobs\n%!" n_jobs; + Printf.printf "run %d jobs in proc %d\n%!" n_jobs proc; Eio.Switch.run (fun sw -> for j = 1 to n_jobs do @@ -109,6 +109,7 @@ let () = let batch_metrics = ref 3 in let batch_logs = ref 400 in let url = ref None in + let n_procs = ref 1 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; @@ -125,8 +126,11 @@ let () = "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; - "--iterations", Arg.Set_int iterations, " the number of iterations to run"; - "-j", Arg.Set_int n_jobs, " number of parallel jobs"; + ( "--iterations", + Arg.Int (Atomic.set iterations), + " the number of iterations to run" ); + "-j", Arg.Set_int n_jobs, " number of jobs per processes"; + "--procs", Arg.Set_int n_procs, " number of processes"; ] |> Arg.align in @@ -155,4 +159,16 @@ let () = Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" (Atomic.get num_tr) elapsed n_per_sec) in - Opentelemetry_client_cohttp_eio.with_setup ~stop ~config run |> Eio_main.run + Eio_main.run @@ fun env -> + (if !n_procs < 2 then + Opentelemetry_client_cohttp_eio.with_setup ~stop ~config (run env 0) env + else + Eio.Switch.run @@ fun sw -> + Opentelemetry_client_cohttp_eio.setup ~stop ~config ~sw env; + let dm = Eio.Stdenv.domain_mgr env in + Eio.Switch.run (fun sw -> + for proc = 1 to !n_procs do + Eio.Fiber.fork ~sw @@ fun () -> + Eio.Domain_manager.run dm (run env proc) + done)); + Opentelemetry.Collector.remove_backend () ~on_done:ignore diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml index 8c085a22..3206bfe9 100644 --- a/tests/client_e2e/clients_e2e_lib.ml +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -56,6 +56,15 @@ let filter_map_metrics f signals = |> List.find_map (fun ss -> ss.Proto.Metrics.metrics |> List.find_map f)) +let count_metrics_with_name name signals = + signals + |> filter_map_metrics (fun s -> + if String.equal s.Proto.Metrics.name name then + Some s + else + None) + |> List.length + let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float = function | Proto.Metrics.As_double f -> f @@ -98,6 +107,7 @@ let count_logs_with_body p signals = type params = { url: string; jobs: int; + procs: int; batch_traces: int; batch_metrics: int; batch_logs: int; @@ -109,6 +119,8 @@ let cmd exec params = exec; "-j"; string_of_int params.jobs; + "--procs"; + string_of_int params.procs; "--url"; params.url; "--iterations"; @@ -134,22 +146,24 @@ let tests params signal_batches = (* TODO: What properties of batch sizes does it make sense to test? *) test "loop.outer spans" (fun () -> Alcotest.(check' int) - ~msg:"number of occurrences should equal the configured jobs" - ~expected:params.jobs + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured processes" + ~expected:(params.jobs * params.procs) ~actual:(count_spans_with_name "loop.outer" signals)); test "loop.inner spans" (fun () -> Alcotest.(check' int) ~msg: "number of occurrences should equal the configured jobs * the \ - configured iterations" - ~expected:(params.jobs * params.iterations) + configured iterations * configured processes" + ~expected:(params.jobs * params.iterations * params.procs) ~actual:(count_spans_with_name "loop.inner" signals)); test "alloc spans" (fun () -> Alcotest.(check' int) ~msg: "number of occurrences should equal the configured jobs * the \ - configured iterations" - ~expected:(params.jobs * params.iterations) + configured iterations * configured processes" + ~expected:(params.jobs * params.iterations * params.procs) ~actual:(count_spans_with_name "alloc" signals); Alcotest.(check' bool) ~msg:"should have 'done with alloc' event" ~expected:true @@ -167,16 +181,19 @@ let tests params signal_batches = |> List.for_all (fun (e : Proto.Trace.span_event) -> String.equal e.name "done with alloc"))); test "num-sleep metrics" (fun () -> - Alcotest.(check' (float 0.)) - ~msg:"should record jobs * iterations sleeps" - ~expected:(params.jobs * params.iterations |> float_of_int) + Alcotest.(check' bool) + ~msg: + "should record at lest as many sleep metrics as there are \ + iterations configured" + ~expected:true ~actual: - (get_metric_values "num-sleep" signals - |> List.sort Float.compare |> List.rev |> List.hd)); + (count_metrics_with_name "num-sleep" signals >= params.iterations)); test "logs" (fun () -> Alcotest.(check' int) - ~msg:"should record jobs * iterations occurrences of 'inner at n'" - ~expected:(params.jobs * params.iterations) + ~msg: + "should record jobs * iterations occurrences * configured \ + processes of 'inner at n'" + ~expected:(params.jobs * params.iterations * params.procs) ~actual: (signals |> count_logs_with_body (function diff --git a/tests/client_e2e/test_cottp_eio_client_e2e.ml b/tests/client_e2e/test_cottp_eio_client_e2e.ml index 539a1ba4..ab5cf985 100644 --- a/tests/client_e2e/test_cottp_eio_client_e2e.ml +++ b/tests/client_e2e/test_cottp_eio_client_e2e.ml @@ -15,6 +15,7 @@ let () = { url; jobs = 1; + procs = 1; iterations = 1; batch_traces = 2; batch_metrics = 2; @@ -24,6 +25,17 @@ let () = { url; jobs = 3; + procs = 1; + iterations = 1; + batch_traces = 400; + batch_metrics = 3; + batch_logs = 400; + } ); + ( "emit1_eio", + { + url; + jobs = 3; + procs = 3; iterations = 1; batch_traces = 400; batch_metrics = 3; diff --git a/tests/client_e2e/test_cottp_lwt_client_e2e.ml b/tests/client_e2e/test_cottp_lwt_client_e2e.ml index 5c72165e..b1ba3772 100644 --- a/tests/client_e2e/test_cottp_lwt_client_e2e.ml +++ b/tests/client_e2e/test_cottp_lwt_client_e2e.ml @@ -25,6 +25,7 @@ let () = { url; jobs = 1; + procs = 1; iterations = 1; batch_traces = 2; batch_metrics = 2; @@ -34,6 +35,7 @@ let () = { url; jobs = 3; + procs = 1; iterations = 1; batch_traces = 400; batch_metrics = 3;