From d3235a1864886e5ef48c55d205f9bd9270eda3ed Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 13:55:26 -0400 Subject: [PATCH 01/16] Remove unneeded encoder reset We reset the encoder if we are reusing one, and we generate a fresh new one otherwise. --- src/client/signal.ml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/signal.ml b/src/client/signal.ml index 91e9e332..48b95d1d 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -16,7 +16,6 @@ module Converter = struct in let x = ctor resource in let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in - Pbrt.Encoder.reset encoder; enc x encoder; Pbrt.Encoder.to_string encoder From 39920ed10970b6425462d317d1059903dbc6af98 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 17:03:54 -0400 Subject: [PATCH 02/16] Fix Signal encoder name choice Don't know why I didn't opt for this clearer name originally. --- src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml | 7 +++---- src/client-ocurl/opentelemetry_client_ocurl.ml | 9 ++++----- src/client/signal.ml | 2 +- src/client/signal.mli | 8 +++----- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 51b186b6..82c4f65b 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -190,7 +190,6 @@ end let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in let open Lwt.Syntax in - let module Conv = Signal.Converter in (* local helpers *) let open struct let timeout = @@ -228,13 +227,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt_unix.sleep 3. 
let send_metrics_http client (l : Metrics.resource_metrics list) = - Conv.metrics l |> send_http_ client ~url:config.url_metrics + Signal.Encode.metrics l |> send_http_ client ~url:config.url_metrics let send_traces_http client (l : Trace.resource_spans list) = - Conv.traces l |> send_http_ client ~url:config.url_traces + Signal.Encode.traces l |> send_http_ client ~url:config.url_traces let send_logs_http client (l : Logs.resource_logs list) = - Conv.logs l |> send_http_ client ~url:config.url_logs + Signal.Encode.logs l |> send_http_ client ~url:config.url_logs (* emit metrics, if the batch is full or timeout lapsed *) let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index c0e5eaaa..55d4788c 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -201,20 +201,19 @@ end = struct in conv l |> send_http_ ~stop ~config ~url client in - let module Conv = Signal.Converter in try while not (Atomic.get stop) do let msg = B_queue.pop self.send_q in match msg with | To_send.Send_trace tr -> - send ~name:"send-traces" ~conv:Conv.traces + send ~name:"send-traces" ~conv:Signal.Encode.traces ~url:config.common.url_traces tr | To_send.Send_metric ms -> - send ~name:"send-metrics" ~conv:Conv.metrics + send ~name:"send-metrics" ~conv:Signal.Encode.metrics ~url:config.common.url_metrics ms | To_send.Send_logs logs -> - send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs - logs + send ~name:"send-logs" ~conv:Signal.Encode.logs + ~url:config.common.url_logs logs done with B_queue.Closed -> () diff --git a/src/client/signal.ml b/src/client/signal.ml index 48b95d1d..64731434 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -5,7 +5,7 @@ module Span = Opentelemetry.Span let ( let@ ) = ( @@ ) -module Converter = struct +module Encode = struct let resource_to_string ~encoder ~ctor ~enc 
resource = let encoder = match encoder with diff --git a/src/client/signal.mli b/src/client/signal.mli index ef2a6e24..2eb91b6e 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -1,11 +1,9 @@ (** Constructing and managing OTel {{:https://opentelemetry.io/docs/concepts/signals/} signals} *) -(** Convert signals to protobuf encoded strings, ready to be sent over the wire - - NOTE: The converters share an underlying stateful encoder, so each domain or - system thread should have its own [Converter] instance *) -module Converter : sig +(** Encode signals to protobuf encoded strings, ready to be sent over the wire +*) +module Encode : sig val logs : ?encoder:Pbrt.Encoder.t -> Opentelemetry_proto.Logs.resource_logs list -> From 916b962c434f7fbc3330555c6fc10fc8d367cceb Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 17:04:59 -0400 Subject: [PATCH 03/16] Expose default_url in config No reason to keep this value hidden, and we want to reuse it for tests. --- src/client/config.ml | 4 ++-- src/client/config.mli | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/client/config.ml b/src/client/config.ml index 2bdb633d..6a832291 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -36,6 +36,8 @@ let pp out (self : t) : unit = debug self_trace url_traces url_metrics url_logs ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms +let default_url = "http://localhost:4318" + type 'k make = ?debug:bool -> ?url:string -> @@ -73,8 +75,6 @@ module Env () : ENV = struct let set_debug b = debug_ := b - let default_url = "http://localhost:4318" - let make_get_from_env env_name = let value = ref None in fun () -> diff --git a/src/client/config.mli b/src/client/config.mli index f3a4e6ec..b8d0238f 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -38,6 +38,9 @@ type t = private { To build one, use {!make} below. This might be extended with more fields in the future. 
*) +val default_url : string +(** The default base URL for the config. *) + val pp : Format.formatter -> t -> unit type 'k make = @@ -59,8 +62,8 @@ type 'k make = @param url base url used to construct per-signal urls. Per-signal url options take - precedence over this base url. Default is "http://localhost:4318", or - "OTEL_EXPORTER_OTLP_ENDPOINT" if set. + precedence over this base url. If not provided, this defaults to + "OTEL_EXPORTER_OTLP_ENDPOINT" if set, or if not {!default_url}. Example of constructed per-signal urls with the base url http://localhost:4318 From 33104f231e67d88e212501b15b4dc58c3e17f36c Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 17:05:42 -0400 Subject: [PATCH 04/16] Add Signal.Decode module For testing --- src/client/signal.ml | 19 +++++++++++++++++++ src/client/signal.mli | 18 ++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/client/signal.ml b/src/client/signal.ml index 64731434..7dc1cf08 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -42,3 +42,22 @@ module Encode = struct ()) ~enc:Trace_service.encode_pb_export_trace_service_request end + +module Decode = struct + let resource_of_string ~dec s = Pbrt.Decoder.of_string s |> dec + + let logs data = + (resource_of_string ~dec:Logs_service.decode_pb_export_logs_service_request + data) + .resource_logs + + let metrics data = + (resource_of_string + ~dec:Metrics_service.decode_pb_export_metrics_service_request data) + .resource_metrics + + let traces data = + (resource_of_string + ~dec:Trace_service.decode_pb_export_trace_service_request data) + .resource_spans +end diff --git a/src/client/signal.mli b/src/client/signal.mli index 2eb91b6e..3ed188c7 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -27,3 +27,21 @@ module Encode : sig @param encoder provide an encoder state to reuse *) end + +(** Decode signals from protobuf encoded strings, received over the wire *) +module Decode : sig + val logs : string -> 
Opentelemetry_proto.Logs.resource_logs list + (** [logs data] is the list of resource logs decoded from [data] + + @param data a protobuf encoded logs export request *) + + val metrics : string -> Opentelemetry_proto.Metrics.resource_metrics list + (** [metrics data] is the list of resource metrics decoded from [data] + + @param data a protobuf encoded metrics export request *) + + val traces : string -> Opentelemetry_proto.Trace.resource_spans list + (** [traces data] is the list of resource spans decoded from [data] + + @param data a protobuf encoded traces export request *) +end From 00840e0b88ece395647828ef253c8101c9c18a34 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 18:37:01 -0400 Subject: [PATCH 05/16] Add pretty printer utils These combinators seem tiny, but they simplify code where they are used quite a lot. --- src/client/signal.ml | 10 ++++++++++ src/client/signal.mli | 13 +++++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/client/signal.ml b/src/client/signal.ml index 7dc1cf08..ebb3e460 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -61,3 +61,13 @@ module Decode = struct ~dec:Trace_service.decode_pb_export_trace_service_request data) .resource_spans end + +module Pp = struct + module Proto = Opentelemetry.Proto + + let logs = Format.pp_print_list Proto.Logs.pp_resource_logs + + let metrics = Format.pp_print_list Proto.Metrics.pp_resource_metrics + + let traces = Format.pp_print_list Proto.Trace.pp_resource_spans +end diff --git a/src/client/signal.mli b/src/client/signal.mli index 3ed188c7..a5c25a27 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -45,3 +45,16 @@ module Decode : sig @param data a protobuf encoded traces export request *) end + +module Pp : sig + val logs : + Format.formatter -> Opentelemetry_proto.Logs.resource_logs list -> unit + + val metrics : + Format.formatter -> + Opentelemetry_proto.Metrics.resource_metrics list -> + unit + + val traces : + Format.formatter -> 
Opentelemetry_proto.Trace.resource_spans list -> unit +end From a95b787a7b5bb8ffc509caec1c08b1a742d50f9d Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 22:05:44 -0400 Subject: [PATCH 06/16] Allowing breaking with ctrl-c --- tests/bin/emit1.ml | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index a9baa4d0..093bc888 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -99,7 +99,6 @@ let run () = Array.iter Thread.join jobs let () = - Sys.catch_break true; T.Globals.service_name := "t1"; T.Globals.service_namespace := Some "ocaml-otel.test"; let ts_start = Unix.gettimeofday () in From b6448b330d7922020a2b4d1cfcb779dc64ada1b2 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 22:09:04 -0400 Subject: [PATCH 07/16] Used default_url --- tests/bin/cohttp_client.ml | 3 ++- tests/bin/dune | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/bin/cohttp_client.ml b/tests/bin/cohttp_client.ml index 8cd4dbb0..a4523847 100644 --- a/tests/bin/cohttp_client.ml +++ b/tests/bin/cohttp_client.ml @@ -20,8 +20,9 @@ let run () = in let* () = Lwt_unix.sleep !sleep_outer in let module C = (val mk_client ~scope) in + (* Using the same default server O *) let* _res, body = - C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") + C.get (Uri.of_string Opentelemetry_client.Config.default_url) in let* () = Cohttp_lwt.Body.drain_body body in go () diff --git a/tests/bin/dune b/tests/bin/dune index cb4712fb..84ac795f 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -1,7 +1,11 @@ (executable (name emit1) (modules emit1) - (libraries unix opentelemetry opentelemetry-client-ocurl)) + (libraries + unix + opentelemetry + opentelemetry.client + opentelemetry-client-ocurl)) (executable (name emit1_cohttp) @@ -12,6 +16,7 @@ unix opentelemetry opentelemetry-lwt + opentelemetry.client opentelemetry-client-cohttp-lwt lwt.unix)) From a44e0cd3b559375aba8dc899ad008c03ac9a869a Mon Sep 17 00:00:00 
2001 From: Shon Feder Date: Mon, 23 Jun 2025 22:14:36 -0400 Subject: [PATCH 08/16] Add common type for signals --- src/client/signal.ml | 9 +++++++-- src/client/signal.mli | 10 ++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/client/signal.ml b/src/client/signal.ml index ebb3e460..c4c3fa11 100644 --- a/src/client/signal.ml +++ b/src/client/signal.ml @@ -5,6 +5,13 @@ module Span = Opentelemetry.Span let ( let@ ) = ( @@ ) +module Proto = Opentelemetry.Proto + +type t = + | Traces of Proto.Trace.resource_spans list + | Metrics of Proto.Metrics.resource_metrics list + | Logs of Proto.Logs.resource_logs list + module Encode = struct let resource_to_string ~encoder ~ctor ~enc resource = let encoder = @@ -63,8 +70,6 @@ module Decode = struct end module Pp = struct - module Proto = Opentelemetry.Proto - let logs = Format.pp_print_list Proto.Logs.pp_resource_logs let metrics = Format.pp_print_list Proto.Metrics.pp_resource_metrics diff --git a/src/client/signal.mli b/src/client/signal.mli index a5c25a27..7c8f7779 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -1,6 +1,16 @@ (** Constructing and managing OTel {{:https://opentelemetry.io/docs/concepts/signals/} signals} *) +(** The type of signals + + This is not the principal type of signals from the perspective of what gets + encoded and sent via protocol buffers, but it is the principal type that + collector clients need to reason about. 
*) +type t = + | Traces of Opentelemetry_proto.Trace.resource_spans list + | Metrics of Opentelemetry_proto.Metrics.resource_metrics list + | Logs of Opentelemetry_proto.Logs.resource_logs list + (** Encode signals to protobuf encoded strings, ready to be sent over the wire *) module Encode : sig From 87cfd5e31ee5075c8df963b66384512eabebda79 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 23 Jun 2025 23:22:59 -0400 Subject: [PATCH 09/16] Add test harness for instrumented applications --- dune-project | 3 +- opentelemetry-cohttp-lwt.opam | 1 + src/client/signal.ml | 42 +++- src/client/signal.mli | 14 ++ tests/bin/emit1.ml | 27 ++- tests/bin/emit1_cohttp.ml | 95 +++++---- tests/client_e2e/clients_e2e_lib.ml | 193 ++++++++++++++++++ tests/client_e2e/dune | 35 ++++ tests/client_e2e/gather_signals.ml | 3 + tests/client_e2e/signal_gatherer.ml | 151 ++++++++++++++ tests/client_e2e/test_cottp_lwt_client_e2e.ml | 33 +++ 11 files changed, 544 insertions(+), 53 deletions(-) create mode 100644 tests/client_e2e/clients_e2e_lib.ml create mode 100644 tests/client_e2e/dune create mode 100644 tests/client_e2e/gather_signals.ml create mode 100644 tests/client_e2e/signal_gatherer.ml create mode 100644 tests/client_e2e/test_cottp_lwt_client_e2e.ml diff --git a/dune-project b/dune-project index ec273074..a28374f2 100644 --- a/dune-project +++ b/dune-project @@ -102,7 +102,8 @@ (>= "5.3")) (cohttp-lwt (>= "6.0.0")) - (alcotest :with-test)) + (alcotest :with-test) + (cohttp-eio :with-test)) (synopsis "Opentelemetry tracing for Cohttp HTTP servers")) (package diff --git a/opentelemetry-cohttp-lwt.opam b/opentelemetry-cohttp-lwt.opam index ad66a0ef..a56e353b 100644 --- a/opentelemetry-cohttp-lwt.opam +++ b/opentelemetry-cohttp-lwt.opam @@ -20,6 +20,7 @@ depends: [ "lwt" {>= "5.3"} "cohttp-lwt" {>= "6.0.0"} "alcotest" {with-test} + "cohttp-eio" {with-test} ] build: [ ["dune" "subst"] {dev} diff --git a/src/client/signal.ml b/src/client/signal.ml index c4c3fa11..7a2eddd5 100644 --- 
a/src/client/signal.ml +++ b/src/client/signal.ml @@ -12,6 +12,30 @@ type t = | Metrics of Proto.Metrics.resource_metrics list | Logs of Proto.Logs.resource_logs list +let to_traces = function + | Traces xs -> Some xs + | _ -> None + +let to_metrics = function + | Metrics xs -> Some xs + | _ -> None + +let to_logs = function + | Logs xs -> Some xs + | _ -> None + +let is_traces = function + | Traces _ -> true + | _ -> false + +let is_metrics = function + | Metrics _ -> true + | _ -> false + +let is_logs = function + | Logs _ -> true + | _ -> false + module Encode = struct let resource_to_string ~encoder ~ctor ~enc resource = let encoder = @@ -70,9 +94,21 @@ module Decode = struct end module Pp = struct - let logs = Format.pp_print_list Proto.Logs.pp_resource_logs + let pp_sep fmt () = Format.fprintf fmt ",@." - let metrics = Format.pp_print_list Proto.Metrics.pp_resource_metrics + let pp_signal pp fmt t = + Format.fprintf fmt "[@ @["; + Format.pp_print_list ~pp_sep pp fmt t; + Format.fprintf fmt "@ ]@]@." 
- let traces = Format.pp_print_list Proto.Trace.pp_resource_spans + let logs = pp_signal Proto.Logs.pp_resource_logs + + let metrics = pp_signal Proto.Metrics.pp_resource_metrics + + let traces = pp_signal Proto.Trace.pp_resource_spans + + let pp fmt = function + | Logs ls -> logs fmt ls + | Metrics ms -> metrics fmt ms + | Traces ts -> traces fmt ts end diff --git a/src/client/signal.mli b/src/client/signal.mli index 7c8f7779..cbe252b1 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -11,6 +11,18 @@ type t = | Metrics of Opentelemetry_proto.Metrics.resource_metrics list | Logs of Opentelemetry_proto.Logs.resource_logs list +val to_traces : t -> Opentelemetry_proto.Trace.resource_spans list option + +val to_metrics : t -> Opentelemetry_proto.Metrics.resource_metrics list option + +val to_logs : t -> Opentelemetry_proto.Logs.resource_logs list option + +val is_traces : t -> bool + +val is_metrics : t -> bool + +val is_logs : t -> bool + (** Encode signals to protobuf encoded strings, ready to be sent over the wire *) module Encode : sig @@ -67,4 +79,6 @@ module Pp : sig val traces : Format.formatter -> Opentelemetry_proto.Trace.resource_spans list -> unit + + val pp : Format.formatter -> t -> unit end diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 093bc888..95a4d55b 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -104,6 +104,11 @@ let () = let ts_start = Unix.gettimeofday () in let debug = ref false in + + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let batch_logs = ref 400 in + let n_bg_threads = ref 0 in let opts = [ @@ -111,6 +116,11 @@ let () = ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float 
sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; @@ -122,15 +132,18 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in let config = Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true - ?bg_threads: - (let n = !n_bg_threads in - if n = 0 then - None - else - Some n) - () + ?bg_threads:(some_if_nzero n_bg_threads) + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 1558b6b9..59acf285 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -12,6 +12,8 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 +let iterations = ref 1 + let num_sleep = Atomic.make 0 let stress_alloc_ = ref true @@ -20,57 +22,63 @@ let stop = Atomic.make false let num_tr = Atomic.make 0 -let run_job () : unit Lwt.t = - let i = ref 0 in +(* Counter used to mark simulated failures *) +let i = ref 0 + +let run_job job_id : unit Lwt.t = while%lwt not @@ Atomic.get stop do let@ scope = Atomic.incr num_tr; T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int !i ] + ~attrs:[ "i", `Int job_id ] in - for%lwt j = 0 to 4 do - (* parent scope is found via thread local storage *) - let@ scope = - Atomic.incr num_tr; - T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal - ~attrs:[ "j", `Int j ] - "loop.inner" - in - - let* () = Lwt_unix.sleep !sleep_outer in - Atomic.incr num_sleep; - - T.Logs.( - emit - [ - make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id - ~severity:Severity_number_info "inner at %d" j; - ]); - - incr i; - - try%lwt - 
Atomic.incr num_tr; + for%lwt j = 0 to !iterations do + if j >= !iterations then + (* Terminate program, having reached our max iterations *) + Lwt.return @@ Atomic.set stop true + else + (* parent scope is found via thread local storage *) let@ scope = - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + Atomic.incr num_tr; + T.Trace.with_ ~scope ~kind:T.Span.Span_kind_internal + ~attrs:[ "j", `Int j ] + "loop.inner" in - (* allocate some stuff *) - if !stress_alloc_ then ( - let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in - ignore _arr - ); - let* () = Lwt_unix.sleep !sleep_inner in + let* () = Lwt_unix.sleep !sleep_outer in Atomic.incr num_sleep; - if j = 4 && !i mod 13 = 0 then failwith "oh no"; + T.Logs.( + emit + [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info "inner at %d" j; + ]); - (* simulate a failure *) - Opentelemetry.Scope.add_event scope (fun () -> - T.Event.make "done with alloc"); - Lwt.return () - with Failure _ -> Lwt.return () + incr i; + + try%lwt + Atomic.incr num_tr; + let@ scope = + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" + in + (* allocate some stuff *) + if !stress_alloc_ then ( + let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in + ignore _arr + ); + + let* () = Lwt_unix.sleep !sleep_inner in + Atomic.incr num_sleep; + + (* simulate a failure *) + if j = 4 && !i mod 13 = 0 then failwith "oh no"; + + Opentelemetry.Scope.add_event scope (fun () -> + T.Event.make "done with alloc"); + Lwt.return () + with Failure _ -> Lwt.return () done done @@ -87,7 +95,7 @@ let run () : unit Lwt.t = let n_jobs = max 1 !n_jobs in Printf.printf "run %d jobs\n%!" 
n_jobs; - let jobs = Array.init n_jobs (fun _ -> run_job ()) |> Array.to_list in + let jobs = List.init n_jobs run_job in Lwt.join jobs let () = @@ -99,18 +107,21 @@ let () = let debug = ref false in let batch_traces = ref 400 in let batch_metrics = ref 3 in + let batch_logs = ref 400 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); - "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; ( "--batch-metrics", Arg.Int (( := ) batch_metrics), " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + "--iterations", Arg.Set_int iterations, " the number of iterations to run"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; ] |> Arg.align @@ -128,7 +139,7 @@ let () = Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - () + ~batch_logs:(some_if_nzero batch_logs) () in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." 
!sleep_outer !sleep_inner Opentelemetry_client_cohttp_lwt.Config.pp config; diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml new file mode 100644 index 00000000..81f4695c --- /dev/null +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -0,0 +1,193 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto + +let batch_size : Client.Signal.t -> int = function + | Traces ts -> List.length ts + | Logs ls -> List.length ls + | Metrics ms -> List.length ms + +let avg_batch_size (p : Client.Signal.t -> bool) + (batches : Client.Signal.t list) : int = + let sum = + List.fold_left + (fun acc b -> + if p b then + acc + batch_size b + else + acc) + 0 batches + in + sum / List.length batches + +let signals_from_batch (signal_batch : Client.Signal.t) = + match signal_batch with + | Traces ts -> List.map (fun t -> `Trace t) ts + | Logs ls -> List.map (fun l -> `Log l) ls + | Metrics ms -> List.map (fun m -> `Metric m) ms + +let filter_map_spans f signals = + signals + |> List.filter_map (function + | `Log _ | `Metric _ -> None + | `Trace (r : Proto.Trace.resource_spans) -> + r.scope_spans + |> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f)) + +let count_spans_with_name name signals = + signals + |> filter_map_spans (fun s -> + if s.Proto.Trace.name = name then + Some s + else + None) + |> List.length + +let filter_map_metrics f signals = + signals + |> List.filter_map (function + | `Log _ | `Trace _ -> None + | `Metric (r : Proto.Metrics.resource_metrics) -> + r.scope_metrics + |> List.find_map (fun ss -> + ss.Proto.Metrics.metrics |> List.find_map f)) + +let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float + = function + | Proto.Metrics.As_double f -> f + | Proto.Metrics.As_int i64 -> Int64.to_float i64 + +let get_metric_values name signals = + signals + |> filter_map_metrics (fun (m : Proto.Metrics.metric) -> + if m.name <> name then + None + else + Option.some + @@ + match m.data 
with + | Sum { data_points; is_monotonic = true; _ } -> + List.fold_left + (fun acc (p : Proto.Metrics.number_data_point) -> + acc +. number_data_point_to_float p.value) + 0. data_points + | _ -> failwith "TODO: Support for getting other metrics") + +let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list = + signals + |> List.filter_map (function + | `Metric _ | `Trace _ -> None + | `Log (r : Proto.Logs.resource_logs) -> + r.scope_logs + |> List.find_map (fun ss -> + ss.Proto.Logs.log_records |> List.find_map f)) + +let count_logs_with_body p signals = + signals + |> filter_map_logs (fun (l : Proto.Logs.log_record) -> + if p l.body then + Some () + else + None) + |> List.length + +type params = { + jobs: int; + batch_traces: int; + batch_metrics: int; + batch_logs: int; + iterations: int; +} + +let cmd exec params = + [ + exec; + "-j"; + string_of_int params.jobs; + "--iterations"; + string_of_int params.iterations; + "--batch-traces"; + string_of_int params.batch_traces; + "--batch-metrics"; + string_of_int params.batch_metrics; + "--batch-logs"; + string_of_int params.batch_logs; + ] + +let test name f = Alcotest.test_case name `Quick f + +let tests params signal_batches = + let signals = + signal_batches + |> List.fold_left + (fun acc b -> List.rev_append (signals_from_batch b) acc) + [] + in + [ + (* TODO: What properties of batch sizes does it make sense to test? 
*) + test "loop.outer spans" (fun () -> + Alcotest.(check' int) + ~msg:"number of occurrences should equal the configured jobs" + ~expected:params.jobs + ~actual:(count_spans_with_name "loop.outer" signals)); + test "loop.inner spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "loop.inner" signals)); + test "alloc spans" (fun () -> + Alcotest.(check' int) + ~msg: + "number of occurrences should equal the configured jobs * the \ + configured iterations" + ~expected:(params.jobs * params.iterations) + ~actual:(count_spans_with_name "alloc" signals); + Alcotest.(check' bool) + ~msg:"should have 'done with alloc' event" ~expected:true + ~actual: + (let all_alloc_events = + signals + |> filter_map_spans (fun s -> + if s.name <> "alloc" then + Some s.events + else + None) + |> List.flatten + in + all_alloc_events + |> List.for_all (fun (e : Proto.Trace.span_event) -> + e.name = "done with alloc"))); + test "num-sleep metrics" (fun () -> + Alcotest.(check' (float 0.)) + ~msg:"should record jobs * iterations sleeps" + ~expected:(params.jobs * params.iterations |> float_of_int) + ~actual: + (get_metric_values "num-sleep" signals + |> List.sort Float.compare |> List.rev |> List.hd)); + test "logs" (fun () -> + Alcotest.(check' int) + ~msg:"should record jobs * iterations occurrences of 'inner at n'" + ~expected:(params.jobs * params.iterations) + ~actual: + (signals + |> count_logs_with_body (function + | Some (Proto.Common.String_value s) + when String.starts_with ~prefix:"inner at" s -> + true + | _ -> false))); + ] + +let run_tests cmds = + let suites = + cmds + |> List.map (fun (exec, params) -> + let cmd = cmd exec params in + let name = cmd |> String.concat " " in + let signal_batches = Signal_gatherer.gather_signals cmd in + (* Let server reset *) + Unix.sleep 1; + name, tests params signal_batches) + 
+ in + let open Alcotest in + run "Collector integration tests" suites diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune new file mode 100644 index 00000000..45c234cb --- /dev/null +++ b/tests/client_e2e/dune @@ -0,0 +1,35 @@ +(env + (_ + ; Make the binaries for the test emitters available on the path for the components defined in this dir. + ; See https://dune.readthedocs.io/en/stable/reference/dune/env.html + (binaries + (../bin/emit1.exe as emit1) + (../bin/emit1_cohttp.exe as emit1_cohttp) + (./gather_signals.exe as gather_signals)))) + +(library + (name signal_gatherer) + (modules signal_gatherer) + (libraries + str + alcotest + cohttp-lwt-unix + fmt + unix + logs.fmt + logs.threaded + opentelemetry.client)) + +(library + (name clients_e2e_lib) + (modules clients_e2e_lib) + (libraries alcotest signal_gatherer)) + +(tests + (names test_cottp_lwt_client_e2e) + (modules test_cottp_lwt_client_e2e) + (package opentelemetry-client-cohttp-lwt) + (deps %{bin:emit1_cohttp}) + (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) + +; TODO : Add tests for ocurl's emit1 diff --git a/tests/client_e2e/gather_signals.ml b/tests/client_e2e/gather_signals.ml new file mode 100644 index 00000000..1a5360ff --- /dev/null +++ b/tests/client_e2e/gather_signals.ml @@ -0,0 +1,3 @@ +let () = + let program_to_test = Sys.argv |> Array.to_list |> List.tl in + Signal_gatherer.run ~program_to_test () diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml new file mode 100644 index 00000000..adf87ce7 --- /dev/null +++ b/tests/client_e2e/signal_gatherer.ml @@ -0,0 +1,151 @@ +(* Runs tests against an OTel-instrumented program *) + +module Client = Opentelemetry_client +module Signal = Client.Signal +module Proto = Opentelemetry.Proto +open Lwt.Syntax + +(* Server to collect telemetry data *) +module Server = struct + let dbg_request kind req pp data : unit Lwt.t = + let _ = kind, req, pp, data in + (* NOTE: Uncomment for debugging *) 
+ (* let* () = *) + (* let req : string = Format.asprintf "%a" Http.Request.pp req in *) + (* let data_s : string = Format.asprintf "%a" pp data in *) + (* Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@." *) + (* kind req data_s *) + (* in *) + Lwt.return () + + let metrics req data = + let metrics = Signal.Decode.metrics data in + let+ () = dbg_request "metrics" req Signal.Pp.metrics metrics in + Signal.Metrics metrics + + let handler push_signal _socket (request : Http.Request.t) + (body : Cohttp_lwt.Body.t) = + let* data = Cohttp_lwt.Body.to_string body in + let* status, signal = + match Http.Request.resource request with + | "/v1/traces" -> + let traces = Signal.Decode.traces data in + let+ () = dbg_request "trace" request Signal.Pp.traces traces in + `OK, Some (Signal.Traces traces) + | "/v1/metrics" -> + let metrics = Signal.Decode.metrics data in + let+ () = dbg_request "metrics" request Signal.Pp.metrics metrics in + `OK, Some (Signal.Metrics metrics) + | "/v1/logs" -> + let logs = Signal.Decode.logs data in + let+ () = dbg_request "logs" request Signal.Pp.logs logs in + `OK, Some (Signal.Logs logs) + | unexepected -> + let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexepected in + `Not_found, None + in + push_signal signal; + let resp_body = Cohttp_lwt.Body.of_string "" in + Cohttp_lwt_unix.Server.respond ~status ~body:resp_body () + + let run port push_signals = + let* () = Lwt_io.eprintf "starting server\n" in + Cohttp_lwt_unix.Server.( + make ~callback:(handler push_signals) () + |> create ~mode:(`TCP (`Port port))) +end + +(** Manage launching and cleaning up the program we are testing *) +module Tested_program = struct + let validate_exit = function + | Unix.WEXITED 0 -> () + | Unix.WEXITED bad_code -> + failwith + @@ Printf.sprintf "process under test ended with bad exit code %d" + bad_code + | Unix.WSIGNALED i -> + failwith + @@ Printf.sprintf "process under test ended with unexpected signal %d" i + | Unix.WSTOPPED i -> 
+ failwith + @@ Printf.sprintf "process under test ended with unexpected stop %d" i + + let run program_to_test = + let redirect = `FD_copy Unix.stderr in + let cmd = "", Array.of_list program_to_test in + (* Give server time to be online *) + let* () = Lwt_unix.sleep 0.5 in + let* () = + Lwt_io.eprintf "running command: %s\n" + (Format.asprintf "%a" + (Format.pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt " ") + Format.pp_print_string) + program_to_test) + in + let* result = Lwt_process.exec ~stdout:redirect cmd in + (* Give server time to process signals *) + let+ () = Lwt_unix.sleep 0.5 in + validate_exit result +end + +let collect_traces ~port program_to_test push_signals () = + let* () = + Lwt.pick + [ Server.run port push_signals; Tested_program.run program_to_test ] + in + (* Let the tester know all the signals have be sent *) + Lwt.return (push_signals None) + +let normalize_scope_span : Proto.Trace.scope_spans -> Proto.Trace.scope_spans = + function + | scope_span -> + { + scope_span with + spans = + scope_span.spans + |> List.map (fun (span : Proto.Trace.span) -> + { + span with + start_time_unix_nano = -1L; + end_time_unix_nano = -1L; + }); + } + +let normalize_signal : Signal.t -> Signal.t = function + | Traces ts -> + Traces + (ts + |> List.map (fun (trace : Proto.Trace.resource_spans) -> + { + trace with + scope_spans = trace.scope_spans |> List.map normalize_scope_span; + })) + | x -> x + +(* normalize trace output by redacting non-deterministic values from output *) +let normalize = + let re = + Str.regexp + {|\(start_time_unix_nano\|time_unix_nano\|end_time_unix_nano\|value\) = \([0-9]*\|As_int([0-9]*)\|As_double([0-9]*\.)\);|} + in + fun s -> Str.global_replace re {|\1 = ;|} s + +let default_port = + String.split_on_char ':' Client.Config.default_url |> function + (* Extracting the port from 'http://foo:' *) + | [ _; _; port ] -> int_of_string port + | _ -> failwith "unexpected format in Client.Config.default_url" + +let 
gather_signals ?(port = default_port) program_to_test = + Lwt_main.run + @@ + let stream, push = Lwt_stream.create () in + let* () = collect_traces ~port program_to_test push () in + Lwt_stream.to_list stream + +let run ?(port = default_port) ~program_to_test () = + gather_signals ~port program_to_test + |> List.map (fun s -> s |> Format.asprintf "%a" Signal.Pp.pp |> normalize) + |> List.stable_sort String.compare (* Produce a deterministic order *) + |> List.iter print_string diff --git a/tests/client_e2e/test_cottp_lwt_client_e2e.ml b/tests/client_e2e/test_cottp_lwt_client_e2e.ml new file mode 100644 index 00000000..c093155f --- /dev/null +++ b/tests/client_e2e/test_cottp_lwt_client_e2e.ml @@ -0,0 +1,33 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto +open Clients_e2e_lib + +let () = + Clients_e2e_lib.run_tests + [ + (* TODO: Running with batch-traces = 1 causes deadlocks *) + (* ( "emit1_cohttp", *) + (* { *) + (* jobs = 1; *) + (* iterations = 1; *) + (* batch_traces = 1; *) + (* batch_metrics = 1; *) + (* batch_logs = 1; *) + (* } ); *) + ( "emit1_cohttp", + { + jobs = 1; + iterations = 1; + batch_traces = 2; + batch_metrics = 2; + batch_logs = 2; + } ); + ( "emit1_cohttp", + { + jobs = 3; + iterations = 1; + batch_traces = 400; + batch_metrics = 3; + batch_logs = 400; + } ); + ] From ef5d7af3e7fa78f41f91a9bc4ddfaec4c039029d Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Tue, 8 Jul 2025 21:32:28 -0400 Subject: [PATCH 10/16] Remove unneeded dependency --- dune-project | 3 +-- opentelemetry-cohttp-lwt.opam | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dune-project b/dune-project index a28374f2..ec273074 100644 --- a/dune-project +++ b/dune-project @@ -102,8 +102,7 @@ (>= "5.3")) (cohttp-lwt (>= "6.0.0")) - (alcotest :with-test) - (cohttp-eio :with-test)) + (alcotest :with-test)) (synopsis "Opentelemetry tracing for Cohttp HTTP servers")) (package diff --git a/opentelemetry-cohttp-lwt.opam 
b/opentelemetry-cohttp-lwt.opam index a56e353b..ad66a0ef 100644 --- a/opentelemetry-cohttp-lwt.opam +++ b/opentelemetry-cohttp-lwt.opam @@ -20,7 +20,6 @@ depends: [ "lwt" {>= "5.3"} "cohttp-lwt" {>= "6.0.0"} "alcotest" {with-test} - "cohttp-eio" {with-test} ] build: [ ["dune" "subst"] {dev} From 5bf8eea5f1b57806a74a20bc1fb3ed45c6b881bf Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Tue, 8 Jul 2025 21:44:04 -0400 Subject: [PATCH 11/16] Define find_map for 4.08 compat --- tests/client_e2e/clients_e2e_lib.ml | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml index 81f4695c..d7752e2a 100644 --- a/tests/client_e2e/clients_e2e_lib.ml +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -25,13 +25,22 @@ let signals_from_batch (signal_batch : Client.Signal.t) = | Logs ls -> List.map (fun l -> `Log l) ls | Metrics ms -> List.map (fun m -> `Metric m) ms +(* For backwards compat with OCaml 4.08. + Copied from the standard library. 
*) +let rec find_map f = function + | [] -> None + | x :: l -> + (match f x with + | Some _ as result -> result + | None -> find_map f l) + let filter_map_spans f signals = signals |> List.filter_map (function | `Log _ | `Metric _ -> None | `Trace (r : Proto.Trace.resource_spans) -> r.scope_spans - |> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f)) + |> find_map (fun ss -> ss.Proto.Trace.spans |> find_map f)) let count_spans_with_name name signals = signals @@ -48,8 +57,7 @@ let filter_map_metrics f signals = | `Log _ | `Trace _ -> None | `Metric (r : Proto.Metrics.resource_metrics) -> r.scope_metrics - |> List.find_map (fun ss -> - ss.Proto.Metrics.metrics |> List.find_map f)) + |> find_map (fun ss -> ss.Proto.Metrics.metrics |> find_map f)) let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float = function @@ -78,8 +86,7 @@ let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list = | `Metric _ | `Trace _ -> None | `Log (r : Proto.Logs.resource_logs) -> r.scope_logs - |> List.find_map (fun ss -> - ss.Proto.Logs.log_records |> List.find_map f)) + |> find_map (fun ss -> ss.Proto.Logs.log_records |> find_map f)) let count_logs_with_body p signals = signals From 16daccb6df8b6f04ee4e1443ebfdad88094c30e3 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Thu, 10 Jul 2025 15:55:16 -0400 Subject: [PATCH 12/16] Remove cruft from old testing method Also document the signal reporter executable --- tests/client_e2e/dune | 5 ++- tests/client_e2e/gather_signals.ml | 3 -- tests/client_e2e/signal_gatherer.ml | 51 +++++----------------- tests/client_e2e/signal_reporter_server.ml | 4 ++ 4 files changed, 20 insertions(+), 43 deletions(-) delete mode 100644 tests/client_e2e/gather_signals.ml create mode 100644 tests/client_e2e/signal_reporter_server.ml diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune index 45c234cb..142b6630 100644 --- a/tests/client_e2e/dune +++ b/tests/client_e2e/dune @@ -32,4 +32,7 @@ (deps 
%{bin:emit1_cohttp}) (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) -; TODO : Add tests for ocurl's emit1 +(executable + (name signal_reporter_server) + (modules signal_reporter_server) + (libraries signal_gatherer)) diff --git a/tests/client_e2e/gather_signals.ml b/tests/client_e2e/gather_signals.ml deleted file mode 100644 index 1a5360ff..00000000 --- a/tests/client_e2e/gather_signals.ml +++ /dev/null @@ -1,3 +0,0 @@ -let () = - let program_to_test = Sys.argv |> Array.to_list |> List.tl in - Signal_gatherer.run ~program_to_test () diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml index adf87ce7..ac4e6c26 100644 --- a/tests/client_e2e/signal_gatherer.ml +++ b/tests/client_e2e/signal_gatherer.ml @@ -97,40 +97,6 @@ let collect_traces ~port program_to_test push_signals () = (* Let the tester know all the signals have be sent *) Lwt.return (push_signals None) -let normalize_scope_span : Proto.Trace.scope_spans -> Proto.Trace.scope_spans = - function - | scope_span -> - { - scope_span with - spans = - scope_span.spans - |> List.map (fun (span : Proto.Trace.span) -> - { - span with - start_time_unix_nano = -1L; - end_time_unix_nano = -1L; - }); - } - -let normalize_signal : Signal.t -> Signal.t = function - | Traces ts -> - Traces - (ts - |> List.map (fun (trace : Proto.Trace.resource_spans) -> - { - trace with - scope_spans = trace.scope_spans |> List.map normalize_scope_span; - })) - | x -> x - -(* normalize trace output by redacting non-deterministic values from output *) -let normalize = - let re = - Str.regexp - {|\(start_time_unix_nano\|time_unix_nano\|end_time_unix_nano\|value\) = \([0-9]*\|As_int([0-9]*)\|As_double([0-9]*\.)\);|} - in - fun s -> Str.global_replace re {|\1 = ;|} s - let default_port = String.split_on_char ':' Client.Config.default_url |> function (* Extracting the port from 'http://foo:' *) @@ -144,8 +110,15 @@ let gather_signals ?(port = default_port) program_to_test = let* () = 
collect_traces ~port program_to_test push () in Lwt_stream.to_list stream -let run ?(port = default_port) ~program_to_test () = - gather_signals ~port program_to_test - |> List.map (fun s -> s |> Format.asprintf "%a" Signal.Pp.pp |> normalize) - |> List.stable_sort String.compare (* Produce a deterministic order *) - |> List.iter print_string +(* Just run the server, and print the signals gathered. *) +let run ?(port = default_port) () = + Lwt_main.run + @@ + let stream, push = Lwt_stream.create () in + Lwt.join + [ + Server.run port push; + Lwt_stream.iter_s + (fun s -> Format.asprintf "%a" Signal.Pp.pp s |> Lwt_io.printl) + stream; + ] diff --git a/tests/client_e2e/signal_reporter_server.ml b/tests/client_e2e/signal_reporter_server.ml new file mode 100644 index 00000000..a7f17708 --- /dev/null +++ b/tests/client_e2e/signal_reporter_server.ml @@ -0,0 +1,4 @@ +(** Runs a signal gatherer server, and prints out every batch of signals + received to stdout. This can be used to monitor the signals sent by an + application, e.g., the test executables defined in /tests/bin/emit1*.ml *) +let () = Signal_gatherer.run () From 8288bcb59b82748ee2c8da4df379960c3bd410a2 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Thu, 10 Jul 2025 16:02:15 -0400 Subject: [PATCH 13/16] Use containers in tests To get access to useful functions that are not in the Stdlib in OCaml 4.08. 
--- dune-project | 3 ++- opentelemetry-client-cohttp-lwt.opam | 1 + tests/client_e2e/clients_e2e_lib.ml | 28 +++++++++++----------------- tests/client_e2e/dune | 1 + 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/dune-project b/dune-project index ec273074..8dd272eb 100644 --- a/dune-project +++ b/dune-project @@ -122,5 +122,6 @@ (>= "2.0")) cohttp-lwt cohttp-lwt-unix - (alcotest :with-test)) + (alcotest :with-test) + (containers :with-test)) (synopsis "Collector client for opentelemetry, using cohttp + lwt")) diff --git a/opentelemetry-client-cohttp-lwt.opam b/opentelemetry-client-cohttp-lwt.opam index 1a07ec45..988ad44a 100644 --- a/opentelemetry-client-cohttp-lwt.opam +++ b/opentelemetry-client-cohttp-lwt.opam @@ -22,6 +22,7 @@ depends: [ "cohttp-lwt" "cohttp-lwt-unix" "alcotest" {with-test} + "containers" {with-test} ] build: [ ["dune" "subst"] {dev} diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml index d7752e2a..aa79e84a 100644 --- a/tests/client_e2e/clients_e2e_lib.ml +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -1,5 +1,6 @@ module Client = Opentelemetry_client module Proto = Opentelemetry.Proto +open Containers let batch_size : Client.Signal.t -> int = function | Traces ts -> List.length ts @@ -25,27 +26,18 @@ let signals_from_batch (signal_batch : Client.Signal.t) = | Logs ls -> List.map (fun l -> `Log l) ls | Metrics ms -> List.map (fun m -> `Metric m) ms -(* For backwards compat with OCaml 4.08. - Copied from the standard library. 
*) -let rec find_map f = function - | [] -> None - | x :: l -> - (match f x with - | Some _ as result -> result - | None -> find_map f l) - let filter_map_spans f signals = signals |> List.filter_map (function | `Log _ | `Metric _ -> None | `Trace (r : Proto.Trace.resource_spans) -> r.scope_spans - |> find_map (fun ss -> ss.Proto.Trace.spans |> find_map f)) + |> List.find_map (fun ss -> ss.Proto.Trace.spans |> List.find_map f)) let count_spans_with_name name signals = signals |> filter_map_spans (fun s -> - if s.Proto.Trace.name = name then + if String.equal s.Proto.Trace.name name then Some s else None) @@ -57,7 +49,8 @@ let filter_map_metrics f signals = | `Log _ | `Trace _ -> None | `Metric (r : Proto.Metrics.resource_metrics) -> r.scope_metrics - |> find_map (fun ss -> ss.Proto.Metrics.metrics |> find_map f)) + |> List.find_map (fun ss -> + ss.Proto.Metrics.metrics |> List.find_map f)) let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float = function @@ -67,7 +60,7 @@ let number_data_point_to_float : Proto.Metrics.number_data_point_value -> float let get_metric_values name signals = signals |> filter_map_metrics (fun (m : Proto.Metrics.metric) -> - if m.name <> name then + if not (String.equal m.name name) then None else Option.some @@ -86,7 +79,8 @@ let filter_map_logs (f : Proto.Logs.log_record -> 'a option) signals : 'a list = | `Metric _ | `Trace _ -> None | `Log (r : Proto.Logs.resource_logs) -> r.scope_logs - |> find_map (fun ss -> ss.Proto.Logs.log_records |> find_map f)) + |> List.find_map (fun ss -> + ss.Proto.Logs.log_records |> List.find_map f)) let count_logs_with_body p signals = signals @@ -156,7 +150,7 @@ let tests params signal_batches = (let all_alloc_events = signals |> filter_map_spans (fun s -> - if s.name <> "alloc" then + if not (String.equal s.name "alloc") then Some s.events else None) @@ -164,7 +158,7 @@ let tests params signal_batches = in all_alloc_events |> List.for_all (fun (e : Proto.Trace.span_event) -> - 
e.name = "done with alloc"))); + String.equal e.name "done with alloc"))); test "num-sleep metrics" (fun () -> Alcotest.(check' (float 0.)) ~msg:"should record jobs * iterations sleeps" @@ -180,7 +174,7 @@ let tests params signal_batches = (signals |> count_logs_with_body (function | Some (Proto.Common.String_value s) - when String.starts_with ~prefix:"inner at" s -> + when String.prefix ~pre:"inner at" s -> true | _ -> false))); ] diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune index 142b6630..706740da 100644 --- a/tests/client_e2e/dune +++ b/tests/client_e2e/dune @@ -16,6 +16,7 @@ cohttp-lwt-unix fmt unix + containers logs.fmt logs.threaded opentelemetry.client)) From f1b7a2237c8294cceb15cc4621787a15923356b9 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Fri, 11 Jul 2025 13:48:40 -0400 Subject: [PATCH 14/16] Specify and document the Signal_gatherer API --- tests/client_e2e/signal_gatherer.ml | 20 +++++--------------- tests/client_e2e/signal_gatherer.mli | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 15 deletions(-) create mode 100644 tests/client_e2e/signal_gatherer.mli diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml index ac4e6c26..41e98eef 100644 --- a/tests/client_e2e/signal_gatherer.ml +++ b/tests/client_e2e/signal_gatherer.ml @@ -2,7 +2,6 @@ module Client = Opentelemetry_client module Signal = Client.Signal -module Proto = Opentelemetry.Proto open Lwt.Syntax (* Server to collect telemetry data *) @@ -18,11 +17,6 @@ module Server = struct (* in *) Lwt.return () - let metrics req data = - let metrics = Signal.Decode.metrics data in - let+ () = dbg_request "metrics" req Signal.Pp.metrics metrics in - Signal.Metrics metrics - let handler push_signal _socket (request : Http.Request.t) (body : Cohttp_lwt.Body.t) = let* data = Cohttp_lwt.Body.to_string body in @@ -89,14 +83,6 @@ module Tested_program = struct validate_exit result end -let collect_traces ~port program_to_test push_signals () = - let* 
() = - Lwt.pick - [ Server.run port push_signals; Tested_program.run program_to_test ] - in - (* Let the tester know all the signals have be sent *) - Lwt.return (push_signals None) - let default_port = String.split_on_char ':' Client.Config.default_url |> function (* Extracting the port from 'http://foo:' *) @@ -107,7 +93,11 @@ let gather_signals ?(port = default_port) program_to_test = Lwt_main.run @@ let stream, push = Lwt_stream.create () in - let* () = collect_traces ~port program_to_test push () in + let* () = + Lwt.pick [ Server.run port push; Tested_program.run program_to_test ] + in + (* Close out the stream *) + push None; Lwt_stream.to_list stream (* Just run the server, and print the signals gathered. *) diff --git a/tests/client_e2e/signal_gatherer.mli b/tests/client_e2e/signal_gatherer.mli new file mode 100644 index 00000000..b948d059 --- /dev/null +++ b/tests/client_e2e/signal_gatherer.mli @@ -0,0 +1,21 @@ +(** A test utility for running a signal emitting executable alongside a minimal + server that can receive the signals and make them available for inspection. *) + +val gather_signals : + ?port:int -> string list -> Opentelemetry_client.Signal.t list +(** [gather_signals program_to_test] is a list of all the signals emitted by the + [program_to_test], which the server was able to record. This function + assumes that the program to test will be sending its signals to the + localhost on [port]. + + @param port + the port where signals will be received. Default is port set in + {!Opentelemetry_client.Config.default_url}. *) + +val run : ?port:int -> unit -> unit +(** [run ()] runs a signal gathering server and prints all batches of signals + received to stdout. + + @param port + the port where signals will be received. Default is port set in + {!Opentelemetry_client.Config.default_url}. 
*) From 7cd3d0321badf9f29132360ad40aec58507ba1df Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 11 Jul 2025 15:41:07 -0400 Subject: [PATCH 15/16] detail --- tests/client_e2e/signal_gatherer.ml | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml index 41e98eef..dcc44bfd 100644 --- a/tests/client_e2e/signal_gatherer.ml +++ b/tests/client_e2e/signal_gatherer.ml @@ -4,18 +4,22 @@ module Client = Opentelemetry_client module Signal = Client.Signal open Lwt.Syntax +let debug = + match Sys.getenv_opt "DEBUG" with + | Some "1" -> true + | _ -> false + (* Server to collect telemetry data *) module Server = struct let dbg_request kind req pp data : unit Lwt.t = - let _ = kind, req, pp, data in - (* NOTE: Uncomment for debugging *) - (* let* () = *) - (* let req : string = Format.asprintf "%a" Http.Request.pp req in *) - (* let data_s : string = Format.asprintf "%a" pp data in *) - (* Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@." *) - (* kind req data_s *) - (* in *) - Lwt.return () + if debug then ( + let _ = kind, req, pp, data in + let req : string = Format.asprintf "%a" Http.Request.pp req in + let data_s : string = Format.asprintf "%a" pp data in + Lwt_io.fprintf Lwt_io.stderr "# received %s\nREQUEST: %s\nBODY: %s\n@." 
+ kind req data_s + ) else + Lwt.return_unit let handler push_signal _socket (request : Http.Request.t) (body : Cohttp_lwt.Body.t) = From a71fc32091622fb2e65c77ad79a15172ac70f204 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Fri, 11 Jul 2025 17:57:14 -0400 Subject: [PATCH 16/16] Fix docs --- src/client/signal.mli | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/client/signal.mli b/src/client/signal.mli index cbe252b1..a10f9e00 100644 --- a/src/client/signal.mli +++ b/src/client/signal.mli @@ -45,7 +45,7 @@ module Encode : sig ?encoder:Pbrt.Encoder.t -> Opentelemetry_proto.Trace.resource_spans list -> string - (** [metrics ts] is a protobuf encoded string of the traces [ts] + (** [traces ts] is a protobuf encoded string of the traces [ts] @param encoder provide an encoder state to reuse *) end @@ -53,19 +53,22 @@ end (** Decode signals from protobuf encoded strings, received over the wire *) module Decode : sig val logs : string -> Opentelemetry_proto.Logs.resource_logs list - (** [logs ls] is a protobuf encoded string of the logs [ls] + (** [logs s] is a list of log resources decoded from the protobuf encoded + string [s]. - @param encoder provide an encoder state to reuse *) + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *) val metrics : string -> Opentelemetry_proto.Metrics.resource_metrics list - (** [metrics ms] is a protobuf encoded string of the metrics [ms] + (** [metrics s] is a list of metrics resources decoded from the protobuf + encoded string [s]. - @param encoder provide an encoder state to reuse *) + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *) val traces : string -> Opentelemetry_proto.Trace.resource_spans list - (** [metrics ts] is a protobuf encoded string of the traces [ts] + (** [traces s] is a list of span resources decoded from the protobuf encoded + string [s]. 
- @param encoder provide an encoder state to reuse *) + @raise Pbrt.Decoder.Failure if [s] is not a valid protobuf encoding. *) end module Pp : sig