From 87cfd5e31ee5075c8df963b66384512eabebda79 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 23:22:59 -0400 Subject: [PATCH] Add test harness for instrumented applications --- dune-project | 3 +- opentelemetry-cohttp-lwt.opam | 1 + src/client/signal.ml | 42 +++- src/client/signal.mli | 14 ++ tests/bin/emit1.ml | 27 ++- tests/bin/emit1_cohttp.ml | 95 +++++---- tests/client_e2e/clients_e2e_lib.ml | 193 ++++++++++++++++++ tests/client_e2e/dune | 35 ++++ tests/client_e2e/gather_signals.ml | 3 + tests/client_e2e/signal_gatherer.ml | 151 ++++++++++++++ tests/client_e2e/test_cottp_lwt_client_e2e.ml | 33 +++ 11 files changed, 544 insertions(+), 53 deletions(-) create mode 100644 tests/client_e2e/clients_e2e_lib.ml create mode 100644 tests/client_e2e/dune create mode 100644 tests/client_e2e/gather_signals.ml create mode 100644 tests/client_e2e/signal_gatherer.ml create mode 100644 tests/client_e2e/test_cottp_lwt_client_e2e.ml diff --git a/dune-project b/dune-project index ec273074..a28374f2 100644 --- a/dune-project +++ b/dune-project @@ -102,7 +102,8 @@ (>= "5.3")) (cohttp-lwt (>= "6.0.0")) - (alcotest :with-test)) + (alcotest :with-test) + (cohttp-eio :with-test)) (synopsis "Opentelemetry tracing for Cohttp HTTP servers")) (package diff --git a/opentelemetry-cohttp-lwt.opam b/opentelemetry-cohttp-lwt.opam index ad66a0ef..a56e353b 100644 --- a/opentelemetry-cohttp-lwt.opam +++ b/opentelemetry-cohttp-lwt.opam @@ -20,6 +20,7 @@ depends: [ "lwt" {>= "5.3"} "cohttp-lwt" {>= "6.0.0"} "alcotest" {with-test} + "cohttp-eio" {with-test} ] build: [ ["dune" "subst"] {dev} diff --git a/src/client/signal.ml b/src/client/signal.ml index c4c3fa11..7a2eddd5 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -12,6 +12,30 @@ type t = | Metrics of Proto.Metrics.resource_metrics list | Logs of Proto.Logs.resource_logs list +let to_traces = function + | Traces xs -> Some xs + | _ -> None + +let to_metrics = function + | Metrics xs -> Some xs + | _ -> None + +let to_logs = function + | Logs xs -> Some xs + | _ -> None + +let is_traces = function + | Traces _ -> true + | _ -> false + +let is_metrics = function + | Metrics _ -> true + | _ -> false + +let is_logs = function + | Logs _ -> true + | _ -> false + module Encode = struct let resource_to_string ~encoder ~ctor ~enc resource = let encoder = @@ -70,9 +94,21 @@ module Decode = struct end module Pp = struct - let logs = Format.pp_print_list Proto.Logs.pp_resource_logs + let pp_sep fmt () = Format.fprintf fmt ",@." - let metrics = Format.pp_print_list Proto.Metrics.pp_resource_metrics + let pp_signal pp fmt t = + Format.fprintf fmt "[@ @["; + Format.pp_print_list ~pp_sep pp fmt t; + Format.fprintf fmt "@ ]@]@." - let traces = Format.pp_print_list Proto.Trace.pp_resource_spans + let logs = pp_signal Proto.Logs.pp_resource_logs + + let metrics = pp_signal Proto.Metrics.pp_resource_metrics + + let traces = pp_signal Proto.Trace.pp_resource_spans + + let pp fmt = function + | Logs ls -> logs fmt ls + | Metrics ms -> metrics fmt ms + | Traces ts -> traces fmt ts end diff --git a/src/client/signal.mli b/src/client/signal.mli index 7c8f7779..cbe252b1 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -11,6 +11,18 @@ type t = | Metrics of Opentelemetry_proto.Metrics.resource_metrics list | Logs of Opentelemetry_proto.Logs.resource_logs list +val to_traces : t -> Opentelemetry_proto.Trace.resource_spans list option + +val to_metrics : t -> Opentelemetry_proto.Metrics.resource_metrics list option + +val to_logs : t -> Opentelemetry_proto.Logs.resource_logs list option + +val is_traces : t -> bool + +val is_metrics : t -> bool + +val is_logs : t -> bool + (** Encode signals to protobuf encoded strings, ready to be sent over the wire *) module Encode : sig @@ -67,4 +79,6 @@ module Pp : sig val traces : Format.formatter -> Opentelemetry_proto.Trace.resource_spans list -> unit + + val pp : Format.formatter -> t -> unit end diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 093bc888..95a4d55b 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -104,6 +104,11 @@ let () = let ts_start = Unix.gettimeofday () in let debug = ref false in + + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let batch_logs = ref 400 in + let n_bg_threads = ref 0 in let opts = [ @@ -111,6 +116,11 @@ let () = ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; @@ -122,15 +132,18 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in let config = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true - ?bg_threads: - (let n = !n_bg_threads in - if n = 0 then - None - else - Some n) - () + ?bg_threads:(some_if_nzero n_bg_threads) + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 1558b6b9..59acf285 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -12,6 +12,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let iterations = ref 1 + let num_sleep = Atomic.make 0 let stress_alloc_ = ref true @@ -20,57 +22,63 @@ let stop = Atomic.make false let num_tr = Atomic.make 0 -let run_job () : unit Lwt.t = - let i = ref 0 in +(* Counter used to mark simulated failures *) +let i = ref 0 + +let run_job job_id : unit Lwt.t = while%lwt not @@ Atomic.get stop do let@ scope = Atomic.incr num_tr; T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int !i ] + ~attrs:[ "i", `Int job_id ] in - for%lwt j = 0 to 4 do - (* parent scope is found via thread local storage *) - let@ scope = - Atomic.incr num_tr; - T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal - ~attrs:[ "j", `Int j ] - "loop.inner" - in - - let* () = Lwt_unix.sleep !sleep_outer in - Atomic.incr num_sleep; - - T.Logs.( - emit - [ - make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id - ~severity:Severity_number_info "inner at %d" j; - ]); - - incr i; - - try%lwt - Atomic.incr num_tr; + for%lwt j = 0 to !iterations do + if j >= !iterations then + (* Terminate program, having reached our max iterations *) + Lwt.return @@ Atomic.set stop true + else + (* parent scope is found via thread local storage *) let@ scope = - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + Atomic.incr num_tr; + T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal + ~attrs:[ "j", `Int j ] + "loop.inner" in - (* allocate some stuff *) - if !stress_alloc_ then ( - let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in - ignore _arr - ); - let* () = Lwt_unix.sleep !sleep_inner in + let* () = Lwt_unix.sleep !sleep_outer in Atomic.incr num_sleep; - if j = 4 && !i mod 13 = 0 then failwith "oh no"; + T.Logs.( + emit + [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info "inner at %d" j; + ]); - (* simulate a failure *) - Opentelemetry.Scope.add_event scope (fun () -> - T.Event.make "done with alloc"); - Lwt.return () - with Failure _ -> Lwt.return () + incr i; + + try%lwt + Atomic.incr num_tr; + let@ scope = + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + in + (* allocate some stuff *) + if !stress_alloc_ then ( + let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in + ignore _arr + ); + + let* () = Lwt_unix.sleep !sleep_inner in + Atomic.incr num_sleep; + + (* simulate a failure *) + if j = 4 && !i mod 13 = 0 then failwith "oh no"; + + Opentelemetry.Scope.add_event scope (fun () -> + T.Event.make "done with alloc"); + Lwt.return () + with Failure _ -> Lwt.return () done done @@ -87,7 +95,7 @@ let run () : unit Lwt.t = let n_jobs = max 1 !n_jobs in Printf.printf "run %d jobs\n%!" n_jobs; - let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in + let jobs = List.init n_jobs run_job in Lwt.join jobs let () = @@ -99,18 +107,21 @@ let () = let debug = ref false in let batch_traces = ref 400 in let batch_metrics = ref 3 in + let batch_logs = ref 400 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); - "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; ( "--batch-metrics", Arg.Int (( := ) batch_metrics), " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + "--iterations", Arg.Set_int iterations, " the number of iterations to run"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; ] |> Arg.align @@ -128,7 +139,7 @@ let () = Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - () + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config; diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml new file mode 100644 index 00000000..81f4695c --- /dev/null +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -0,0 +1,193 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto + +let batch_size : Client.Signal.t -> int = function + | Traces ts -> List.length ts + | Logs ls -> List.length ls + | Metrics ms -> List.length ms + +let avg_batch_size (p : Client.Signal.t -> bool) + (batches : Client.Signal.t list) : int = + let sum = + List.fold_left + (fun acc b -> + if p b then + acc + batch_size b + else + acc) + 0 batches + in + sum / List.length batches + +let signals_from_batch (signal_batch : Client.Signal.t) = + match signal_batch with + | Traces ts -> List.map (fun t -> `Trace t) ts + | Logs ls -> List.map (fun l -> `Log l) ls + | Metrics ms -> List.map (fun m -> `Metric m) ms + +let filter_map_spans f signals = + signals + |> List.filter_map (function + | `Log _ | `Metric _ -> None + | `Trace (r : Proto.Trace.resource_spans) -> + r.scope_spans + |> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f)) + +let count_spans_with_name name signals = + signals + |> filter_map_spans (fun s -> + if s.Proto.Trace.name = name then + Some s + else + None) + |> List.length + +let filter_map_metrics f signals = + signals + |> List.filter_map (function + | `Log _ | `Trace _ -> None + | `Metric (r : Proto.Metrics.resource_metrics) -> + r.scope_metrics + |> List.find_map (fun ss -> + ss.Proto.Metrics.metrics |> List.find_map f)) + +let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float + = function + | Proto.Metrics.As_double f -> f + | Proto.Metrics.As_int i64 -> Int64.to_float i64 + +let get_metric_values name signals = + signals + |> filter_map_metrics (fun (m : Proto.Metrics.metric) -> + if m.name <> name then + None + else + Option.some + @@ + match m.data with + | Sum { data_points; is_monotonic = true; _ } -> + List.fold_left + (fun acc (p : Proto.Metrics.number_data_point) -> + acc +. number_data_point_to_float p.value) + 0. data_points + | _ -> failwith "TODO: Support for getting other metrics") + +let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list = + signals + |> List.filter_map (function + | `Metric _ | `Trace _ -> None + | `Log (r : Proto.Logs.resource_logs) -> + r.scope_logs + |> List.find_map (fun ss -> + ss.Proto.Logs.log_records |> List.find_map f)) + +let count_logs_with_body p signals = + signals + |> filter_map_logs (fun (l : Proto.Logs.log_record) -> + if p l.body then + Some () + else + None) + |> List.length + +type params = { + jobs: int; + batch_traces: int; + batch_metrics: int; + batch_logs: int; + iterations: int; +} + +let cmd exec params = + [ + exec; + "-j"; + string_of_int params.jobs; + "--iterations"; + string_of_int params.iterations; + "--batch-traces"; + string_of_int params.batch_traces; + "--batch-metrics"; + string_of_int params.batch_metrics; + "--batch-logs"; + string_of_int params.batch_logs; + ] + +let test name f = Alcotest.test_case name `Quick f + +let tests params signal_batches = + let signals = + signal_batches + |> List.fold_left + (fun acc b -> List.rev_append (signals_from_batch b) acc) + [] + in + [ + (* TODO: What properties of batch sizes does it make sense to test? *) + test "loop.outer spans" (fun () -> + Alcotest.(check' int) + ~msg:"number of occurrences should equal the configured jobs" + ~expected:params.jobs + ~actual:(count_spans_with_name "loop.outer" signals)); + test "loop.inner spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "loop.inner" signals)); + test "alloc spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "alloc" signals); + Alcotest.(check' bool) + ~msg:"should have 'done with alloc' event" ~expected:true + ~actual: + (let all_alloc_events = + signals + |> filter_map_spans (fun s -> + if s.name <> "alloc" then + Some s.events + else + None) + |> List.flatten + in + all_alloc_events + |> List.for_all (fun (e : Proto.Trace.span_event) -> + e.name = "done with alloc"))); + test "num-sleep metrics" (fun () -> + Alcotest.(check' (float 0.)) + ~msg:"should record jobs * iterations sleeps" + ~expected:(params.jobs * params.iterations |> float_of_int) + ~actual: + (get_metric_values "num-sleep" signals + |> List.sort Float.compare |> List.rev |> List.hd)); + test "logs" (fun () -> + Alcotest.(check' int) + ~msg:"should record jobs * iterations occurrences of 'inner at n'" + ~expected:(params.jobs * params.iterations) + ~actual: + (signals + |> count_logs_with_body (function + | Some (Proto.Common.String_value s) + when String.starts_with ~prefix:"inner at" s -> + true + | _ -> false))); + ] + +let run_tests cmds = + let suites = + cmds + |> List.map (fun (exec, params) -> + let cmd = cmd exec params in + let name = cmd |> String.concat " " in + let signal_batches = Signal_gatherer.gather_signals cmd in + (* Let server reset *) + Unix.sleep 1; + name, tests params signal_batches) + in + let open Alcotest in + run "Collector integration tests" suites diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune new file mode 100644 index 00000000..45c234cb --- /dev/null +++ b/tests/client_e2e/dune @@ -0,0 +1,35 @@ +(env + (_ + ; Make the binaries for the test emitters available on the path for the components defined in this dir. + ; See https://dune.readthedocs.io/en/stable/reference/dune/env.html + (binaries + (../bin/emit1.exe as emit1) + (../bin/emit1_cohttp.exe as emit1_cohttp) + (./gather_signals.exe as gather_signals)))) + +(library + (name signal_gatherer) + (modules signal_gatherer) + (libraries + str + alcotest + cohttp-lwt-unix + fmt + unix + logs.fmt + logs.threaded + opentelemetry.client)) + +(library + (name clients_e2e_lib) + (modules clients_e2e_lib) + (libraries alcotest signal_gatherer)) + +(tests + (names test_cottp_lwt_client_e2e) + (modules test_cottp_lwt_client_e2e) + (package opentelemetry-client-cohttp-lwt) + (deps %{bin:emit1_cohttp}) + (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) + +; TODO : Add tests for ocurl's emit1 diff --git a/tests/client_e2e/gather_signals.ml b/tests/client_e2e/gather_signals.ml new file mode 100644 index 00000000..1a5360ff --- /dev/null +++ b/tests/client_e2e/gather_signals.ml @@ -0,0 +1,3 @@ +let () = + let program_to_test = Sys.argv |> Array.to_list |> List.tl in + Signal_gatherer.run ~program_to_test () diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml new file mode 100644 index 00000000..adf87ce7 --- /dev/null +++ b/tests/client_e2e/signal_gatherer.ml @@ -0,0 +1,151 @@ +(* A runs tests against a OTel-instrumented program *) + +module Client = Opentelemetry_client +module Signal = Client.Signal +module Proto = Opentelemetry.Proto +open Lwt.Syntax + +(* Server to collect telemetry data *) +module Server = struct + let dbg_request kind req pp data : unit Lwt.t = + let _ = kind, req, pp, data in + (* NOTE: Uncomment for debugging *) + (* let* () = *) + (* let req : string = Format.asprintf "%a" Http.Request.pp req in *) + (* let data_s : string = Format.asprintf "%a" pp data in *) + (* Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@." *) + (* kind req data_s *) + (* in *) + Lwt.return () + + let metrics req data = + let metrics = Signal.Decode.metrics data in + let+ () = dbg_request "metrics" req Signal.Pp.metrics metrics in + Signal.Metrics metrics + + let handler push_signal _socket (request : Http.Request.t) + (body : Cohttp_lwt.Body.t) = + let* data = Cohttp_lwt.Body.to_string body in + let* status, signal = + match Http.Request.resource request with + | "/v1/traces" -> + let traces = Signal.Decode.traces data in + let+ () = dbg_request "trace" request Signal.Pp.traces traces in + `OK, Some (Signal.Traces traces) + | "/v1/metrics" -> + let metrics = Signal.Decode.metrics data in + let+ () = dbg_request "metrics" request Signal.Pp.metrics metrics in + `OK, Some (Signal.Metrics metrics) + | "/v1/logs" -> + let logs = Signal.Decode.logs data in + let+ () = dbg_request "logs" request Signal.Pp.logs logs in + `OK, Some (Signal.Logs logs) + | unexepected -> + let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexepected in + `Not_found, None + in + push_signal signal; + let resp_body = Cohttp_lwt.Body.of_string "" in + Cohttp_lwt_unix.Server.respond ~status ~body:resp_body () + + let run port push_signals = + let* () = Lwt_io.eprintf "starting server\n" in + Cohttp_lwt_unix.Server.( + make ~callback:(handler push_signals) () + |> create ~mode:(`TCP (`Port port))) +end + +(** Manage launching and cleaning up the program we are testing *) +module Tested_program = struct + let validate_exit = function + | Unix.WEXITED 0 -> () + | Unix.WEXITED bad_code -> + failwith + @@ Printf.sprintf "process under test ended with bad exit code %d" + bad_code + | Unix.WSIGNALED i -> + failwith + @@ Printf.sprintf "process under test ended with unexpected signal %d" i + | Unix.WSTOPPED i -> + failwith + @@ Printf.sprintf "process under test ended with unexpected stop %d" i + + let run program_to_test = + let redirect = `FD_copy Unix.stderr in + let cmd = "", Array.of_list program_to_test in + (* Give server time to be online *) + let* () = Lwt_unix.sleep 0.5 in + let* () = + Lwt_io.eprintf "running command: %s\n" + (Format.asprintf "%a" + (Format.pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt " ") + Format.pp_print_string) + program_to_test) + in + let* result = Lwt_process.exec ~stdout:redirect cmd in + (* Give server time process signals *) + let+ () = Lwt_unix.sleep 0.5 in + validate_exit result +end + +let collect_traces ~port program_to_test push_signals () = + let* () = + Lwt.pick + [ Server.run port push_signals; Tested_program.run program_to_test ] + in + (* Let the tester know all the signals have be sent *) + Lwt.return (push_signals None) + +let normalize_scope_span : Proto.Trace.scope_spans -> Proto.Trace.scope_spans = + function + | scope_span -> + { + scope_span with + spans = + scope_span.spans + |> List.map (fun (span : Proto.Trace.span) -> + { + span with + start_time_unix_nano = -1L; + end_time_unix_nano = -1L; + }); + } + +let normalize_signal : Signal.t -> Signal.t = function + | Traces ts -> + Traces + (ts + |> List.map (fun (trace : Proto.Trace.resource_spans) -> + { + trace with + scope_spans = trace.scope_spans |> List.map normalize_scope_span; + })) + | x -> x + +(* normalize trace output by redacting non-deterministic values from output *) +let normalize = + let re = + Str.regexp + {|\(start_time_unix_nano\|time_unix_nano\|end_time_unix_nano\|value\) = \([0-9]*\|As_int([0-9]*)\|As_double([0-9]*\.)\);|} + in + fun s -> Str.global_replace re {|\1 = ;|} s + +let default_port = + String.split_on_char ':' Client.Config.default_url |> function + (* Extracting the port from 'http://foo:' *) + | [ _; _; port ] -> int_of_string port + | _ -> failwith "unexpected format in Client.Config.default_url" + +let gather_signals ?(port = default_port) program_to_test = + Lwt_main.run + @@ + let stream, push = Lwt_stream.create () in + let* () = collect_traces ~port program_to_test push () in + Lwt_stream.to_list stream + +let run ?(port = default_port) ~program_to_test () = + gather_signals ~port program_to_test + |> List.map (fun s -> s |> Format.asprintf "%a" Signal.Pp.pp |> normalize) + |> List.stable_sort String.compare (* Produce a deterministic order *) + |> List.iter print_string diff --git a/tests/client_e2e/test_cottp_lwt_client_e2e.ml b/tests/client_e2e/test_cottp_lwt_client_e2e.ml new file mode 100644 index 00000000..c093155f --- /dev/null +++ b/tests/client_e2e/test_cottp_lwt_client_e2e.ml @@ -0,0 +1,33 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto +open Clients_e2e_lib + +let () = + Clients_e2e_lib.run_tests + [ + (* TODO: Running with batch-traces = 1 causes deadlocks *) + (* ( "emit1_cohttp", *) + (* { *) + (* jobs = 1; *) + (* iterations = 1; *) + (* batch_traces = 1; *) + (* batch_metrics = 1; *) + (* batch_logs = 1; *) + (* } ); *) + ( "emit1_cohttp", + { + jobs = 1; + iterations = 1; + batch_traces = 2; + batch_metrics = 2; + batch_logs = 2; + } ); + ( "emit1_cohttp", + { + jobs = 3; + iterations = 1; + batch_traces = 400; + batch_metrics = 3; + batch_logs = 400; + } ); + ]