diff --git a/dune-project b/dune-project index ec273074..8dd272eb 100644 --- a/dune-project +++ b/dune-project @@ -122,5 +122,6 @@ (>= "2.0")) cohttp-lwt cohttp-lwt-unix - (alcotest :with-test)) + (alcotest :with-test) + (containers :with-test)) (synopsis "Collector client for opentelemetry, using cohttp + lwt")) diff --git a/opentelemetry-client-cohttp-lwt.opam b/opentelemetry-client-cohttp-lwt.opam index 1a07ec45..988ad44a 100644 --- a/opentelemetry-client-cohttp-lwt.opam +++ b/opentelemetry-client-cohttp-lwt.opam @@ -22,6 +22,7 @@ depends: [ "cohttp-lwt" "cohttp-lwt-unix" "alcotest" {with-test} + "containers" {with-test} ] build: [ ["dune" "subst"] {dev} diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 51b186b6..82c4f65b 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -190,7 +190,6 @@ end let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in let open Lwt.Syntax in - let module Conv = Signal.Converter in (* local helpers *) let open struct let timeout = @@ -228,13 +227,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt_unix.sleep 3. 
let send_metrics_http client (l : Metrics.resource_metrics list) = - Conv.metrics l |> send_http_ client ~url:config.url_metrics + Signal.Encode.metrics l |> send_http_ client ~url:config.url_metrics let send_traces_http client (l : Trace.resource_spans list) = - Conv.traces l |> send_http_ client ~url:config.url_traces + Signal.Encode.traces l |> send_http_ client ~url:config.url_traces let send_logs_http client (l : Logs.resource_logs list) = - Conv.logs l |> send_http_ client ~url:config.url_logs + Signal.Encode.logs l |> send_http_ client ~url:config.url_logs (* emit metrics, if the batch is full or timeout lapsed *) let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index c0e5eaaa..55d4788c 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -201,20 +201,19 @@ end = struct in conv l |> send_http_ ~stop ~config ~url client in - let module Conv = Signal.Converter in try while not (Atomic.get stop) do let msg = B_queue.pop self.send_q in match msg with | To_send.Send_trace tr -> - send ~name:"send-traces" ~conv:Conv.traces + send ~name:"send-traces" ~conv:Signal.Encode.traces ~url:config.common.url_traces tr | To_send.Send_metric ms -> - send ~name:"send-metrics" ~conv:Conv.metrics + send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~url:config.common.url_metrics ms | To_send.Send_logs logs -> - send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs - logs + send ~name:"send-logs" ~conv:Signal.Encode.logs + ~url:config.common.url_logs logs done with B_queue.Closed -> () diff --git a/src/client/config.ml b/src/client/config.ml index 2bdb633d..6a832291 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -36,6 +36,8 @@ let pp out (self : t) : unit = debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt 
batch_logs batch_timeout_ms +let default_url = "http://localhost:4318" + type 'k make = ?debug:bool -> ?url:string -> @@ -73,8 +75,6 @@ module Env () : ENV = struct let set_debug b = debug_ := b - let default_url = "http://localhost:4318" - let make_get_from_env env_name = let value = ref None in fun () -> diff --git a/src/client/config.mli b/src/client/config.mli index f3a4e6ec..b8d0238f 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -38,6 +38,9 @@ type t = private { To build one, use {!make} below. This might be extended with more fields in the future. *) +val default_url : string +(** The default base URL for the config. *) + val pp : Format.formatter -> t -> unit type 'k make = @@ -59,8 +62,8 @@ type 'k make = @param url base url used to construct per-signal urls. Per-signal url options take - precedence over this base url. Default is "http://localhost:4318", or - "OTEL_EXPORTER_OTLP_ENDPOINT" if set. + precedence over this base url. If not provided, this defaults to + "OTEL_EXPORTER_OTLP_ENDPOINT" if set, or if not {!default_url}. 
Example of constructed per-signal urls with the base url http://localhost:4318 diff --git a/src/client/signal.ml b/src/client/signal.ml index 91e9e332..7a2eddd5 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -5,7 +5,38 @@ module Span = Opentelemetry.Span let ( let@ ) = ( @@ ) -module Converter = struct +module Proto = Opentelemetry.Proto + +type t = + | Traces of Proto.Trace.resource_spans list + | Metrics of Proto.Metrics.resource_metrics list + | Logs of Proto.Logs.resource_logs list + +let to_traces = function + | Traces xs -> Some xs + | _ -> None + +let to_metrics = function + | Metrics xs -> Some xs + | _ -> None + +let to_logs = function + | Logs xs -> Some xs + | _ -> None + +let is_traces = function + | Traces _ -> true + | _ -> false + +let is_metrics = function + | Metrics _ -> true + | _ -> false + +let is_logs = function + | Logs _ -> true + | _ -> false + +module Encode = struct let resource_to_string ~encoder ~ctor ~enc resource = let encoder = match encoder with @@ -16,7 +47,6 @@ module Converter = struct in let x = ctor resource in let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in - Pbrt.Encoder.reset encoder; enc x encoder; Pbrt.Encoder.to_string encoder @@ -43,3 +73,42 @@ module Converter = struct ()) ~enc:Trace_service.encode_pb_export_trace_service_request end + +module Decode = struct + let resource_of_string ~dec s = Pbrt.Decoder.of_string s |> dec + + let logs data = + (resource_of_string ~dec:Logs_service.decode_pb_export_logs_service_request + data) + .resource_logs + + let metrics data = + (resource_of_string + ~dec:Metrics_service.decode_pb_export_metrics_service_request data) + .resource_metrics + + let traces data = + (resource_of_string + ~dec:Trace_service.decode_pb_export_trace_service_request data) + .resource_spans +end + +module Pp = struct + let pp_sep fmt () = Format.fprintf fmt ",@." 
+ + let pp_signal pp fmt t = + Format.fprintf fmt "[@ @["; + Format.pp_print_list ~pp_sep pp fmt t; + Format.fprintf fmt "@ ]@]@." + + let logs = pp_signal Proto.Logs.pp_resource_logs + + let metrics = pp_signal Proto.Metrics.pp_resource_metrics + + let traces = pp_signal Proto.Trace.pp_resource_spans + + let pp fmt = function + | Logs ls -> logs fmt ls + | Metrics ms -> metrics fmt ms + | Traces ts -> traces fmt ts +end diff --git a/src/client/signal.mli b/src/client/signal.mli index ef2a6e24..a10f9e00 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -1,11 +1,31 @@ (** Constructing and managing OTel {{:https://opentelemetry.io/docs/concepts/signals/} signals} *) -(** Convert signals to protobuf encoded strings, ready to be sent over the wire +(** The type of signals - NOTE: The converters share an underlying stateful encoder, so each domain or - system thread should have its own [Converter] instance *) -module Converter : sig + This is not the principal type of signals from the perspective of what gets + encoded and sent via protocol buffers, but it is the principal type that + collector clients need to reason about.
*) +type t = + | Traces of Opentelemetry_proto.Trace.resource_spans list + | Metrics of Opentelemetry_proto.Metrics.resource_metrics list + | Logs of Opentelemetry_proto.Logs.resource_logs list + +val to_traces : t -> Opentelemetry_proto.Trace.resource_spans list option + +val to_metrics : t -> Opentelemetry_proto.Metrics.resource_metrics list option + +val to_logs : t -> Opentelemetry_proto.Logs.resource_logs list option + +val is_traces : t -> bool + +val is_metrics : t -> bool + +val is_logs : t -> bool + +(** Encode signals to protobuf encoded strings, ready to be sent over the wire +*) +module Encode : sig val logs : ?encoder:Pbrt.Encoder.t -> Opentelemetry_proto.Logs.resource_logs list -> @@ -25,7 +45,43 @@ module Converter : sig ?encoder:Pbrt.Encoder.t -> Opentelemetry_proto.Trace.resource_spans list -> string - (** [metrics ts] is a protobuf encoded string of the traces [ts] + (** [traces ts] is a protobuf encoded string of the traces [ts] @param encoder provide an encoder state to reuse *) end + +(** Decode signals from protobuf encoded strings, received over the wire *) +module Decode : sig + val logs : string -> Opentelemetry_proto.Logs.resource_logs list + (** [logs s] is a list of log resources decoded from the protobuf encoded + string [s]. + + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *) + + val metrics : string -> Opentelemetry_proto.Metrics.resource_metrics list + (** [metrics s] is a list of metrics resources decoded from the protobuf + encoded string [s]. + + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *) + + val traces : string -> Opentelemetry_proto.Trace.resource_spans list + (** [traces s] is a list of span resources decoded from the protobuf encoded + string [s]. + + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. 
*) +end + +module Pp : sig + val logs : + Format.formatter -> Opentelemetry_proto.Logs.resource_logs list -> unit + + val metrics : + Format.formatter -> + Opentelemetry_proto.Metrics.resource_metrics list -> + unit + + val traces : + Format.formatter -> Opentelemetry_proto.Trace.resource_spans list -> unit + + val pp : Format.formatter -> t -> unit +end diff --git a/tests/bin/cohttp_client.ml b/tests/bin/cohttp_client.ml index 8cd4dbb0..a4523847 100644 --- a/tests/bin/cohttp_client.ml +++ b/tests/bin/cohttp_client.ml @@ -20,8 +20,9 @@ let run () = in let* () = Lwt_unix.sleep !sleep_outer in let module C = (val mk_client ~scope) in + (* Use the same default URL as the client config *) let* _res, body = - C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") + C.get (Uri.of_string Opentelemetry_client.Config.default_url) in let* () = Cohttp_lwt.Body.drain_body body in go () diff --git a/tests/bin/dune b/tests/bin/dune index cb4712fb..84ac795f 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -1,7 +1,11 @@ (executable (name emit1) (modules emit1) - (libraries unix opentelemetry opentelemetry-client-ocurl)) + (libraries + unix + opentelemetry + opentelemetry.client + opentelemetry-client-ocurl)) (executable (name emit1_cohttp) @@ -12,6 +16,7 @@ unix opentelemetry opentelemetry-lwt + opentelemetry.client opentelemetry-client-cohttp-lwt lwt.unix)) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index a9baa4d0..95a4d55b 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -99,12 +99,16 @@ let run () = Array.iter Thread.join jobs let () = - Sys.catch_break true; T.Globals.service_name := "t1"; T.Globals.service_namespace := Some "ocaml-otel.test"; let ts_start = Unix.gettimeofday () in let debug = ref false in + + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let batch_logs = ref 400 in + let n_bg_threads = ref 0 in let opts = [ @@ -112,6 +116,11 @@ let () = ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop"
); + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; @@ -123,15 +132,18 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in let config = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true - ?bg_threads: - (let n = !n_bg_threads in - if n = 0 then - None - else - Some n) - () + ?bg_threads:(some_if_nzero n_bg_threads) + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." 
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 1558b6b9..59acf285 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -12,6 +12,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let iterations = ref 1 + let num_sleep = Atomic.make 0 let stress_alloc_ = ref true @@ -20,57 +22,63 @@ let stop = Atomic.make false let num_tr = Atomic.make 0 -let run_job () : unit Lwt.t = - let i = ref 0 in +(* Counter used to mark simulated failures *) +let i = ref 0 + +let run_job job_id : unit Lwt.t = while%lwt not @@ Atomic.get stop do let@ scope = Atomic.incr num_tr; T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int !i ] + ~attrs:[ "i", `Int job_id ] in - for%lwt j = 0 to 4 do - (* parent scope is found via thread local storage *) - let@ scope = - Atomic.incr num_tr; - T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal - ~attrs:[ "j", `Int j ] - "loop.inner" - in - - let* () = Lwt_unix.sleep !sleep_outer in - Atomic.incr num_sleep; - - T.Logs.( - emit - [ - make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id - ~severity:Severity_number_info "inner at %d" j; - ]); - - incr i; - - try%lwt - Atomic.incr num_tr; + for%lwt j = 0 to !iterations do + if j >= !iterations then + (* Terminate program, having reached our max iterations *) + Lwt.return @@ Atomic.set stop true + else + (* parent scope is found via thread local storage *) let@ scope = - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + Atomic.incr num_tr; + T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal + ~attrs:[ "j", `Int j ] + "loop.inner" in - (* allocate some stuff *) - if !stress_alloc_ then ( - let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in - ignore _arr - ); - let* () = Lwt_unix.sleep !sleep_inner in + let* () = Lwt_unix.sleep !sleep_outer in Atomic.incr num_sleep; - if j = 4 && !i mod 13 = 0 then failwith "oh no"; + T.Logs.( + 
emit + [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info "inner at %d" j; + ]); - (* simulate a failure *) - Opentelemetry.Scope.add_event scope (fun () -> - T.Event.make "done with alloc"); - Lwt.return () - with Failure _ -> Lwt.return () + incr i; + + try%lwt + Atomic.incr num_tr; + let@ scope = + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + in + (* allocate some stuff *) + if !stress_alloc_ then ( + let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in + ignore _arr + ); + + let* () = Lwt_unix.sleep !sleep_inner in + Atomic.incr num_sleep; + + (* simulate a failure *) + if j = 4 && !i mod 13 = 0 then failwith "oh no"; + + Opentelemetry.Scope.add_event scope (fun () -> + T.Event.make "done with alloc"); + Lwt.return () + with Failure _ -> Lwt.return () done done @@ -87,7 +95,7 @@ let run () : unit Lwt.t = let n_jobs = max 1 !n_jobs in Printf.printf "run %d jobs\n%!" n_jobs; - let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in + let jobs = List.init n_jobs run_job in Lwt.join jobs let () = @@ -99,18 +107,21 @@ let () = let debug = ref false in let batch_traces = ref 400 in let batch_metrics = ref 3 in + let batch_logs = ref 400 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); - "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; ( "--batch-metrics", Arg.Int (( := ) batch_metrics), " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + "--iterations", Arg.Set_int iterations, " the number of iterations to run"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; ] 
|> Arg.align @@ -128,7 +139,7 @@ let () = Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - () + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config; diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml new file mode 100644 index 00000000..aa79e84a --- /dev/null +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -0,0 +1,194 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto +open Containers + +let batch_size : Client.Signal.t -> int = function + | Traces ts -> List.length ts + | Logs ls -> List.length ls + | Metrics ms -> List.length ms + +let avg_batch_size (p : Client.Signal.t -> bool) + (batches : Client.Signal.t list) : int = + let sum = + List.fold_left + (fun acc b -> + if p b then + acc + batch_size b + else + acc) + 0 batches + in + sum / List.length batches + +let signals_from_batch (signal_batch : Client.Signal.t) = + match signal_batch with + | Traces ts -> List.map (fun t -> `Trace t) ts + | Logs ls -> List.map (fun l -> `Log l) ls + | Metrics ms -> List.map (fun m -> `Metric m) ms + +let filter_map_spans f signals = + signals + |> List.filter_map (function + | `Log _ | `Metric _ -> None + | `Trace (r : Proto.Trace.resource_spans) -> + r.scope_spans + |> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f)) + +let count_spans_with_name name signals = + signals + |> filter_map_spans (fun s -> + if String.equal s.Proto.Trace.name name then + Some s + else + None) + |> List.length + +let filter_map_metrics f signals = + signals + |> List.filter_map (function + | `Log _ | `Trace _ -> None + | `Metric (r : Proto.Metrics.resource_metrics) -> + r.scope_metrics + |> List.find_map (fun ss -> + ss.Proto.Metrics.metrics |> List.find_map f)) + +let 
number_data_point_to_float : Proto.Metrics.number_data_point_value -> float + = function + | Proto.Metrics.As_double f -> f + | Proto.Metrics.As_int i64 -> Int64.to_float i64 + +let get_metric_values name signals = + signals + |> filter_map_metrics (fun (m : Proto.Metrics.metric) -> + if not (String.equal m.name name) then + None + else + Option.some + @@ + match m.data with + | Sum { data_points; is_monotonic = true; _ } -> + List.fold_left + (fun acc (p : Proto.Metrics.number_data_point) -> + acc +. number_data_point_to_float p.value) + 0. data_points + | _ -> failwith "TODO: Support for getting other metrics") + +let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list = + signals + |> List.filter_map (function + | `Metric _ | `Trace _ -> None + | `Log (r : Proto.Logs.resource_logs) -> + r.scope_logs + |> List.find_map (fun ss -> + ss.Proto.Logs.log_records |> List.find_map f)) + +let count_logs_with_body p signals = + signals + |> filter_map_logs (fun (l : Proto.Logs.log_record) -> + if p l.body then + Some () + else + None) + |> List.length + +type params = { + jobs: int; + batch_traces: int; + batch_metrics: int; + batch_logs: int; + iterations: int; +} + +let cmd exec params = + [ + exec; + "-j"; + string_of_int params.jobs; + "--iterations"; + string_of_int params.iterations; + "--batch-traces"; + string_of_int params.batch_traces; + "--batch-metrics"; + string_of_int params.batch_metrics; + "--batch-logs"; + string_of_int params.batch_logs; + ] + +let test name f = Alcotest.test_case name `Quick f + +let tests params signal_batches = + let signals = + signal_batches + |> List.fold_left + (fun acc b -> List.rev_append (signals_from_batch b) acc) + [] + in + [ + (* TODO: What properties of batch sizes does it make sense to test?
*) + test "loop.outer spans" (fun () -> + Alcotest.(check' int) + ~msg:"number of occurrences should equal the configured jobs" + ~expected:params.jobs + ~actual:(count_spans_with_name "loop.outer" signals)); + test "loop.inner spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "loop.inner" signals)); + test "alloc spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "alloc" signals); + Alcotest.(check' bool) + ~msg:"should have 'done with alloc' event" ~expected:true + ~actual: + (let all_alloc_events = + signals + |> filter_map_spans (fun s -> + if String.equal s.name "alloc" then (* keep only events recorded on alloc spans *) + Some s.events + else + None) + |> List.flatten + in + all_alloc_events + |> List.for_all (fun (e : Proto.Trace.span_event) -> + String.equal e.name "done with alloc"))); + test "num-sleep metrics" (fun () -> + Alcotest.(check' (float 0.)) + ~msg:"should record jobs * iterations sleeps" + ~expected:(params.jobs * params.iterations |> float_of_int) + ~actual: + (get_metric_values "num-sleep" signals + |> List.sort Float.compare |> List.rev |> List.hd)); + test "logs" (fun () -> + Alcotest.(check' int) + ~msg:"should record jobs * iterations occurrences of 'inner at n'" + ~expected:(params.jobs * params.iterations) + ~actual: + (signals + |> count_logs_with_body (function + | Some (Proto.Common.String_value s) + when String.prefix ~pre:"inner at" s -> + true + | _ -> false))); + ] + +let run_tests cmds = + let suites = + cmds + |> List.map (fun (exec, params) -> + let cmd = cmd exec params in + let name = cmd |> String.concat " " in + let signal_batches = Signal_gatherer.gather_signals cmd in + (* Let server reset *) + Unix.sleep 1; + name, tests
params signal_batches) + in + let open Alcotest in + run "Collector integration tests" suites diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune new file mode 100644 index 00000000..706740da --- /dev/null +++ b/tests/client_e2e/dune @@ -0,0 +1,39 @@ +(env + (_ + ; Make the binaries for the test emitters available on the path for the components defined in this dir. + ; See https://dune.readthedocs.io/en/stable/reference/dune/env.html + (binaries + (../bin/emit1.exe as emit1) + (../bin/emit1_cohttp.exe as emit1_cohttp) + (./gather_signals.exe as gather_signals)))) + +(library + (name signal_gatherer) + (modules signal_gatherer) + (libraries + str + alcotest + cohttp-lwt-unix + fmt + unix + containers + logs.fmt + logs.threaded + opentelemetry.client)) + +(library + (name clients_e2e_lib) + (modules clients_e2e_lib) + (libraries alcotest signal_gatherer)) + +(tests + (names test_cottp_lwt_client_e2e) + (modules test_cottp_lwt_client_e2e) + (package opentelemetry-client-cohttp-lwt) + (deps %{bin:emit1_cohttp}) + (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) + +(executable + (name signal_reporter_server) + (modules signal_reporter_server) + (libraries signal_gatherer)) diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml new file mode 100644 index 00000000..dcc44bfd --- /dev/null +++ b/tests/client_e2e/signal_gatherer.ml @@ -0,0 +1,118 @@ +(* Runs tests against an OTel-instrumented program *) + +module Client = Opentelemetry_client +module Signal = Client.Signal +open Lwt.Syntax + +let debug = + match Sys.getenv_opt "DEBUG" with + | Some "1" -> true + | _ -> false + +(* Server to collect telemetry data *) +module Server = struct + let dbg_request kind req pp data : unit Lwt.t = + if debug then ( + let _ = kind, req, pp, data in + let req : string = Format.asprintf "%a" Http.Request.pp req in + let data_s : string = Format.asprintf "%a" pp data in + Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: 
%s\nBODY: %s\n@." + kind req data_s + ) else + Lwt.return_unit + + let handler push_signal _socket (request : Http.Request.t) + (body : Cohttp_lwt.Body.t) = + let* data = Cohttp_lwt.Body.to_string body in + let* status, signal = + match Http.Request.resource request with + | "/v1/traces" -> + let traces = Signal.Decode.traces data in + let+ () = dbg_request "trace" request Signal.Pp.traces traces in + `OK, Some (Signal.Traces traces) + | "/v1/metrics" -> + let metrics = Signal.Decode.metrics data in + let+ () = dbg_request "metrics" request Signal.Pp.metrics metrics in + `OK, Some (Signal.Metrics metrics) + | "/v1/logs" -> + let logs = Signal.Decode.logs data in + let+ () = dbg_request "logs" request Signal.Pp.logs logs in + `OK, Some (Signal.Logs logs) + | unexpected -> + let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexpected in + `Not_found, None + in + Option.iter (fun s -> push_signal (Some s)) signal; (* pushing None would close the stream *) + let resp_body = Cohttp_lwt.Body.of_string "" in + Cohttp_lwt_unix.Server.respond ~status ~body:resp_body () + + let run port push_signals = + let* () = Lwt_io.eprintf "starting server\n" in + Cohttp_lwt_unix.Server.( + make ~callback:(handler push_signals) () + |> create ~mode:(`TCP (`Port port))) +end + +(** Manage launching and cleaning up the program we are testing *) +module Tested_program = struct + let validate_exit = function + | Unix.WEXITED 0 -> () + | Unix.WEXITED bad_code -> + failwith + @@ Printf.sprintf "process under test ended with bad exit code %d" + bad_code + | Unix.WSIGNALED i -> + failwith + @@ Printf.sprintf "process under test ended with unexpected signal %d" i + | Unix.WSTOPPED i -> + failwith + @@ Printf.sprintf "process under test ended with unexpected stop %d" i + + let run program_to_test = + let redirect = `FD_copy Unix.stderr in + let cmd = "", Array.of_list program_to_test in + (* Give server time to be online *) + let* () = Lwt_unix.sleep 0.5 in + let* () = + Lwt_io.eprintf "running command: %s\n" + (Format.asprintf "%a" + (Format.pp_print_list +
~pp_sep:(fun fmt () -> Format.pp_print_string fmt " ") + Format.pp_print_string) + program_to_test) + in + let* result = Lwt_process.exec ~stdout:redirect cmd in + (* Give server time to process signals *) + let+ () = Lwt_unix.sleep 0.5 in + validate_exit result +end + +let default_port = + String.split_on_char ':' Client.Config.default_url |> function + (* Extract the port from a URL of the form [http://<host>:<port>] *) + | [ _; _; port ] -> int_of_string port + | _ -> failwith "unexpected format in Client.Config.default_url" + +let gather_signals ?(port = default_port) program_to_test = + Lwt_main.run + @@ + let stream, push = Lwt_stream.create () in + let* () = + Lwt.pick [ Server.run port push; Tested_program.run program_to_test ] + in + (* Close out the stream *) + push None; + Lwt_stream.to_list stream + +(* Just run the server, and print the signals gathered. *) +let run ?(port = default_port) () = + Lwt_main.run + @@ + let stream, push = Lwt_stream.create () in + Lwt.join + [ + Server.run port push; + Lwt_stream.iter_s + (fun s -> Format.asprintf "%a" Signal.Pp.pp s |> Lwt_io.printl) + stream; + ] diff --git a/tests/client_e2e/signal_gatherer.mli b/tests/client_e2e/signal_gatherer.mli new file mode 100644 index 00000000..b948d059 --- /dev/null +++ b/tests/client_e2e/signal_gatherer.mli @@ -0,0 +1,21 @@ +(** A test utility for running a signal emitting executable alongside a minimal + server that can receive the signals and make them available for inspection. *) + +val gather_signals : + ?port:int -> string list -> Opentelemetry_client.Signal.t list +(** [gather_signals program_to_test] is a list of all the signals emitted by the + [program_to_test], which the server was able to record. This function + assumes that the program to test will be sending its signals to the + localhost on [port]. + + @param port + the port where signals will be received. Default is port set in + {!Opentelemetry_client.Config.default_url}.
*) + +val run : ?port:int -> unit -> unit +(** [run ()] runs a signal gathering server and prints all batches of signals + received to stdout. + + @param port + the port where signals will be received. Default is port set in + {!Opentelemetry_client.Config.default_url}. *) diff --git a/tests/client_e2e/signal_reporter_server.ml b/tests/client_e2e/signal_reporter_server.ml new file mode 100644 index 00000000..a7f17708 --- /dev/null +++ b/tests/client_e2e/signal_reporter_server.ml @@ -0,0 +1,4 @@ +(** Runs a signal gatherer server, and prints out every batch of signals + received to stdout. This can be used to monitor the signals sent by an + application, e.g., the test executables defined in /tests/bin/emit1*.ml *) +let () = Signal_gatherer.run () diff --git a/tests/client_e2e/test_cottp_lwt_client_e2e.ml b/tests/client_e2e/test_cottp_lwt_client_e2e.ml new file mode 100644 index 00000000..c093155f --- /dev/null +++ b/tests/client_e2e/test_cottp_lwt_client_e2e.ml @@ -0,0 +1,33 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto +open Clients_e2e_lib + +let () = + Clients_e2e_lib.run_tests + [ + (* TODO: Running with batch-traces = 1 causes deadlocks *) + (* ( "emit1_cohttp", *) + (* { *) + (* jobs = 1; *) + (* iterations = 1; *) + (* batch_traces = 1; *) + (* batch_metrics = 1; *) + (* batch_logs = 1; *) + (* } ); *) + ( "emit1_cohttp", + { + jobs = 1; + iterations = 1; + batch_traces = 2; + batch_metrics = 2; + batch_logs = 2; + } ); + ( "emit1_cohttp", + { + jobs = 3; + iterations = 1; + batch_traces = 400; + batch_metrics = 3; + batch_logs = 400; + } ); + ]