mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-07 18:37:56 -05:00
Add test harness for instrumented applications
This commit is contained in:
parent
a44e0cd3b5
commit
87cfd5e31e
11 changed files with 544 additions and 53 deletions
|
|
@ -102,7 +102,8 @@
|
|||
(>= "5.3"))
|
||||
(cohttp-lwt
|
||||
(>= "6.0.0"))
|
||||
(alcotest :with-test))
|
||||
(alcotest :with-test)
|
||||
(cohttp-eio :with-test))
|
||||
(synopsis "Opentelemetry tracing for Cohttp HTTP servers"))
|
||||
|
||||
(package
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ depends: [
|
|||
"lwt" {>= "5.3"}
|
||||
"cohttp-lwt" {>= "6.0.0"}
|
||||
"alcotest" {with-test}
|
||||
"cohttp-eio" {with-test}
|
||||
]
|
||||
build: [
|
||||
["dune" "subst"] {dev}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,30 @@ type t =
|
|||
| Metrics of Proto.Metrics.resource_metrics list
|
||||
| Logs of Proto.Logs.resource_logs list
|
||||
|
||||
let to_traces = function
|
||||
| Traces xs -> Some xs
|
||||
| _ -> None
|
||||
|
||||
let to_metrics = function
|
||||
| Metrics xs -> Some xs
|
||||
| _ -> None
|
||||
|
||||
let to_logs = function
|
||||
| Logs xs -> Some xs
|
||||
| _ -> None
|
||||
|
||||
let is_traces = function
|
||||
| Traces _ -> true
|
||||
| _ -> false
|
||||
|
||||
let is_metrics = function
|
||||
| Metrics _ -> true
|
||||
| _ -> false
|
||||
|
||||
let is_logs = function
|
||||
| Logs _ -> true
|
||||
| _ -> false
|
||||
|
||||
module Encode = struct
|
||||
let resource_to_string ~encoder ~ctor ~enc resource =
|
||||
let encoder =
|
||||
|
|
@ -70,9 +94,21 @@ module Decode = struct
|
|||
end
|
||||
|
||||
module Pp = struct
|
||||
let logs = Format.pp_print_list Proto.Logs.pp_resource_logs
|
||||
let pp_sep fmt () = Format.fprintf fmt ",@."
|
||||
|
||||
let metrics = Format.pp_print_list Proto.Metrics.pp_resource_metrics
|
||||
let pp_signal pp fmt t =
|
||||
Format.fprintf fmt "[@ @[";
|
||||
Format.pp_print_list ~pp_sep pp fmt t;
|
||||
Format.fprintf fmt "@ ]@]@."
|
||||
|
||||
let traces = Format.pp_print_list Proto.Trace.pp_resource_spans
|
||||
let logs = pp_signal Proto.Logs.pp_resource_logs
|
||||
|
||||
let metrics = pp_signal Proto.Metrics.pp_resource_metrics
|
||||
|
||||
let traces = pp_signal Proto.Trace.pp_resource_spans
|
||||
|
||||
let pp fmt = function
|
||||
| Logs ls -> logs fmt ls
|
||||
| Metrics ms -> metrics fmt ms
|
||||
| Traces ts -> traces fmt ts
|
||||
end
|
||||
|
|
|
|||
|
|
@ -11,6 +11,18 @@ type t =
|
|||
| Metrics of Opentelemetry_proto.Metrics.resource_metrics list
|
||||
| Logs of Opentelemetry_proto.Logs.resource_logs list
|
||||
|
||||
val to_traces : t -> Opentelemetry_proto.Trace.resource_spans list option
|
||||
|
||||
val to_metrics : t -> Opentelemetry_proto.Metrics.resource_metrics list option
|
||||
|
||||
val to_logs : t -> Opentelemetry_proto.Logs.resource_logs list option
|
||||
|
||||
val is_traces : t -> bool
|
||||
|
||||
val is_metrics : t -> bool
|
||||
|
||||
val is_logs : t -> bool
|
||||
|
||||
(** Encode signals to protobuf encoded strings, ready to be sent over the wire
|
||||
*)
|
||||
module Encode : sig
|
||||
|
|
@ -67,4 +79,6 @@ module Pp : sig
|
|||
|
||||
val traces :
|
||||
Format.formatter -> Opentelemetry_proto.Trace.resource_spans list -> unit
|
||||
|
||||
val pp : Format.formatter -> t -> unit
|
||||
end
|
||||
|
|
|
|||
|
|
@ -104,6 +104,11 @@ let () =
|
|||
let ts_start = Unix.gettimeofday () in
|
||||
|
||||
let debug = ref false in
|
||||
|
||||
let batch_traces = ref 400 in
|
||||
let batch_metrics = ref 3 in
|
||||
let batch_logs = ref 400 in
|
||||
|
||||
let n_bg_threads = ref 0 in
|
||||
let opts =
|
||||
[
|
||||
|
|
@ -111,6 +116,11 @@ let () =
|
|||
( "--stress-alloc",
|
||||
Arg.Bool (( := ) stress_alloc_),
|
||||
" perform heavy allocs in inner loop" );
|
||||
( "--batch-metrics",
|
||||
Arg.Int (( := ) batch_metrics),
|
||||
" size of metrics batch" );
|
||||
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
|
||||
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
|
||||
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
|
||||
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
|
||||
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
|
||||
|
|
@ -122,15 +132,18 @@ let () =
|
|||
|
||||
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
|
||||
|
||||
let some_if_nzero r =
|
||||
if !r > 0 then
|
||||
Some !r
|
||||
else
|
||||
None
|
||||
in
|
||||
let config =
|
||||
Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true
|
||||
?bg_threads:
|
||||
(let n = !n_bg_threads in
|
||||
if n = 0 then
|
||||
None
|
||||
else
|
||||
Some n)
|
||||
()
|
||||
?bg_threads:(some_if_nzero n_bg_threads)
|
||||
~batch_traces:(some_if_nzero batch_traces)
|
||||
~batch_metrics:(some_if_nzero batch_metrics)
|
||||
~batch_logs:(some_if_nzero batch_logs) ()
|
||||
in
|
||||
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
|
||||
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@ let sleep_outer = ref 2.0
|
|||
|
||||
let n_jobs = ref 1
|
||||
|
||||
let iterations = ref 1
|
||||
|
||||
let num_sleep = Atomic.make 0
|
||||
|
||||
let stress_alloc_ = ref true
|
||||
|
|
@ -20,57 +22,63 @@ let stop = Atomic.make false
|
|||
|
||||
let num_tr = Atomic.make 0
|
||||
|
||||
let run_job () : unit Lwt.t =
|
||||
let i = ref 0 in
|
||||
(* Counter used to mark simulated failures *)
|
||||
let i = ref 0
|
||||
|
||||
let run_job job_id : unit Lwt.t =
|
||||
while%lwt not @@ Atomic.get stop do
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
|
||||
~attrs:[ "i", `Int !i ]
|
||||
~attrs:[ "i", `Int job_id ]
|
||||
in
|
||||
|
||||
for%lwt j = 0 to 4 do
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
Atomic.incr num_tr;
|
||||
T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_outer in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
T.Logs.(
|
||||
emit
|
||||
[
|
||||
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
|
||||
~severity:Severity_number_info "inner at %d" j;
|
||||
]);
|
||||
|
||||
incr i;
|
||||
|
||||
try%lwt
|
||||
Atomic.incr num_tr;
|
||||
for%lwt j = 0 to !iterations do
|
||||
if j >= !iterations then
|
||||
(* Terminate program, having reached our max iterations *)
|
||||
Lwt.return @@ Atomic.set stop true
|
||||
else
|
||||
(* parent scope is found via thread local storage *)
|
||||
let@ scope =
|
||||
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc"
|
||||
Atomic.incr num_tr;
|
||||
T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal
|
||||
~attrs:[ "j", `Int j ]
|
||||
"loop.inner"
|
||||
in
|
||||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
|
||||
ignore _arr
|
||||
);
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_inner in
|
||||
let* () = Lwt_unix.sleep !sleep_outer in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
||||
T.Logs.(
|
||||
emit
|
||||
[
|
||||
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
|
||||
~severity:Severity_number_info "inner at %d" j;
|
||||
]);
|
||||
|
||||
(* simulate a failure *)
|
||||
Opentelemetry.Scope.add_event scope (fun () ->
|
||||
T.Event.make "done with alloc");
|
||||
Lwt.return ()
|
||||
with Failure _ -> Lwt.return ()
|
||||
incr i;
|
||||
|
||||
try%lwt
|
||||
Atomic.incr num_tr;
|
||||
let@ scope =
|
||||
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc"
|
||||
in
|
||||
(* allocate some stuff *)
|
||||
if !stress_alloc_ then (
|
||||
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
|
||||
ignore _arr
|
||||
);
|
||||
|
||||
let* () = Lwt_unix.sleep !sleep_inner in
|
||||
Atomic.incr num_sleep;
|
||||
|
||||
(* simulate a failure *)
|
||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
||||
|
||||
Opentelemetry.Scope.add_event scope (fun () ->
|
||||
T.Event.make "done with alloc");
|
||||
Lwt.return ()
|
||||
with Failure _ -> Lwt.return ()
|
||||
done
|
||||
done
|
||||
|
||||
|
|
@ -87,7 +95,7 @@ let run () : unit Lwt.t =
|
|||
let n_jobs = max 1 !n_jobs in
|
||||
Printf.printf "run %d jobs\n%!" n_jobs;
|
||||
|
||||
let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in
|
||||
let jobs = List.init n_jobs run_job in
|
||||
Lwt.join jobs
|
||||
|
||||
let () =
|
||||
|
|
@ -99,18 +107,21 @@ let () =
|
|||
let debug = ref false in
|
||||
let batch_traces = ref 400 in
|
||||
let batch_metrics = ref 3 in
|
||||
let batch_logs = ref 400 in
|
||||
let opts =
|
||||
[
|
||||
"--debug", Arg.Bool (( := ) debug), " enable debug output";
|
||||
( "--stress-alloc",
|
||||
Arg.Bool (( := ) stress_alloc_),
|
||||
" perform heavy allocs in inner loop" );
|
||||
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
|
||||
( "--batch-metrics",
|
||||
Arg.Int (( := ) batch_metrics),
|
||||
" size of metrics batch" );
|
||||
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
|
||||
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
|
||||
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
|
||||
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
|
||||
"--iterations", Arg.Set_int iterations, " the number of iterations to run";
|
||||
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
|
||||
]
|
||||
|> Arg.align
|
||||
|
|
@ -128,7 +139,7 @@ let () =
|
|||
Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug
|
||||
~batch_traces:(some_if_nzero batch_traces)
|
||||
~batch_metrics:(some_if_nzero batch_metrics)
|
||||
()
|
||||
~batch_logs:(some_if_nzero batch_logs) ()
|
||||
in
|
||||
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
|
||||
!sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config;
|
||||
|
|
|
|||
193
tests/client_e2e/clients_e2e_lib.ml
Normal file
193
tests/client_e2e/clients_e2e_lib.ml
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
module Client = Opentelemetry_client
|
||||
module Proto = Opentelemetry.Proto
|
||||
|
||||
let batch_size : Client.Signal.t -> int = function
|
||||
| Traces ts -> List.length ts
|
||||
| Logs ls -> List.length ls
|
||||
| Metrics ms -> List.length ms
|
||||
|
||||
let avg_batch_size (p : Client.Signal.t -> bool)
|
||||
(batches : Client.Signal.t list) : int =
|
||||
let sum =
|
||||
List.fold_left
|
||||
(fun acc b ->
|
||||
if p b then
|
||||
acc + batch_size b
|
||||
else
|
||||
acc)
|
||||
0 batches
|
||||
in
|
||||
sum / List.length batches
|
||||
|
||||
let signals_from_batch (signal_batch : Client.Signal.t) =
|
||||
match signal_batch with
|
||||
| Traces ts -> List.map (fun t -> `Trace t) ts
|
||||
| Logs ls -> List.map (fun l -> `Log l) ls
|
||||
| Metrics ms -> List.map (fun m -> `Metric m) ms
|
||||
|
||||
let filter_map_spans f signals =
|
||||
signals
|
||||
|> List.filter_map (function
|
||||
| `Log _ | `Metric _ -> None
|
||||
| `Trace (r : Proto.Trace.resource_spans) ->
|
||||
r.scope_spans
|
||||
|> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f))
|
||||
|
||||
let count_spans_with_name name signals =
|
||||
signals
|
||||
|> filter_map_spans (fun s ->
|
||||
if s.Proto.Trace.name = name then
|
||||
Some s
|
||||
else
|
||||
None)
|
||||
|> List.length
|
||||
|
||||
let filter_map_metrics f signals =
|
||||
signals
|
||||
|> List.filter_map (function
|
||||
| `Log _ | `Trace _ -> None
|
||||
| `Metric (r : Proto.Metrics.resource_metrics) ->
|
||||
r.scope_metrics
|
||||
|> List.find_map (fun ss ->
|
||||
ss.Proto.Metrics.metrics |> List.find_map f))
|
||||
|
||||
let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float
|
||||
= function
|
||||
| Proto.Metrics.As_double f -> f
|
||||
| Proto.Metrics.As_int i64 -> Int64.to_float i64
|
||||
|
||||
let get_metric_values name signals =
|
||||
signals
|
||||
|> filter_map_metrics (fun (m : Proto.Metrics.metric) ->
|
||||
if m.name <> name then
|
||||
None
|
||||
else
|
||||
Option.some
|
||||
@@
|
||||
match m.data with
|
||||
| Sum { data_points; is_monotonic = true; _ } ->
|
||||
List.fold_left
|
||||
(fun acc (p : Proto.Metrics.number_data_point) ->
|
||||
acc +. number_data_point_to_float p.value)
|
||||
0. data_points
|
||||
| _ -> failwith "TODO: Support for getting other metrics")
|
||||
|
||||
let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list =
|
||||
signals
|
||||
|> List.filter_map (function
|
||||
| `Metric _ | `Trace _ -> None
|
||||
| `Log (r : Proto.Logs.resource_logs) ->
|
||||
r.scope_logs
|
||||
|> List.find_map (fun ss ->
|
||||
ss.Proto.Logs.log_records |> List.find_map f))
|
||||
|
||||
let count_logs_with_body p signals =
|
||||
signals
|
||||
|> filter_map_logs (fun (l : Proto.Logs.log_record) ->
|
||||
if p l.body then
|
||||
Some ()
|
||||
else
|
||||
None)
|
||||
|> List.length
|
||||
|
||||
type params = {
|
||||
jobs: int;
|
||||
batch_traces: int;
|
||||
batch_metrics: int;
|
||||
batch_logs: int;
|
||||
iterations: int;
|
||||
}
|
||||
|
||||
let cmd exec params =
|
||||
[
|
||||
exec;
|
||||
"-j";
|
||||
string_of_int params.jobs;
|
||||
"--iterations";
|
||||
string_of_int params.iterations;
|
||||
"--batch-traces";
|
||||
string_of_int params.batch_traces;
|
||||
"--batch-metrics";
|
||||
string_of_int params.batch_metrics;
|
||||
"--batch-logs";
|
||||
string_of_int params.batch_logs;
|
||||
]
|
||||
|
||||
let test name f = Alcotest.test_case name `Quick f
|
||||
|
||||
let tests params signal_batches =
|
||||
let signals =
|
||||
signal_batches
|
||||
|> List.fold_left
|
||||
(fun acc b -> List.rev_append (signals_from_batch b) acc)
|
||||
[]
|
||||
in
|
||||
[
|
||||
(* TODO: What properties of batch sizes does it make sense to test? *)
|
||||
test "loop.outer spans" (fun () ->
|
||||
Alcotest.(check' int)
|
||||
~msg:"number of occurrences should equal the configured jobs"
|
||||
~expected:params.jobs
|
||||
~actual:(count_spans_with_name "loop.outer" signals));
|
||||
test "loop.inner spans" (fun () ->
|
||||
Alcotest.(check' int)
|
||||
~msg:
|
||||
"number of occurrences should equal the configured jobs * the \
|
||||
configured iterations"
|
||||
~expected:(params.jobs * params.iterations)
|
||||
~actual:(count_spans_with_name "loop.inner" signals));
|
||||
test "alloc spans" (fun () ->
|
||||
Alcotest.(check' int)
|
||||
~msg:
|
||||
"number of occurrences should equal the configured jobs * the \
|
||||
configured iterations"
|
||||
~expected:(params.jobs * params.iterations)
|
||||
~actual:(count_spans_with_name "alloc" signals);
|
||||
Alcotest.(check' bool)
|
||||
~msg:"should have 'done with alloc' event" ~expected:true
|
||||
~actual:
|
||||
(let all_alloc_events =
|
||||
signals
|
||||
|> filter_map_spans (fun s ->
|
||||
if s.name <> "alloc" then
|
||||
Some s.events
|
||||
else
|
||||
None)
|
||||
|> List.flatten
|
||||
in
|
||||
all_alloc_events
|
||||
|> List.for_all (fun (e : Proto.Trace.span_event) ->
|
||||
e.name = "done with alloc")));
|
||||
test "num-sleep metrics" (fun () ->
|
||||
Alcotest.(check' (float 0.))
|
||||
~msg:"should record jobs * iterations sleeps"
|
||||
~expected:(params.jobs * params.iterations |> float_of_int)
|
||||
~actual:
|
||||
(get_metric_values "num-sleep" signals
|
||||
|> List.sort Float.compare |> List.rev |> List.hd));
|
||||
test "logs" (fun () ->
|
||||
Alcotest.(check' int)
|
||||
~msg:"should record jobs * iterations occurrences of 'inner at n'"
|
||||
~expected:(params.jobs * params.iterations)
|
||||
~actual:
|
||||
(signals
|
||||
|> count_logs_with_body (function
|
||||
| Some (Proto.Common.String_value s)
|
||||
when String.starts_with ~prefix:"inner at" s ->
|
||||
true
|
||||
| _ -> false)));
|
||||
]
|
||||
|
||||
let run_tests cmds =
|
||||
let suites =
|
||||
cmds
|
||||
|> List.map (fun (exec, params) ->
|
||||
let cmd = cmd exec params in
|
||||
let name = cmd |> String.concat " " in
|
||||
let signal_batches = Signal_gatherer.gather_signals cmd in
|
||||
(* Let server reset *)
|
||||
Unix.sleep 1;
|
||||
name, tests params signal_batches)
|
||||
in
|
||||
let open Alcotest in
|
||||
run "Collector integration tests" suites
|
||||
35
tests/client_e2e/dune
Normal file
35
tests/client_e2e/dune
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
(env
|
||||
(_
|
||||
; Make the binaries for the test emitters available on the path for the components defined in this dir.
|
||||
; See https://dune.readthedocs.io/en/stable/reference/dune/env.html
|
||||
(binaries
|
||||
(../bin/emit1.exe as emit1)
|
||||
(../bin/emit1_cohttp.exe as emit1_cohttp)
|
||||
(./gather_signals.exe as gather_signals))))
|
||||
|
||||
(library
|
||||
(name signal_gatherer)
|
||||
(modules signal_gatherer)
|
||||
(libraries
|
||||
str
|
||||
alcotest
|
||||
cohttp-lwt-unix
|
||||
fmt
|
||||
unix
|
||||
logs.fmt
|
||||
logs.threaded
|
||||
opentelemetry.client))
|
||||
|
||||
(library
|
||||
(name clients_e2e_lib)
|
||||
(modules clients_e2e_lib)
|
||||
(libraries alcotest signal_gatherer))
|
||||
|
||||
(tests
|
||||
(names test_cottp_lwt_client_e2e)
|
||||
(modules test_cottp_lwt_client_e2e)
|
||||
(package opentelemetry-client-cohttp-lwt)
|
||||
(deps %{bin:emit1_cohttp})
|
||||
(libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client))
|
||||
|
||||
; TODO : Add tests for ocurl's emit1
|
||||
3
tests/client_e2e/gather_signals.ml
Normal file
3
tests/client_e2e/gather_signals.ml
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
let () =
|
||||
let program_to_test = Sys.argv |> Array.to_list |> List.tl in
|
||||
Signal_gatherer.run ~program_to_test ()
|
||||
151
tests/client_e2e/signal_gatherer.ml
Normal file
151
tests/client_e2e/signal_gatherer.ml
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
(* A runs tests against a OTel-instrumented program *)
|
||||
|
||||
module Client = Opentelemetry_client
|
||||
module Signal = Client.Signal
|
||||
module Proto = Opentelemetry.Proto
|
||||
open Lwt.Syntax
|
||||
|
||||
(* Server to collect telemetry data *)
|
||||
module Server = struct
|
||||
let dbg_request kind req pp data : unit Lwt.t =
|
||||
let _ = kind, req, pp, data in
|
||||
(* NOTE: Uncomment for debugging *)
|
||||
(* let* () = *)
|
||||
(* let req : string = Format.asprintf "%a" Http.Request.pp req in *)
|
||||
(* let data_s : string = Format.asprintf "%a" pp data in *)
|
||||
(* Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@." *)
|
||||
(* kind req data_s *)
|
||||
(* in *)
|
||||
Lwt.return ()
|
||||
|
||||
let metrics req data =
|
||||
let metrics = Signal.Decode.metrics data in
|
||||
let+ () = dbg_request "metrics" req Signal.Pp.metrics metrics in
|
||||
Signal.Metrics metrics
|
||||
|
||||
let handler push_signal _socket (request : Http.Request.t)
|
||||
(body : Cohttp_lwt.Body.t) =
|
||||
let* data = Cohttp_lwt.Body.to_string body in
|
||||
let* status, signal =
|
||||
match Http.Request.resource request with
|
||||
| "/v1/traces" ->
|
||||
let traces = Signal.Decode.traces data in
|
||||
let+ () = dbg_request "trace" request Signal.Pp.traces traces in
|
||||
`OK, Some (Signal.Traces traces)
|
||||
| "/v1/metrics" ->
|
||||
let metrics = Signal.Decode.metrics data in
|
||||
let+ () = dbg_request "metrics" request Signal.Pp.metrics metrics in
|
||||
`OK, Some (Signal.Metrics metrics)
|
||||
| "/v1/logs" ->
|
||||
let logs = Signal.Decode.logs data in
|
||||
let+ () = dbg_request "logs" request Signal.Pp.logs logs in
|
||||
`OK, Some (Signal.Logs logs)
|
||||
| unexepected ->
|
||||
let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexepected in
|
||||
`Not_found, None
|
||||
in
|
||||
push_signal signal;
|
||||
let resp_body = Cohttp_lwt.Body.of_string "" in
|
||||
Cohttp_lwt_unix.Server.respond ~status ~body:resp_body ()
|
||||
|
||||
let run port push_signals =
|
||||
let* () = Lwt_io.eprintf "starting server\n" in
|
||||
Cohttp_lwt_unix.Server.(
|
||||
make ~callback:(handler push_signals) ()
|
||||
|> create ~mode:(`TCP (`Port port)))
|
||||
end
|
||||
|
||||
(** Manage launching and cleaning up the program we are testing *)
|
||||
module Tested_program = struct
|
||||
let validate_exit = function
|
||||
| Unix.WEXITED 0 -> ()
|
||||
| Unix.WEXITED bad_code ->
|
||||
failwith
|
||||
@@ Printf.sprintf "process under test ended with bad exit code %d"
|
||||
bad_code
|
||||
| Unix.WSIGNALED i ->
|
||||
failwith
|
||||
@@ Printf.sprintf "process under test ended with unexpected signal %d" i
|
||||
| Unix.WSTOPPED i ->
|
||||
failwith
|
||||
@@ Printf.sprintf "process under test ended with unexpected stop %d" i
|
||||
|
||||
let run program_to_test =
|
||||
let redirect = `FD_copy Unix.stderr in
|
||||
let cmd = "", Array.of_list program_to_test in
|
||||
(* Give server time to be online *)
|
||||
let* () = Lwt_unix.sleep 0.5 in
|
||||
let* () =
|
||||
Lwt_io.eprintf "running command: %s\n"
|
||||
(Format.asprintf "%a"
|
||||
(Format.pp_print_list
|
||||
~pp_sep:(fun fmt () -> Format.pp_print_string fmt " ")
|
||||
Format.pp_print_string)
|
||||
program_to_test)
|
||||
in
|
||||
let* result = Lwt_process.exec ~stdout:redirect cmd in
|
||||
(* Give server time process signals *)
|
||||
let+ () = Lwt_unix.sleep 0.5 in
|
||||
validate_exit result
|
||||
end
|
||||
|
||||
let collect_traces ~port program_to_test push_signals () =
|
||||
let* () =
|
||||
Lwt.pick
|
||||
[ Server.run port push_signals; Tested_program.run program_to_test ]
|
||||
in
|
||||
(* Let the tester know all the signals have be sent *)
|
||||
Lwt.return (push_signals None)
|
||||
|
||||
let normalize_scope_span : Proto.Trace.scope_spans -> Proto.Trace.scope_spans =
|
||||
function
|
||||
| scope_span ->
|
||||
{
|
||||
scope_span with
|
||||
spans =
|
||||
scope_span.spans
|
||||
|> List.map (fun (span : Proto.Trace.span) ->
|
||||
{
|
||||
span with
|
||||
start_time_unix_nano = -1L;
|
||||
end_time_unix_nano = -1L;
|
||||
});
|
||||
}
|
||||
|
||||
let normalize_signal : Signal.t -> Signal.t = function
|
||||
| Traces ts ->
|
||||
Traces
|
||||
(ts
|
||||
|> List.map (fun (trace : Proto.Trace.resource_spans) ->
|
||||
{
|
||||
trace with
|
||||
scope_spans = trace.scope_spans |> List.map normalize_scope_span;
|
||||
}))
|
||||
| x -> x
|
||||
|
||||
(* normalize trace output by redacting non-deterministic values from output *)
|
||||
let normalize =
|
||||
let re =
|
||||
Str.regexp
|
||||
{|\(start_time_unix_nano\|time_unix_nano\|end_time_unix_nano\|value\) = \([0-9]*\|As_int([0-9]*)\|As_double([0-9]*\.)\);|}
|
||||
in
|
||||
fun s -> Str.global_replace re {|\1 = <redacted>;|} s
|
||||
|
||||
let default_port =
|
||||
String.split_on_char ':' Client.Config.default_url |> function
|
||||
(* Extracting the port from 'http://foo:<port>' *)
|
||||
| [ _; _; port ] -> int_of_string port
|
||||
| _ -> failwith "unexpected format in Client.Config.default_url"
|
||||
|
||||
let gather_signals ?(port = default_port) program_to_test =
|
||||
Lwt_main.run
|
||||
@@
|
||||
let stream, push = Lwt_stream.create () in
|
||||
let* () = collect_traces ~port program_to_test push () in
|
||||
Lwt_stream.to_list stream
|
||||
|
||||
let run ?(port = default_port) ~program_to_test () =
|
||||
gather_signals ~port program_to_test
|
||||
|> List.map (fun s -> s |> Format.asprintf "%a" Signal.Pp.pp |> normalize)
|
||||
|> List.stable_sort String.compare (* Produce a deterministic order *)
|
||||
|> List.iter print_string
|
||||
33
tests/client_e2e/test_cottp_lwt_client_e2e.ml
Normal file
33
tests/client_e2e/test_cottp_lwt_client_e2e.ml
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
module Client = Opentelemetry_client
|
||||
module Proto = Opentelemetry.Proto
|
||||
open Clients_e2e_lib
|
||||
|
||||
let () =
|
||||
Clients_e2e_lib.run_tests
|
||||
[
|
||||
(* TODO: Running with batch-traces = 1 causes deadlocks *)
|
||||
(* ( "emit1_cohttp", *)
|
||||
(* { *)
|
||||
(* jobs = 1; *)
|
||||
(* iterations = 1; *)
|
||||
(* batch_traces = 1; *)
|
||||
(* batch_metrics = 1; *)
|
||||
(* batch_logs = 1; *)
|
||||
(* } ); *)
|
||||
( "emit1_cohttp",
|
||||
{
|
||||
jobs = 1;
|
||||
iterations = 1;
|
||||
batch_traces = 2;
|
||||
batch_metrics = 2;
|
||||
batch_logs = 2;
|
||||
} );
|
||||
( "emit1_cohttp",
|
||||
{
|
||||
jobs = 3;
|
||||
iterations = 1;
|
||||
batch_traces = 400;
|
||||
batch_metrics = 3;
|
||||
batch_logs = 400;
|
||||
} );
|
||||
]
|
||||
Loading…
Add table
Reference in a new issue