From 7746c871c208a7bb2f7c15aadbb3ad7dd95ccc20 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Sun, 29 Jun 2025 19:09:06 -0400 Subject: [PATCH 1/8] Add Eio collector --- dune-project | 20 + opentelemetry-client-cohttp-eio.opam | 44 ++ src/client-cohttp-eio/config.ml | 7 + src/client-cohttp-eio/config.mli | 12 + src/client-cohttp-eio/dune | 18 + .../opentelemetry_client_cohttp_eio.ml | 500 ++++++++++++++++++ .../opentelemetry_client_cohttp_eio.mli | 50 ++ tests/bin/dune | 14 + tests/bin/emit1_cohttp.ml | 6 +- tests/bin/emit1_eio.ml | 158 ++++++ tests/client_e2e/clients_e2e_lib.ml | 11 +- tests/client_e2e/dune | 10 +- tests/client_e2e/signal_gatherer.ml | 33 +- tests/client_e2e/test_cottp_eio_client_e2e.ml | 32 ++ tests/client_e2e/test_cottp_lwt_client_e2e.ml | 13 +- 15 files changed, 917 insertions(+), 11 deletions(-) create mode 100644 opentelemetry-client-cohttp-eio.opam create mode 100644 src/client-cohttp-eio/config.ml create mode 100644 src/client-cohttp-eio/config.mli create mode 100644 src/client-cohttp-eio/dune create mode 100644 src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml create mode 100644 src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli create mode 100644 tests/bin/emit1_eio.ml create mode 100644 tests/client_e2e/test_cottp_eio_client_e2e.ml diff --git a/dune-project b/dune-project index 8dd272eb..8407c455 100644 --- a/dune-project +++ b/dune-project @@ -125,3 +125,23 @@ (alcotest :with-test) (containers :with-test)) (synopsis "Collector client for opentelemetry, using cohttp + lwt")) + +(package + (name opentelemetry-client-cohttp-eio) + (depends + (ocaml + (>= "5.00")) + (mtime + (>= "1.4")) + tls-eio + ca-certs + mirage-crypto-rng-eio + (opentelemetry + (= :version)) + (odoc :with-doc) + (cohttp-eio + (>= 6.1.0)) + eio_main + (alcotest :with-test) + (containers :with-test)) + (synopsis "Collector client for opentelemetry, using cohttp + eio")) diff --git a/opentelemetry-client-cohttp-eio.opam b/opentelemetry-client-cohttp-eio.opam 
new file mode 100644 index 00000000..ee7f47c6 --- /dev/null +++ b/opentelemetry-client-cohttp-eio.opam @@ -0,0 +1,44 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.11.2" +synopsis: "Collector client for opentelemetry, using cohttp + eio" +maintainer: [ + "Simon Cruanes " + "Matt Bray " + "ELLIOTTCABLE " +] +authors: ["the Imandra team and contributors"] +license: "MIT" +homepage: "https://github.com/imandra-ai/ocaml-opentelemetry" +bug-reports: "https://github.com/imandra-ai/ocaml-opentelemetry/issues" +depends: [ + "dune" {>= "2.9"} + "ocaml" {>= "5.00"} + "mtime" {>= "1.4"} + "tls-eio" + "ca-certs" + "mirage-crypto-rng-eio" + "opentelemetry" {= version} + "odoc" {with-doc} + "cohttp-eio" {>= "6.1.0"} + "eio_main" + "alcotest" {with-test} + "containers" {with-test} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/imandra-ai/ocaml-opentelemetry.git" diff --git a/src/client-cohttp-eio/config.ml b/src/client-cohttp-eio/config.ml new file mode 100644 index 00000000..930881ff --- /dev/null +++ b/src/client-cohttp-eio/config.ml @@ -0,0 +1,7 @@ +type t = Opentelemetry_client.Config.t + +module Env = Opentelemetry_client.Config.Env () + +let pp = Opentelemetry_client.Config.pp + +let make = Env.make (fun common () -> common) diff --git a/src/client-cohttp-eio/config.mli b/src/client-cohttp-eio/config.mli new file mode 100644 index 00000000..100bb696 --- /dev/null +++ b/src/client-cohttp-eio/config.mli @@ -0,0 +1,12 @@ +type t = Opentelemetry_client.Config.t +(** Configuration. + + To build one, use {!make} below. This might be extended with more fields in + the future. 
*) + +val pp : Format.formatter -> t -> unit + +val make : (unit -> t) Opentelemetry_client.Config.make +(** Make a configuration {!t}. *) + +module Env : Opentelemetry_client.Config.ENV diff --git a/src/client-cohttp-eio/dune b/src/client-cohttp-eio/dune new file mode 100644 index 00000000..0c75d658 --- /dev/null +++ b/src/client-cohttp-eio/dune @@ -0,0 +1,18 @@ +(library + (name opentelemetry_client_cohttp_eio) + (public_name opentelemetry-client-cohttp-eio) + (synopsis "Opentelemetry collector using cohttp+eio+unix") + (libraries + opentelemetry + opentelemetry.client + eio + cohttp + cohttp-eio + eio_main + uri + pbrt + mtime + mtime.clock.os + tls-eio + ca-certs + mirage-crypto-rng.unix)) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml new file mode 100644 index 00000000..cb21a374 --- /dev/null +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -0,0 +1,500 @@ +open Eio.Std + +(* + https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md + https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md + *) + +module OT = Opentelemetry +module Config = Config +module Signal = Opentelemetry_client.Signal +module Batch = Opentelemetry_client.Batch +open Opentelemetry + +let ( let@ ) = ( @@ ) + +let spf = Printf.sprintf + +let tid () = Thread.id @@ Thread.self () + +let set_headers = Config.Env.set_headers + +let get_headers = Config.Env.get_headers + +let needs_gc_metrics = Atomic.make false + +let last_gc_metrics = Atomic.make (Mtime_clock.now ()) + +let timeout_gc_metrics = Mtime.Span.(20 * s) + +(* Cross-domain, thread-safe storage for GC metrics gathered from different fibres. 
*) +module GC_metrics : sig + val add : Proto.Metrics.resource_metrics -> unit + + val drain : unit -> Proto.Metrics.resource_metrics list +end = struct + (* Used to prevent data races across domains *) + let mutex = Eio.Mutex.create () + + let gc_metrics = ref [] + + let add m = + Eio.Mutex.use_rw ~protect:true mutex (fun () -> + gc_metrics := m :: !gc_metrics) + + let drain () = + Eio.Mutex.use_rw ~protect:true mutex (fun () -> + let metrics = !gc_metrics in + gc_metrics := []; + metrics) +end + +(* capture current GC metrics if {!needs_gc_metrics} is true, + or it has been a long time since the last GC metrics collection, + and push them into {!gc_metrics} for later collection *) +let sample_gc_metrics_if_needed () = + let now = Mtime_clock.now () in + let alarm = Atomic.compare_and_set needs_gc_metrics true false in + let timeout () = + let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in + Mtime.Span.compare elapsed timeout_gc_metrics > 0 + in + if alarm || timeout () then ( + Atomic.set last_gc_metrics now; + let l = + OT.Metrics.make_resource_metrics + ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) + @@ Opentelemetry.GC_metrics.get_metrics () + in + GC_metrics.add l + ) + +type error = + [ `Status of int * Opentelemetry.Proto.Status.status + | `Failure of string + | `Sysbreak + ] + +let n_errors = Atomic.make 0 + +let n_dropped = Atomic.make 0 + +let report_err_ = function + | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" + | `Failure msg -> + Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg + | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details }) + -> + let pp_details out l = + List.iter + (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) + l + in + Format.eprintf + "@[<2>opentelemetry: export failed with@ http code=%d@ status \ + {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." 
+ code scode + (Bytes.unsafe_to_string message) + pp_details details + +module Httpc : sig + type t + + val create : _ Eio.Net.t -> t + + val send : + t -> + url:string -> + decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> + string -> + ('a, error) result +end = struct + open Opentelemetry.Proto + module Httpc = Cohttp_eio.Client + + type t = Httpc.t + + let authenticator = + match Ca_certs.authenticator () with + | Ok x -> x + | Error (`Msg m) -> + Fmt.failwith "Failed to create system store X509 authenticator: %s" m + + let https ~authenticator = + let tls_config = + match Tls.Config.client ~authenticator () with + | Error (`Msg msg) -> failwith ("tls configuration problem: " ^ msg) + | Ok tls_config -> tls_config + in + fun uri raw -> + let host = + Uri.host uri + |> Option.map (fun x -> Domain_name.(host_exn (of_string_exn x))) + in + Tls_eio.client_of_flow ?host tls_config raw + + let create net = Httpc.make ~https:(Some (https ~authenticator)) net + + (* send the content to the remote endpoint/path *) + let send (client : t) ~url ~decode (bod : string) : ('a, error) result = + Switch.run @@ fun sw -> + let uri = Uri.of_string url in + + let open Cohttp in + let headers = Header.(add_list (init ()) (Config.Env.get_headers ())) in + let headers = + Header.(add headers "Content-Type" "application/x-protobuf") + in + + let body = Cohttp_eio.Body.of_string bod in + let r = + try + let r = Httpc.post client ~sw ~headers ~body uri in + Ok r + with e -> Error e + in + match r with + | Error e -> + let err = + `Failure + (spf "sending signals via http POST to %S\nfailed with:\n%s" url + (Printexc.to_string e)) + in + Error err + | Ok (resp, body) -> + let body = Eio.Buf_read.(parse_exn take_all) body ~max_size:max_int in + let code = Response.status resp |> Code.code_of_status in + if not (Code.is_error code) then ( + match decode with + | `Ret x -> Ok x + | `Dec f -> + let dec = Pbrt.Decoder.of_string body in + let r = + try Ok (f dec) + with e -> + let bt = 
Printexc.get_backtrace () in + Error + (`Failure + (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) + bt)) + in + r + ) else ( + let dec = Pbrt.Decoder.of_string body in + + let r = + try + let status = Status.decode_pb_status dec in + Error (`Status (code, status)) + with e -> + let bt = Printexc.get_backtrace () in + Error + (`Failure + (spf + "httpc: decoding of status (url=%S, code=%d) failed with:\n\ + %s\n\ + status: %S\n\ + %s" + url code (Printexc.to_string e) body bt)) + in + r + ) +end + +(** An emitter. This is used by {!Backend} below to forward traces/metrics/… + from the program to whatever collector client we have. *) +module type EMITTER = sig + open Opentelemetry.Proto + + val push_trace : Trace.resource_spans list -> unit + + val push_metrics : Metrics.resource_metrics list -> unit + + val push_logs : Logs.resource_logs list -> unit + + val set_on_tick_callbacks : (unit -> unit) AList.t -> unit + + val tick : unit -> unit + + val cleanup : on_done:(unit -> unit) -> unit -> unit +end + +(* make an emitter. 
+ + exceptions inside should be caught, see + https://opentelemetry.io/docs/reference/specification/error-handling/ *) +let mk_emitter ~sw ~stop ~(config : Config.t) ~(net : _ Eio.Net.t) () : + (module EMITTER) = + let open Proto in + (* local helpers *) + let open struct + let timeout = + if config.batch_timeout_ms > 0 then + Some Mtime.Span.(config.batch_timeout_ms * ms) + else + None + + let batch_traces : Trace.resource_spans Batch.t = + Batch.make ?batch:config.batch_traces ?timeout () + + let batch_metrics : Metrics.resource_metrics Batch.t = + Batch.make ?batch:config.batch_metrics ?timeout () + + let batch_logs : Logs.resource_logs Batch.t = + Batch.make ?batch:config.batch_logs ?timeout () + + let on_tick_cbs_ = Atomic.make (AList.make ()) + + let set_on_tick_callbacks = Atomic.set on_tick_cbs_ + + let send_http_ (httpc : Httpc.t) ~url data : unit = + let r = Httpc.send httpc ~url ~decode:(`Ret ()) data in + match r with + | Ok () -> () + | Error `Sysbreak -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + Atomic.set stop true + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err; + (* avoid crazy error loop *) + Eio_unix.sleep 3. 
+ + (* emit metrics, if the batch is full or timeout lapsed *) + let emit_metrics_maybe ~now ?force client () = + Batch.pop_if_ready ?force ~now batch_metrics + |> Option.iter (fun collected_metrics -> + let gc_metrics = GC_metrics.drain () in + gc_metrics @ collected_metrics + |> Signal.Encode.metrics + |> send_http_ client ~url:config.url_metrics) + + let emit_traces_maybe ~now ?force client () = + Batch.pop_if_ready ?force ~now batch_traces + |> Option.iter (fun ts -> + Signal.Encode.traces ts |> send_http_ client ~url:config.url_traces) + + let emit_logs_maybe ~now ?force client () = + Batch.pop_if_ready ?force ~now batch_logs + |> Option.iter (fun ls -> + Signal.Encode.logs ls |> send_http_ client ~url:config.url_logs) + + let[@inline] guard_exn_ where f = + try f () + with e -> + let bt = Printexc.get_backtrace () in + Printf.eprintf + "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where + (Printexc.to_string e) bt + + let emit_all_force (httpc : Httpc.t) : unit = + let now = Mtime_clock.now () in + Fiber.all + [ + emit_logs_maybe ~now ~force:true httpc; + emit_metrics_maybe ~now ~force:true httpc; + emit_traces_maybe ~now ~force:true httpc; + ] + + let tick_common_ () = + if Config.Env.get_debug () then + Printf.eprintf "tick (from %d)\n%!" 
(tid ()); + sample_gc_metrics_if_needed (); + List.iter + (fun f -> + try f () + with e -> + Printf.eprintf "on tick callback raised: %s\n" + (Printexc.to_string e)) + (AList.get @@ Atomic.get on_tick_cbs_); + () + + (* thread that calls [tick()] regularly, to help enforce timeouts *) + let ticker_fiber ~tick : unit -> [ `Stop_daemon ] = + let rec loop () = + if Atomic.get stop then + `Stop_daemon + else ( + tick (); + Eio_unix.sleep 0.5; + loop () + ) + in + loop + end in + let httpc = + (* Prime RNG state for TLS *) + Mirage_crypto_rng_unix.use_default (); + Httpc.create net + in + let module M = struct + let push_to_batch b e = + match Batch.push b e with + | `Ok -> () + | `Dropped -> Atomic.incr n_errors + + let push_trace e = + let@ () = guard_exn_ "push trace" in + push_to_batch batch_traces e; + let now = Mtime_clock.now () in + Fiber.fork ~sw (emit_traces_maybe ~now httpc) + + let push_metrics e = + let@ () = guard_exn_ "push metrics" in + sample_gc_metrics_if_needed (); + push_to_batch batch_metrics e; + let now = Mtime_clock.now () in + Fiber.fork ~sw (emit_metrics_maybe ~now httpc) + + let push_logs e = + let@ () = guard_exn_ "push logs" in + push_to_batch batch_logs e; + let now = Mtime_clock.now () in + Fiber.fork ~sw (emit_logs_maybe ~now httpc) + + let set_on_tick_callbacks = set_on_tick_callbacks + + let tick_ () = + tick_common_ (); + sample_gc_metrics_if_needed (); + let now = Mtime_clock.now () in + Fiber.all + [ + emit_logs_maybe ~now httpc; + emit_metrics_maybe ~now httpc; + emit_traces_maybe ~now httpc; + ] + + let () = Eio.Fiber.fork_daemon ~sw (ticker_fiber ~tick:tick_) + + let tick () = Fiber.fork ~sw tick_ + + let cleanup ~on_done () = + if Config.Env.get_debug () then + Printf.eprintf "opentelemetry: exiting…\n%!"; + (* This must be in its own switch, because it MUST run even if the + surrounding switch in the environment has been cancelled. 
*) + Switch.run @@ fun sw -> + Fiber.fork ~sw (fun () -> + emit_all_force httpc; + on_done ()) + end in + (module M : EMITTER) + +module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct + include Emitter + open Opentelemetry.Proto + open Opentelemetry.Collector + + let send_trace : Trace.resource_spans list sender = + { + send = + (fun l ~ret -> + (if Config.Env.get_debug () then + let@ () = Lock.with_lock in + Format.eprintf "send spans %a@." + (Format.pp_print_list Trace.pp_resource_spans) + l); + push_trace l; + ret ()); + } + + let last_sent_metrics = Atomic.make (Mtime_clock.now ()) + + let timeout_sent_metrics = Mtime.Span.(5 * s) + (* send metrics from time to time *) + + let signal_emit_gc_metrics () = + if Config.Env.get_debug () then + Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; + Atomic.set needs_gc_metrics true + + let additional_metrics () : Metrics.resource_metrics list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now () in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + (* there is a possible race condition here, as several threads might update + metrics at the same time. But that's harmless. 
*) + if add_own_metrics then ( + Atomic.set last_sent_metrics now; + let open OT.Metrics in + [ + make_resource_metrics + [ + sum ~name:"otel.export.dropped" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel.export.errors" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]; + ] + ) else + [] + + let send_metrics : Metrics.resource_metrics list sender = + { + send = + (fun m ~ret -> + (if Config.Env.get_debug () then + let@ () = Lock.with_lock in + Format.eprintf "send metrics %a@." + (Format.pp_print_list Metrics.pp_resource_metrics) + m); + + let m = List.rev_append (additional_metrics ()) m in + push_metrics m; + ret ()); + } + + let send_logs : Logs.resource_logs list sender = + { + send = + (fun m ~ret -> + (if Config.Env.get_debug () then + let@ () = Lock.with_lock in + Format.eprintf "send logs %a@." + (Format.pp_print_list Logs.pp_resource_logs) + m); + push_logs m; + ret ()); + } +end + +let create_backend ~sw ?(stop = Atomic.make false) ?(config = Config.make ()) + (env : Eio_unix.Stdenv.base) : (module OT.Collector.BACKEND) = + let module E = (val mk_emitter ~sw ~stop ~config ~net:env#net ()) in + (module Backend (E)) + +let setup_ ~sw ?stop ?config env : unit = + let backend = create_backend ~sw ?stop ?config env in + OT.Collector.set_backend backend; + () + +let setup ?stop ?config ?(enable = true) env = + if enable then Switch.run @@ fun sw -> setup_ ~sw ?stop ?config env + +let remove_backend () = OT.Collector.remove_backend ~on_done:ignore () + +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) f env = + (* NOTE: We must thread the switch [sw] through to all the forked threads in + the Backend's Emitter, to ensure that we can wait on all of them to + complete before removing the backend during cleanup. 
*) + Switch.run (fun sw -> + if enable then ( + setup_ ~sw ?stop ~config env; + Switch.on_release sw remove_backend + ); + f env) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli new file mode 100644 index 00000000..9010e5be --- /dev/null +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli @@ -0,0 +1,50 @@ +(* + TODO: more options from + https://opentelemetry.io/docs/reference/specification/protocol/exporter/ + *) + +val get_headers : unit -> (string * string) list + +val set_headers : (string * string) list -> unit +(** Set http headers that are sent on every http query to the collector. *) + +module Config = Config + +val create_backend : + sw:Eio.Switch.t -> + ?stop:bool Atomic.t -> + ?config:Config.t -> + Eio_unix.Stdenv.base -> + (module Opentelemetry.Collector.BACKEND) +(** Create a new backend using Cohttp_eio + + NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *) + +val setup : + ?stop:bool Atomic.t -> + ?config:Config.t -> + ?enable:bool -> + Eio_unix.Stdenv.base -> + unit +(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. + @param enable + actually setup the backend (default true). This can be used to + enable/disable the setup depending on CLI arguments or environment. + @param config configuration to use + @param stop + an atomic boolean. When it becomes true, background threads will all stop + after a little while. *) + +val remove_backend : unit -> unit +(** Shutdown current backend + @since NEXT_RELEASE *) + +val with_setup : + ?stop:bool Atomic.t -> + ?config:Config.t -> + ?enable:bool -> + (Eio_unix.Stdenv.base -> 'a) -> + Eio_unix.Stdenv.base -> + 'a +(** [with_setup f env] is like [setup env; f env] but takes care of cleaning up after + [f env] returns. See {!setup} for more details. 
*) diff --git a/tests/bin/dune b/tests/bin/dune index 84ac795f..60da511a 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -20,6 +20,20 @@ opentelemetry-client-cohttp-lwt lwt.unix)) +(executable + (name emit1_eio) + (modules emit1_eio) + (preprocess + (pps lwt_ppx)) + (libraries + unix + logs + logs.fmt + logs.threaded + opentelemetry + opentelemetry.client + opentelemetry-client-cohttp-eio)) + (executable (name cohttp_client) (modules cohttp_client) diff --git a/tests/bin/emit1_cohttp.ml b/tests/bin/emit1_cohttp.ml index 59acf285..0611c754 100644 --- a/tests/bin/emit1_cohttp.ml +++ b/tests/bin/emit1_cohttp.ml @@ -108,9 +108,13 @@ let () = let batch_traces = ref 400 in let batch_metrics = ref 3 in let batch_logs = ref 400 in + let url = ref None in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; + ( "--url", + Arg.String (fun s -> url := Some s), + " set the url for the OTel collector" ); ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); @@ -136,7 +140,7 @@ let () = None in let config = - Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug + Opentelemetry_client_cohttp_lwt.Config.make ~debug:!debug ?url:!url ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) ~batch_logs:(some_if_nzero batch_logs) () diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml new file mode 100644 index 00000000..dc5198ae --- /dev/null +++ b/tests/bin/emit1_eio.ml @@ -0,0 +1,158 @@ +module OT = Opentelemetry +module Atomic = Opentelemetry_atomic.Atomic + +let spf = Printf.sprintf + +let ( let@ ) f x = f x + +let sleep_inner = ref 0.1 + +let sleep_outer = ref 2.0 + +let n_jobs = ref 1 + +let iterations = ref 1 + +let num_sleep = Atomic.make 0 + +let stress_alloc_ = ref true + +let stop = Atomic.make false + +let num_tr = Atomic.make 0 + +(* Counter used to mark simulated failures *) +let i = ref 0 + +let run_job clock _job_id : unit = + while not @@ Atomic.get stop do + 
let@ scope = + Atomic.incr num_tr; + OT.Trace.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" + ~attrs:[ "i", `Int !i ] + in + + for j = 0 to !iterations do + if j >= !iterations then + (* Terminate program, having reached our max iterations *) + Atomic.set stop true + else + (* parent scope is found via thread local storage *) + let@ scope = + Atomic.incr num_tr; + OT.Trace.with_ ~scope ~kind:OT.Span.Span_kind_internal + ~attrs:[ "j", `Int j ] + "loop.inner" + in + + let () = Eio.Time.sleep clock !sleep_outer in + Atomic.incr num_sleep; + + OT.Logs.( + emit + [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info "inner at %d" j; + ]); + + incr i; + + try + Atomic.incr num_tr; + let@ scope = + OT.Trace.with_ ~kind:OT.Span.Span_kind_internal ~scope "alloc" + in + (* allocate some stuff *) + if !stress_alloc_ then ( + let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in + ignore _arr + ); + + let () = Eio.Time.sleep clock !sleep_inner in + Atomic.incr num_sleep; + + if j = 4 && !i mod 13 = 0 then failwith "oh no"; + + (* simulate a failure *) + Opentelemetry.Scope.add_event scope (fun () -> + OT.Event.make "done with alloc") + with Failure _ -> () + done + done + +let run env : unit = + OT.GC_metrics.basic_setup (); + + OT.Metrics_callbacks.register (fun () -> + OT.Metrics. + [ + sum ~name:"num-sleep" ~is_monotonic:true + [ int (Atomic.get num_sleep) ]; + ]); + + let n_jobs = max 1 !n_jobs in + Printf.printf "run %d jobs\n%!" 
n_jobs; + + Eio.Switch.run (fun sw -> + for j = 1 to n_jobs do + Eio.Fiber.fork ~sw (fun () -> run_job env#clock j) + done) + +let () = + Sys.catch_break true; + OT.Globals.service_name := "t1"; + OT.Globals.service_namespace := Some "ocaml-otel.test"; + let ts_start = Unix.gettimeofday () in + + let debug = ref false in + let batch_traces = ref 400 in + let batch_metrics = ref 3 in + let batch_logs = ref 400 in + let url = ref None in + let opts = + [ + "--debug", Arg.Bool (( := ) debug), " enable debug output"; + ( "--url", + Arg.String (fun s -> url := Some s), + " set the url for the OTel collector" ); + ( "--stress-alloc", + Arg.Bool (( := ) stress_alloc_), + " perform heavy allocs in inner loop" ); + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + "--batch-logs", Arg.Int (( := ) batch_logs), " size of logs batch"; + "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; + "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + "--iterations", Arg.Set_int iterations, " the number of iterations to run"; + "-j", Arg.Set_int n_jobs, " number of parallel jobs"; + ] + |> Arg.align + in + + Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; + + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in + let config = + Opentelemetry_client_cohttp_eio.Config.make ~debug:!debug ?url:!url + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~batch_logs:(some_if_nzero batch_logs) () + in + Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." + !sleep_outer !sleep_inner Opentelemetry_client_cohttp_eio.Config.pp config; + + let@ () = + Fun.protect ~finally:(fun () -> + let elapsed = Unix.gettimeofday () -. ts_start in + let n_per_sec = float (Atomic.get num_tr) /. elapsed in + Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" 
+ (Atomic.get num_tr) elapsed n_per_sec) + in + Opentelemetry_client_cohttp_eio.with_setup ~stop ~config run |> Eio_main.run diff --git a/tests/client_e2e/clients_e2e_lib.ml b/tests/client_e2e/clients_e2e_lib.ml index aa79e84a..8c085a22 100644 --- a/tests/client_e2e/clients_e2e_lib.ml +++ b/tests/client_e2e/clients_e2e_lib.ml @@ -1,3 +1,7 @@ +(** This library defines a set of tests that are meant to be run on an + application which emits a set of signals that is isomorphic to those emitted + by the ../bin/emit1_cohttp.ml and ../bin/emit1_eio.ml executables. *) + module Client = Opentelemetry_client module Proto = Opentelemetry.Proto open Containers @@ -92,6 +96,7 @@ let count_logs_with_body p signals = |> List.length type params = { + url: string; jobs: int; batch_traces: int; batch_metrics: int; @@ -104,6 +109,8 @@ let cmd exec params = exec; "-j"; string_of_int params.jobs; + "--url"; + params.url; "--iterations"; string_of_int params.iterations; "--batch-traces"; @@ -179,13 +186,13 @@ let tests params signal_batches = | _ -> false))); ] -let run_tests cmds = +let run_tests ~port cmds = let suites = cmds |> List.map (fun (exec, params) -> let cmd = cmd exec params in let name = cmd |> String.concat " " in - let signal_batches = Signal_gatherer.gather_signals cmd in + let signal_batches = Signal_gatherer.gather_signals ~port cmd in (* Let server reset *) Unix.sleep 1; name, tests params signal_batches) diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune index 706740da..44897079 100644 --- a/tests/client_e2e/dune +++ b/tests/client_e2e/dune @@ -5,6 +5,7 @@ (binaries (../bin/emit1.exe as emit1) (../bin/emit1_cohttp.exe as emit1_cohttp) + (../bin/emit1_eio.exe as emit1_eio) (./gather_signals.exe as gather_signals)))) (library @@ -30,7 +31,14 @@ (names test_cottp_lwt_client_e2e) (modules test_cottp_lwt_client_e2e) (package opentelemetry-client-cohttp-lwt) - (deps %{bin:emit1_cohttp}) + (deps %{bin:emit1_cohttp} %{bin:emit1_eio}) + (libraries 
clients_e2e_lib alcotest opentelemetry opentelemetry.client)) + +(tests + (names test_cottp_eio_client_e2e) + (modules test_cottp_eio_client_e2e) + (package opentelemetry-client-cohttp-eio) + (deps %{bin:emit1_eio}) (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) (executable diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml index dcc44bfd..d9a0dab8 100644 --- a/tests/client_e2e/signal_gatherer.ml +++ b/tests/client_e2e/signal_gatherer.ml @@ -39,7 +39,10 @@ module Server = struct let+ () = dbg_request "logs" request Signal.Pp.logs logs in `OK, Some (Signal.Logs logs) | unexepected -> - let+ () = Lwt_io.eprintf "unexpected endpoint %s\n" unexepected in + let+ () = + Lwt_io.eprintf "unexpected endpoint %s -- status %s\n" unexepected + (Http.Status.to_string `Not_found) + in `Not_found, None in push_signal signal; @@ -47,10 +50,30 @@ module Server = struct Cohttp_lwt_unix.Server.respond ~status ~body:resp_body () let run port push_signals = - let* () = Lwt_io.eprintf "starting server\n" in - Cohttp_lwt_unix.Server.( - make ~callback:(handler push_signals) () - |> create ~mode:(`TCP (`Port port))) + let request_handler = + Cohttp_lwt_unix.Server.make ~callback:(handler push_signals) () + in + let mode = `TCP (`Port port) in + let* () = Lwt_io.eprintf "starting server on http://localhost:%d\n" port in + let ipv4_server = Cohttp_lwt_unix.Server.(create ~mode request_handler) in + let ipv6_server = + (* TODO: Ideally we could bind both IPv6 and IPv4 in a dual stack server. + However, Cohttp depends on Conduit for this, which is not currently able + to support a dual stack. See https://github.com/mirage/ocaml-conduit/issues/323. + + We need an IPv6 server, because Cohttp_eio will try to connect to this first, + if it is available on the local loop, and if we have an open port at + [::1] without a server to handle requests, we'll end up with the + connection refused. 
+ + For the time being, we fix this by running one server for each case. *) + let* ipv6_ctx = + let+ ctx = Conduit_lwt_unix.init ~src:"::1" () in + Cohttp_lwt_unix.Net.init ~ctx () + in + Cohttp_lwt_unix.Server.(create ~ctx:ipv6_ctx ~mode request_handler) + in + Lwt.join [ ipv4_server; ipv6_server ] end (** Manage launching and cleaning up the program we are testing *) diff --git a/tests/client_e2e/test_cottp_eio_client_e2e.ml b/tests/client_e2e/test_cottp_eio_client_e2e.ml new file mode 100644 index 00000000..539a1ba4 --- /dev/null +++ b/tests/client_e2e/test_cottp_eio_client_e2e.ml @@ -0,0 +1,32 @@ +module Client = Opentelemetry_client +module Proto = Opentelemetry.Proto +open Clients_e2e_lib + +(* NOTE: This port must be different from that used by other integration tests, + to prevent socket binding clashes. *) +let port = 4328 + +let url = Printf.sprintf "http://localhost:%d" port + +let () = + Clients_e2e_lib.run_tests ~port + [ + ( "emit1_eio", + { + url; + jobs = 1; + iterations = 1; + batch_traces = 2; + batch_metrics = 2; + batch_logs = 2; + } ); + ( "emit1_eio", + { + url; + jobs = 3; + iterations = 1; + batch_traces = 400; + batch_metrics = 3; + batch_logs = 400; + } ); + ] diff --git a/tests/client_e2e/test_cottp_lwt_client_e2e.ml b/tests/client_e2e/test_cottp_lwt_client_e2e.ml index c093155f..5c72165e 100644 --- a/tests/client_e2e/test_cottp_lwt_client_e2e.ml +++ b/tests/client_e2e/test_cottp_lwt_client_e2e.ml @@ -2,12 +2,19 @@ module Client = Opentelemetry_client module Proto = Opentelemetry.Proto open Clients_e2e_lib +(* NOTE: This port must be different from that used by other integration tests, + to prevent socket binding clashes. 
*) +let port = 4338 + +let url = Printf.sprintf "http://localhost:%d" port + let () = - Clients_e2e_lib.run_tests + Clients_e2e_lib.run_tests ~port [ - (* TODO: Running with batch-traces = 1 causes deadlocks *) + (* TODO: Running with batch sizes of 1 causes deadlocks *) (* ( "emit1_cohttp", *) (* { *) + (* ipv6 = false; *) (* jobs = 1; *) (* iterations = 1; *) (* batch_traces = 1; *) @@ -16,6 +23,7 @@ let () = (* } ); *) ( "emit1_cohttp", { + url; jobs = 1; iterations = 1; batch_traces = 2; @@ -24,6 +32,7 @@ let () = } ); ( "emit1_cohttp", { + url; jobs = 3; iterations = 1; batch_traces = 400; From a0bee6bfcca9562e20fcf0dd4d47c61cb038310c Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Thu, 10 Jul 2025 17:36:50 -0400 Subject: [PATCH 2/8] Only install Eio in builds for ocaml >= 5 --- .github/workflows/main.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f65a6192..78fbfb4a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,7 +46,15 @@ jobs: opam pin pbrt 3.0.1 -y -n opam install pbrt -y - - run: opam install . --deps-only --with-test --solver=mccs + # We cannot install packages that need eio on ocaml versions before 5 + - if: ${{ ! startsWith("5" matrix.ocaml-compiler) }} + run: | + packages=$(ls *.opam | grep -v eio) + opam install $packages --deps-only --with-test --solver=mccs + + # We should be able to install all packages on ocaml 5 + - if: ${{ startsWith("5" matrix.ocaml-compiler) }} + run: opam install . 
--deps-only --with-test --solver=mccs - run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt,opentelemetry-client-cohttp-lwt From ea66f65187849de50b1405e4d4c932bdee5b7a27 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Thu, 10 Jul 2025 22:51:44 -0400 Subject: [PATCH 3/8] Add tls-eio dep --- dune-project | 1 + opentelemetry-client-cohttp-eio.opam | 1 + 2 files changed, 2 insertions(+) diff --git a/dune-project b/dune-project index 8407c455..0d0e8bae 100644 --- a/dune-project +++ b/dune-project @@ -142,6 +142,7 @@ (cohttp-eio (>= 6.1.0)) eio_main + tls-eio (alcotest :with-test) (containers :with-test)) (synopsis "Collector client for opentelemetry, using cohttp + eio")) diff --git a/opentelemetry-client-cohttp-eio.opam b/opentelemetry-client-cohttp-eio.opam index ee7f47c6..ae142446 100644 --- a/opentelemetry-client-cohttp-eio.opam +++ b/opentelemetry-client-cohttp-eio.opam @@ -22,6 +22,7 @@ depends: [ "odoc" {with-doc} "cohttp-eio" {>= "6.1.0"} "eio_main" + "tls-eio" "alcotest" {with-test} "containers" {with-test} ] From 94772c7fe49740a91564d2f6854568c5bbf21a4d Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Thu, 10 Jul 2025 17:48:50 -0400 Subject: [PATCH 4/8] Fix GitHub actions for Ocaml5 specific tests Co-authored-by: Puneeth Chaganti --- .github/workflows/main.yml | 13 +++++++------ src/client-cohttp-eio/dune | 2 ++ tests/bin/dune | 2 ++ tests/client_e2e/dune | 4 ++++ 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 78fbfb4a..beea2eae 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,11 +5,12 @@ on: push: branches: - main + - add-eio-backend jobs: build: strategy: - fail-fast: true + fail-fast: false matrix: os: - ubuntu-latest @@ -47,14 +48,14 @@ jobs: opam install pbrt -y # We cannot install packages that need eio on ocaml versions before 5 - - if: ${{ ! 
startsWith("5" matrix.ocaml-compiler) }} - run: | - packages=$(ls *.opam | grep -v eio) + - run: | + packages=$(ls ./*.opam | grep -v eio) opam install $packages --deps-only --with-test --solver=mccs + if: ${{ ! (startsWith(matrix.ocaml-compiler, '5')) }} # We should be able to install all packages on ocaml 5 - - if: ${{ startsWith("5" matrix.ocaml-compiler) }} - run: opam install . --deps-only --with-test --solver=mccs + - run: opam install . --deps-only --with-test --solver=mccs + if: ${{ startsWith(matrix.ocaml-compiler, '5') }} - run: opam exec -- dune build @install -p opentelemetry,opentelemetry-lwt,opentelemetry-client-ocurl,opentelemetry-cohttp-lwt,opentelemetry-client-cohttp-lwt diff --git a/src/client-cohttp-eio/dune b/src/client-cohttp-eio/dune index 0c75d658..a1494d02 100644 --- a/src/client-cohttp-eio/dune +++ b/src/client-cohttp-eio/dune @@ -2,6 +2,8 @@ (name opentelemetry_client_cohttp_eio) (public_name opentelemetry-client-cohttp-eio) (synopsis "Opentelemetry collector using cohttp+eio+unix") + (enabled_if + (>= %{ocaml_version} 5.0)) (libraries opentelemetry opentelemetry.client diff --git a/tests/bin/dune b/tests/bin/dune index 60da511a..58a2e110 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -25,6 +25,8 @@ (modules emit1_eio) (preprocess (pps lwt_ppx)) + (enabled_if + (>= %{ocaml_version} 5.0)) (libraries unix logs diff --git a/tests/client_e2e/dune b/tests/client_e2e/dune index 44897079..d553490f 100644 --- a/tests/client_e2e/dune +++ b/tests/client_e2e/dune @@ -31,6 +31,8 @@ (names test_cottp_lwt_client_e2e) (modules test_cottp_lwt_client_e2e) (package opentelemetry-client-cohttp-lwt) + (enabled_if + (>= %{ocaml_version} 5.0)) (deps %{bin:emit1_cohttp} %{bin:emit1_eio}) (libraries clients_e2e_lib alcotest opentelemetry opentelemetry.client)) @@ -39,6 +41,8 @@ (modules test_cottp_eio_client_e2e) (package opentelemetry-client-cohttp-eio) (deps %{bin:emit1_eio}) + (enabled_if + (>= %{ocaml_version} 5.0)) (libraries clients_e2e_lib alcotest 
opentelemetry opentelemetry.client)) (executable From 26baa4d26ba4ec7c01279646411b5da730188f37 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Sat, 12 Jul 2025 12:34:23 -0400 Subject: [PATCH 5/8] Remove testing branch from PR --- .github/workflows/main.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index beea2eae..0d206121 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -5,7 +5,6 @@ on: push: branches: - main - - add-eio-backend jobs: build: From 69d15df4f1606f3d09c7d14dfaf0e6857ae7ada2 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 14 Jul 2025 12:33:21 -0400 Subject: [PATCH 6/8] Fix deps --- dune-project | 1 - opentelemetry-client-cohttp-eio.opam | 1 - src/client-cohttp-eio/dune | 4 ++-- tests/bin/dune | 1 + 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dune-project b/dune-project index 0d0e8bae..57a857f4 100644 --- a/dune-project +++ b/dune-project @@ -133,7 +133,6 @@ (>= "5.00")) (mtime (>= "1.4")) - tls-eio ca-certs mirage-crypto-rng-eio (opentelemetry diff --git a/opentelemetry-client-cohttp-eio.opam b/opentelemetry-client-cohttp-eio.opam index ae142446..fc59603a 100644 --- a/opentelemetry-client-cohttp-eio.opam +++ b/opentelemetry-client-cohttp-eio.opam @@ -15,7 +15,6 @@ depends: [ "dune" {>= "2.9"} "ocaml" {>= "5.00"} "mtime" {>= "1.4"} - "tls-eio" "ca-certs" "mirage-crypto-rng-eio" "opentelemetry" {= version} diff --git a/src/client-cohttp-eio/dune b/src/client-cohttp-eio/dune index a1494d02..79cf9393 100644 --- a/src/client-cohttp-eio/dune +++ b/src/client-cohttp-eio/dune @@ -8,13 +8,13 @@ opentelemetry opentelemetry.client eio + eio.unix cohttp cohttp-eio - eio_main + tls-eio uri pbrt mtime mtime.clock.os - tls-eio ca-certs mirage-crypto-rng.unix)) diff --git a/tests/bin/dune b/tests/bin/dune index 58a2e110..040c8838 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -29,6 +29,7 @@ (>= %{ocaml_version} 5.0)) (libraries unix + eio_main logs 
logs.fmt logs.threaded From e678a93570b3bc91212bf54ca3a4a6ea246ff86c Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 14 Jul 2025 12:34:07 -0400 Subject: [PATCH 7/8] Fix spelling of variable --- tests/client_e2e/signal_gatherer.ml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/client_e2e/signal_gatherer.ml b/tests/client_e2e/signal_gatherer.ml index d9a0dab8..234feaf2 100644 --- a/tests/client_e2e/signal_gatherer.ml +++ b/tests/client_e2e/signal_gatherer.ml @@ -38,9 +38,9 @@ module Server = struct let logs = Signal.Decode.logs data in let+ () = dbg_request "logs" request Signal.Pp.logs logs in `OK, Some (Signal.Logs logs) - | unexepected -> + | unexpected -> let+ () = - Lwt_io.eprintf "unexpected endpoint %s -- status %s\n" unexepected + Lwt_io.eprintf "unexpected endpoint %s -- status %s\n" unexpected (Http.Status.to_string `Not_found) in `Not_found, None From 621045435c66cf5bcc61a3a6f37223dd6cb5322a Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 14 Jul 2025 15:09:04 -0400 Subject: [PATCH 8/8] Fix eio_main dep --- dune-project | 2 +- opentelemetry-client-cohttp-eio.opam | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dune-project b/dune-project index 57a857f4..952bcc36 100644 --- a/dune-project +++ b/dune-project @@ -140,7 +140,7 @@ (odoc :with-doc) (cohttp-eio (>= 6.1.0)) - eio_main + (eio_main :with-test) tls-eio (alcotest :with-test) (containers :with-test)) diff --git a/opentelemetry-client-cohttp-eio.opam b/opentelemetry-client-cohttp-eio.opam index fc59603a..4a4eb785 100644 --- a/opentelemetry-client-cohttp-eio.opam +++ b/opentelemetry-client-cohttp-eio.opam @@ -20,7 +20,7 @@ depends: [ "opentelemetry" {= version} "odoc" {with-doc} "cohttp-eio" {>= "6.1.0"} - "eio_main" + "eio_main" {with-test} "tls-eio" "alcotest" {with-test} "containers" {with-test}