Merge pull request #96 from shonfeder/tests

Add integration tests for collectors
This commit is contained in:
Simon Cruanes 2025-07-12 00:19:33 -04:00 committed by GitHub
commit a5af3c9b65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 639 additions and 73 deletions

View file

@ -122,5 +122,6 @@
(>= "2.0"))
cohttp-lwt
cohttp-lwt-unix
(alcotest :with-test))
(alcotest :with-test)
(containers :with-test))
(synopsis "Collector client for opentelemetry, using cohttp + lwt"))

View file

@ -22,6 +22,7 @@ depends: [
"cohttp-lwt"
"cohttp-lwt-unix"
"alcotest" {with-test}
"containers" {with-test}
]
build: [
["dune" "subst"] {dev}

View file

@ -190,7 +190,6 @@ end
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
let module Conv = Signal.Converter in
(* local helpers *)
let open struct
let timeout =
@ -228,13 +227,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Lwt_unix.sleep 3.
let send_metrics_http client (l : Metrics.resource_metrics list) =
Conv.metrics l |> send_http_ client ~url:config.url_metrics
Signal.Encode.metrics l |> send_http_ client ~url:config.url_metrics
let send_traces_http client (l : Trace.resource_spans list) =
Conv.traces l |> send_http_ client ~url:config.url_traces
Signal.Encode.traces l |> send_http_ client ~url:config.url_traces
let send_logs_http client (l : Logs.resource_logs list) =
Conv.logs l |> send_http_ client ~url:config.url_logs
Signal.Encode.logs l |> send_http_ client ~url:config.url_logs
(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =

View file

@ -201,20 +201,19 @@ end = struct
in
conv l |> send_http_ ~stop ~config ~url client
in
let module Conv = Signal.Converter in
try
while not (Atomic.get stop) do
let msg = B_queue.pop self.send_q in
match msg with
| To_send.Send_trace tr ->
send ~name:"send-traces" ~conv:Conv.traces
send ~name:"send-traces" ~conv:Signal.Encode.traces
~url:config.common.url_traces tr
| To_send.Send_metric ms ->
send ~name:"send-metrics" ~conv:Conv.metrics
send ~name:"send-metrics" ~conv:Signal.Encode.metrics
~url:config.common.url_metrics ms
| To_send.Send_logs logs ->
send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs
logs
send ~name:"send-logs" ~conv:Signal.Encode.logs
~url:config.common.url_logs logs
done
with B_queue.Closed -> ()

View file

@ -36,6 +36,8 @@ let pp out (self : t) : unit =
debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt
batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms
let default_url = "http://localhost:4318"
type 'k make =
?debug:bool ->
?url:string ->
@ -73,8 +75,6 @@ module Env () : ENV = struct
let set_debug b = debug_ := b
let default_url = "http://localhost:4318"
let make_get_from_env env_name =
let value = ref None in
fun () ->

View file

@ -38,6 +38,9 @@ type t = private {
To build one, use {!make} below. This might be extended with more fields in
the future. *)
val default_url : string
(** The default base URL for the config. *)
val pp : Format.formatter -> t -> unit
type 'k make =
@ -59,8 +62,8 @@ type 'k make =
@param url
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
precedence over this base url. If not provided, this defaults to
"OTEL_EXPORTER_OTLP_ENDPOINT" if set, or if not {!default_url}.
Example of constructed per-signal urls with the base url
http://localhost:4318

View file

@ -5,7 +5,38 @@ module Span = Opentelemetry.Span
let ( let@ ) = ( @@ )
module Converter = struct
module Proto = Opentelemetry.Proto
type t =
| Traces of Proto.Trace.resource_spans list
| Metrics of Proto.Metrics.resource_metrics list
| Logs of Proto.Logs.resource_logs list
let to_traces = function
| Traces xs -> Some xs
| _ -> None
let to_metrics = function
| Metrics xs -> Some xs
| _ -> None
let to_logs = function
| Logs xs -> Some xs
| _ -> None
let is_traces = function
| Traces _ -> true
| _ -> false
let is_metrics = function
| Metrics _ -> true
| _ -> false
let is_logs = function
| Logs _ -> true
| _ -> false
module Encode = struct
let resource_to_string ~encoder ~ctor ~enc resource =
let encoder =
match encoder with
@ -16,7 +47,6 @@ module Converter = struct
in
let x = ctor resource in
let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in
Pbrt.Encoder.reset encoder;
enc x encoder;
Pbrt.Encoder.to_string encoder
@ -43,3 +73,42 @@ module Converter = struct
())
~enc:Trace_service.encode_pb_export_trace_service_request
end
module Decode = struct
let resource_of_string ~dec s = Pbrt.Decoder.of_string s |> dec
let logs data =
(resource_of_string ~dec:Logs_service.decode_pb_export_logs_service_request
data)
.resource_logs
let metrics data =
(resource_of_string
~dec:Metrics_service.decode_pb_export_metrics_service_request data)
.resource_metrics
let traces data =
(resource_of_string
~dec:Trace_service.decode_pb_export_trace_service_request data)
.resource_spans
end
module Pp = struct
let pp_sep fmt () = Format.fprintf fmt ",@."
let pp_signal pp fmt t =
Format.fprintf fmt "[@ @[";
Format.pp_print_list ~pp_sep pp fmt t;
Format.fprintf fmt "@ ]@]@."
let logs = pp_signal Proto.Logs.pp_resource_logs
let metrics = pp_signal Proto.Metrics.pp_resource_metrics
let traces = pp_signal Proto.Trace.pp_resource_spans
let pp fmt = function
| Logs ls -> logs fmt ls
| Metrics ms -> metrics fmt ms
| Traces ts -> traces fmt ts
end

View file

@ -1,11 +1,31 @@
(** Constructing and managing OTel
{{:https://opentelemetry.io/docs/concepts/signals/} signals} *)
(** Convert signals to protobuf encoded strings, ready to be sent over the wire
(** The type of signals
NOTE: The converters share an underlying stateful encoder, so each domain or
system thread should have its own [Converter] instance *)
module Converter : sig
This is not the principle type of signals from the perspective of what gets
encoded and sent via protocl buffers, but it is the principle type that
collector clients needs to reason about. *)
type t =
| Traces of Opentelemetry_proto.Trace.resource_spans list
| Metrics of Opentelemetry_proto.Metrics.resource_metrics list
| Logs of Opentelemetry_proto.Logs.resource_logs list
val to_traces : t -> Opentelemetry_proto.Trace.resource_spans list option
val to_metrics : t -> Opentelemetry_proto.Metrics.resource_metrics list option
val to_logs : t -> Opentelemetry_proto.Logs.resource_logs list option
val is_traces : t -> bool
val is_metrics : t -> bool
val is_logs : t -> bool
(** Encode signals to protobuf encoded strings, ready to be sent over the wire
*)
module Encode : sig
val logs :
?encoder:Pbrt.Encoder.t ->
Opentelemetry_proto.Logs.resource_logs list ->
@ -25,7 +45,43 @@ module Converter : sig
?encoder:Pbrt.Encoder.t ->
Opentelemetry_proto.Trace.resource_spans list ->
string
(** [metrics ts] is a protobuf encoded string of the traces [ts]
(** [traces ts] is a protobuf encoded string of the traces [ts]
@param encoder provide an encoder state to reuse *)
end
(** Decode signals from protobuf encoded strings, received over the wire *)
module Decode : sig
val logs : string -> Opentelemetry_proto.Logs.resource_logs list
(** [logs s] is a list of log resources decoded from the protobuf encoded
string [s].
@raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *)
val metrics : string -> Opentelemetry_proto.Metrics.resource_metrics list
(** [metrics s] is a list of metrics resources decoded from the protobuf
encoded string [s].
@raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *)
val traces : string -> Opentelemetry_proto.Trace.resource_spans list
(** [traces s] is a list of span resources decoded from the protobuf encoded
string [s].
@raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *)
end
module Pp : sig
val logs :
Format.formatter -> Opentelemetry_proto.Logs.resource_logs list -> unit
val metrics :
Format.formatter ->
Opentelemetry_proto.Metrics.resource_metrics list ->
unit
val traces :
Format.formatter -> Opentelemetry_proto.Trace.resource_spans list -> unit
val pp : Format.formatter -> t -> unit
end

View file

@ -20,8 +20,9 @@ let run () =
in
let* () = Lwt_unix.sleep !sleep_outer in
let module C = (val mk_client ~scope) in
(* Using the same default server O *)
let* _res, body =
C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net")
C.get (Uri.of_string Opentelemetry_client.Config.default_url)
in
let* () = Cohttp_lwt.Body.drain_body body in
go ()

View file

@ -1,7 +1,11 @@
(executable
(name emit1)
(modules emit1)
(libraries unix opentelemetry opentelemetry-client-ocurl))
(libraries
unix
opentelemetry
opentelemetry.client
opentelemetry-client-ocurl))
(executable
(name emit1_cohttp)
@ -12,6 +16,7 @@
unix
opentelemetry
opentelemetry-lwt
opentelemetry.client
opentelemetry-client-cohttp-lwt
lwt.unix))

View file

@ -99,12 +99,16 @@ let run () =
Array.iter Thread.join jobs
let () =
Sys.catch_break true;
T.Globals.service_name := "t1";
T.Globals.service_namespace := Some "ocaml-otel.test";
let ts_start = Unix.gettimeofday () in
let debug = ref false in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let batch_logs = ref 400 in
let n_bg_threads = ref 0 in
let opts =
[
@ -112,6 +116,11 @@ let () =
( "--stress-alloc",
Arg.Bool (( := ) stress_alloc_),
" perform heavy allocs in inner loop" );
( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
@ -123,15 +132,18 @@ let () =
Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let some_if_nzero r =
if !r > 0 then
Some !r
else
None
in
let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true
?bg_threads:
(let n = !n_bg_threads in
if n = 0 then
None
else
Some n)
()
?bg_threads:(some_if_nzero n_bg_threads)
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
~batch_logs:(some_if_nzero batch_logs) ()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;

View file

@ -12,6 +12,8 @@ let sleep_outer = ref 2.0
let n_jobs = ref 1
let iterations = ref 1
let num_sleep = Atomic.make 0
let stress_alloc_ = ref true
@ -20,57 +22,63 @@ let stop = Atomic.make false
let num_tr = Atomic.make 0
let run_job () : unit Lwt.t =
let i = ref 0 in
(* Counter used to mark simulated failures *)
let i = ref 0
let run_job job_id : unit Lwt.t =
while%lwt not @@ Atomic.get stop do
let@ scope =
Atomic.incr num_tr;
T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
~attrs:[ "i", `Int !i ]
~attrs:[ "i", `Int job_id ]
in
for%lwt j = 0 to 4 do
(* parent scope is found via thread local storage *)
let@ scope =
Atomic.incr num_tr;
T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal
~attrs:[ "j", `Int j ]
"loop.inner"
in
let* () = Lwt_unix.sleep !sleep_outer in
Atomic.incr num_sleep;
T.Logs.(
emit
[
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
~severity:Severity_number_info "inner at %d" j;
]);
incr i;
try%lwt
Atomic.incr num_tr;
for%lwt j = 0 to !iterations do
if j >= !iterations then
(* Terminate program, having reached our max iterations *)
Lwt.return @@ Atomic.set stop true
else
(* parent scope is found via thread local storage *)
let@ scope =
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc"
Atomic.incr num_tr;
T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal
~attrs:[ "j", `Int j ]
"loop.inner"
in
(* allocate some stuff *)
if !stress_alloc_ then (
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
ignore _arr
);
let* () = Lwt_unix.sleep !sleep_inner in
let* () = Lwt_unix.sleep !sleep_outer in
Atomic.incr num_sleep;
if j = 4 && !i mod 13 = 0 then failwith "oh no";
T.Logs.(
emit
[
make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id
~severity:Severity_number_info "inner at %d" j;
]);
(* simulate a failure *)
Opentelemetry.Scope.add_event scope (fun () ->
T.Event.make "done with alloc");
Lwt.return ()
with Failure _ -> Lwt.return ()
incr i;
try%lwt
Atomic.incr num_tr;
let@ scope =
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc"
in
(* allocate some stuff *)
if !stress_alloc_ then (
let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in
ignore _arr
);
let* () = Lwt_unix.sleep !sleep_inner in
Atomic.incr num_sleep;
(* simulate a failure *)
if j = 4 && !i mod 13 = 0 then failwith "oh no";
Opentelemetry.Scope.add_event scope (fun () ->
T.Event.make "done with alloc");
Lwt.return ()
with Failure _ -> Lwt.return ()
done
done
@ -87,7 +95,7 @@ let run () : unit Lwt.t =
let n_jobs = max 1 !n_jobs in
Printf.printf "run %d jobs\n%!" n_jobs;
let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in
let jobs = List.init n_jobs run_job in
Lwt.join jobs
let () =
@ -99,18 +107,21 @@ let () =
let debug = ref false in
let batch_traces = ref 400 in
let batch_metrics = ref 3 in
let batch_logs = ref 400 in
let opts =
[
"--debug", Arg.Bool (( := ) debug), " enable debug output";
( "--stress-alloc",
Arg.Bool (( := ) stress_alloc_),
" perform heavy allocs in inner loop" );
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
( "--batch-metrics",
Arg.Int (( := ) batch_metrics),
" size of metrics batch" );
"--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch";
"--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch";
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"--iterations", Arg.Set_int iterations, " the number of iterations to run";
"-j", Arg.Set_int n_jobs, " number of parallel jobs";
]
|> Arg.align
@ -128,7 +139,7 @@ let () =
Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics)
()
~batch_logs:(some_if_nzero batch_logs) ()
in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config;

View file

@ -0,0 +1,194 @@
module Client = Opentelemetry_client
module Proto = Opentelemetry.Proto
open Containers
let batch_size : Client.Signal.t -> int = function
| Traces ts -> List.length ts
| Logs ls -> List.length ls
| Metrics ms -> List.length ms
let avg_batch_size (p : Client.Signal.t -> bool)
(batches : Client.Signal.t list) : int =
let sum =
List.fold_left
(fun acc b ->
if p b then
acc + batch_size b
else
acc)
0 batches
in
sum / List.length batches
let signals_from_batch (signal_batch : Client.Signal.t) =
match signal_batch with
| Traces ts -> List.map (fun t -> `Trace t) ts
| Logs ls -> List.map (fun l -> `Log l) ls
| Metrics ms -> List.map (fun m -> `Metric m) ms
let filter_map_spans f signals =
signals
|> List.filter_map (function
| `Log _ | `Metric _ -> None
| `Trace (r : Proto.Trace.resource_spans) ->
r.scope_spans
|> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f))
let count_spans_with_name name signals =
signals
|> filter_map_spans (fun s ->
if String.equal s.Proto.Trace.name name then
Some s
else
None)
|> List.length
let filter_map_metrics f signals =
signals
|> List.filter_map (function
| `Log _ | `Trace _ -> None
| `Metric (r : Proto.Metrics.resource_metrics) ->
r.scope_metrics
|> List.find_map (fun ss ->
ss.Proto.Metrics.metrics |> List.find_map f))
let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float
= function
| Proto.Metrics.As_double f -> f
| Proto.Metrics.As_int i64 -> Int64.to_float i64
let get_metric_values name signals =
signals
|> filter_map_metrics (fun (m : Proto.Metrics.metric) ->
if not (String.equal m.name name) then
None
else
Option.some
@@
match m.data with
| Sum { data_points; is_monotonic = true; _ } ->
List.fold_left
(fun acc (p : Proto.Metrics.number_data_point) ->
acc +. number_data_point_to_float p.value)
0. data_points
| _ -> failwith "TODO: Support for getting other metrics")
let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list =
signals
|> List.filter_map (function
| `Metric _ | `Trace _ -> None
| `Log (r : Proto.Logs.resource_logs) ->
r.scope_logs
|> List.find_map (fun ss ->
ss.Proto.Logs.log_records |> List.find_map f))
let count_logs_with_body p signals =
signals
|> filter_map_logs (fun (l : Proto.Logs.log_record) ->
if p l.body then
Some ()
else
None)
|> List.length
type params = {
jobs: int;
batch_traces: int;
batch_metrics: int;
batch_logs: int;
iterations: int;
}
let cmd exec params =
[
exec;
"-j";
string_of_int params.jobs;
"--iterations";
string_of_int params.iterations;
"--batch-traces";
string_of_int params.batch_traces;
"--batch-metrics";
string_of_int params.batch_metrics;
"--batch-logs";
string_of_int params.batch_logs;
]
let test name f = Alcotest.test_case name `Quick f
let tests params signal_batches =
let signals =
signal_batches
|> List.fold_left
(fun acc b -> List.rev_append (signals_from_batch b) acc)
[]
in
[
(* TODO: What properties of batch sizes does it make sense to test? *)
test "loop.outer spans" (fun () ->
Alcotest.(check' int)
~msg:"number of occurrences should equal the configured jobs"
~expected:params.jobs
~actual:(count_spans_with_name "loop.outer" signals));
test "loop.inner spans" (fun () ->
Alcotest.(check' int)
~msg:
"number of occurrences should equal the configured jobs * the \
configured iterations"
~expected:(params.jobs * params.iterations)
~actual:(count_spans_with_name "loop.inner" signals));
test "alloc spans" (fun () ->
Alcotest.(check' int)
~msg:
"number of occurrences should equal the configured jobs * the \
configured iterations"
~expected:(params.jobs * params.iterations)
~actual:(count_spans_with_name "alloc" signals);
Alcotest.(check' bool)
~msg:"should have 'done with alloc' event" ~expected:true
~actual:
(let all_alloc_events =
signals
|> filter_map_spans (fun s ->
if not (String.equal s.name "alloc") then
Some s.events
else
None)
|> List.flatten
in
all_alloc_events
|> List.for_all (fun (e : Proto.Trace.span_event) ->
String.equal e.name "done with alloc")));
test "num-sleep metrics" (fun () ->
Alcotest.(check' (float 0.))
~msg:"should record jobs * iterations sleeps"
~expected:(params.jobs * params.iterations |> float_of_int)
~actual:
(get_metric_values "num-sleep" signals
|> List.sort Float.compare |> List.rev |> List.hd));
test "logs" (fun () ->
Alcotest.(check' int)
~msg:"should record jobs * iterations occurrences of 'inner at n'"
~expected:(params.jobs * params.iterations)
~actual:
(signals
|> count_logs_with_body (function
| Some (Proto.Common.String_value s)
when String.prefix ~pre:"inner at" s ->
true
| _ -> false)));
]
let run_tests cmds =
let suites =
cmds
|> List.map (fun (exec, params) ->
let cmd = cmd exec params in
let name = cmd |> String.concat " " in
let signal_batches = Signal_gatherer.gather_signals cmd in
(* Let server reset *)
Unix.sleep 1;
name, tests params signal_batches)
in
let open Alcotest in
run "Collector integration tests" suites

39
tests/client_e2e/dune Normal file
View file

@ -0,0 +1,39 @@
(env
(_
; Make the binaries for the test emitters available on the path for the components defined in this dir.
; See https://dune.readthedocs.io/en/stable/reference/dune/env.html
(binaries
(../bin/emit1.exe as emit1)
(../bin/emit1_cohttp.exe as emit1_cohttp)
(./gather_signals.exe as gather_signals))))
(library
(name signal_gatherer)
(modules signal_gatherer)
(libraries
str
alcotest
cohttp-lwt-unix
fmt
unix
containers
logs.fmt
logs.threaded
opentelemetry.client))
(library
(name clients_e2e_lib)
(modules clients_e2e_lib)
(libraries alcotest signal_gatherer))
(tests
(names test_cottp_lwt_client_e2e)
(modules test_cottp_lwt_client_e2e)
(package opentelemetry-client-cohttp-lwt)
(deps %{bin:emit1_cohttp})
(libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client))
(executable
(name signal_reporter_server)
(modules signal_reporter_server)
(libraries signal_gatherer))

View file

@ -0,0 +1,118 @@
(* A runs tests against a OTel-instrumented program *)
module Client = Opentelemetry_client
module Signal = Client.Signal
open Lwt.Syntax
let debug =
match Sys.getenv_opt "DEBUG" with
| Some "1" -> true
| _ -> false
(* Server to collect telemetry data *)
module Server = struct
let dbg_request kind req pp data : unit Lwt.t =
if debug then (
let _ = kind, req, pp, data in
let req : string = Format.asprintf "%a" Http.Request.pp req in
let data_s : string = Format.asprintf "%a" pp data in
Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@."
kind req data_s
) else
Lwt.return_unit
let handler push_signal _socket (request : Http.Request.t)
(body : Cohttp_lwt.Body.t) =
let* data = Cohttp_lwt.Body.to_string body in
let* status, signal =
match Http.Request.resource request with
| "/v1/traces" ->
let traces = Signal.Decode.traces data in
let+ () = dbg_request "trace" request Signal.Pp.traces traces in
`OK, Some (Signal.Traces traces)
| "/v1/metrics" ->
let metrics = Signal.Decode.metrics data in
let+ () = dbg_request "metrics" request Signal.Pp.metrics metrics in
`OK, Some (Signal.Metrics metrics)
| "/v1/logs" ->
let logs = Signal.Decode.logs data in
let+ () = dbg_request "logs" request Signal.Pp.logs logs in
`OK, Some (Signal.Logs logs)
| unexepected ->
let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexepected in
`Not_found, None
in
push_signal signal;
let resp_body = Cohttp_lwt.Body.of_string "" in
Cohttp_lwt_unix.Server.respond ~status ~body:resp_body ()
let run port push_signals =
let* () = Lwt_io.eprintf "starting server\n" in
Cohttp_lwt_unix.Server.(
make ~callback:(handler push_signals) ()
|> create ~mode:(`TCP (`Port port)))
end
(** Manage launching and cleaning up the program we are testing *)
module Tested_program = struct
let validate_exit = function
| Unix.WEXITED 0 -> ()
| Unix.WEXITED bad_code ->
failwith
@@ Printf.sprintf "process under test ended with bad exit code %d"
bad_code
| Unix.WSIGNALED i ->
failwith
@@ Printf.sprintf "process under test ended with unexpected signal %d" i
| Unix.WSTOPPED i ->
failwith
@@ Printf.sprintf "process under test ended with unexpected stop %d" i
let run program_to_test =
let redirect = `FD_copy Unix.stderr in
let cmd = "", Array.of_list program_to_test in
(* Give server time to be online *)
let* () = Lwt_unix.sleep 0.5 in
let* () =
Lwt_io.eprintf "running command: %s\n"
(Format.asprintf "%a"
(Format.pp_print_list
~pp_sep:(fun fmt () -> Format.pp_print_string fmt " ")
Format.pp_print_string)
program_to_test)
in
let* result = Lwt_process.exec ~stdout:redirect cmd in
(* Give server time process signals *)
let+ () = Lwt_unix.sleep 0.5 in
validate_exit result
end
let default_port =
String.split_on_char ':' Client.Config.default_url |> function
(* Extracting the port from 'http://foo:<port>' *)
| [ _; _; port ] -> int_of_string port
| _ -> failwith "unexpected format in Client.Config.default_url"
let gather_signals ?(port = default_port) program_to_test =
Lwt_main.run
@@
let stream, push = Lwt_stream.create () in
let* () =
Lwt.pick [ Server.run port push; Tested_program.run program_to_test ]
in
(* Close out the stream *)
push None;
Lwt_stream.to_list stream
(* Just run the server, and print the signals gathered. *)
let run ?(port = default_port) () =
Lwt_main.run
@@
let stream, push = Lwt_stream.create () in
Lwt.join
[
Server.run port push;
Lwt_stream.iter_s
(fun s -> Format.asprintf "%a" Signal.Pp.pp s |> Lwt_io.printl)
stream;
]

View file

@ -0,0 +1,21 @@
(** A test utility for running a signal emitting executable alongside a minimal
server that can receive the signals make them available for inspection. *)
val gather_signals :
?port:int -> string list -> Opentelemetry_client.Signal.t list
(** [gather_signals program_to_test] is a list of all the signals emitted by the
[program_to_test], which the server was able to record. This function
assumes that the program to test will be sending its signals to the
localhost on [port].
@param port
the port where signals will be received. Default is port set in
{!Opentelemetry_client.Config.default_url}. *)
val run : ?port:int -> unit -> unit
(** [run ()] runs a signal gathering server and prints all batches of signals
received to stdout.
@param port
the port where signals will be received. Default is port set in
{!Opentelemetry_client.Config.default_url}. *)

View file

@ -0,0 +1,4 @@
(** Runs a signal gatherer server, and prints out every batch of signals
received to stdout. This can be used to monitor the signals sent by an
application, e.g., the test executables defined in /tests/bin/emit1*.ml *)
let () = Signal_gatherer.run ()

View file

@ -0,0 +1,33 @@
module Client = Opentelemetry_client
module Proto = Opentelemetry.Proto
open Clients_e2e_lib
let () =
Clients_e2e_lib.run_tests
[
(* TODO: Running with batch-traces = 1 causes deadlocks *)
(* ( "emit1_cohttp", *)
(* { *)
(* jobs = 1; *)
(* iterations = 1; *)
(* batch_traces = 1; *)
(* batch_metrics = 1; *)
(* batch_logs = 1; *)
(* } ); *)
( "emit1_cohttp",
{
jobs = 1;
iterations = 1;
batch_traces = 2;
batch_metrics = 2;
batch_logs = 2;
} );
( "emit1_cohttp",
{
jobs = 3;
iterations = 1;
batch_traces = 400;
batch_metrics = 3;
batch_logs = 400;
} );
]