diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index ae759cb8..ec73e9be 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -10,18 +10,31 @@ module Config = Config open Opentelemetry include Common_ -external reraise : exn -> 'a = "%reraise" -(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to - use Lwt's latest version *) - let needs_gc_metrics = Atomic.make false let last_gc_metrics = Atomic.make (Mtime_clock.now ()) let timeout_gc_metrics = Mtime.Span.(20 * s) -let gc_metrics = ref [] -(* side channel for GC, appended to {!E_metrics}'s data *) +(* Cross domain thread-safe storage for GC metrics gathered from different fibres. + Ultimately appended to {!E_metrics}'s data on ticks. *) +module GC_metrics : sig + val add : Proto.Metrics.resource_metrics -> unit + val drain : unit -> Proto.Metrics.resource_metrics list +end += struct + (* Used to prevent data races across domains *) + let mutex = Eio.Mutex.create () + let gc_metrics = ref [] + + let add m = Eio.Mutex.use_rw ~protect:true mutex (fun () -> + gc_metrics := m :: !gc_metrics) + + let drain () = Eio.Mutex.use_rw ~protect:true mutex (fun () -> + let metrics = !gc_metrics in + gc_metrics := []; + metrics) +end (* capture current GC metrics if {!needs_gc_metrics} is true, or it has been a long time since the last GC metrics collection, @@ -40,7 +53,7 @@ let sample_gc_metrics_if_needed () = ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) @@ Opentelemetry.GC_metrics.get_metrics () in - gc_metrics := l :: !gc_metrics + GC_metrics.add l ) type error = @@ -74,7 +87,10 @@ let report_err_ = function module Httpc : sig type t - val create : unit -> t + val create : + https:(Uri.t -> [ `Generic ] Eio.Net.stream_socket_ty r -> [> Eio.Flow.two_way_ty ] r) option -> + [> [> `Generic ] Eio.Net.ty ] r -> + t val send : t 
-> @@ -88,15 +104,16 @@ end = struct open Opentelemetry.Proto module Httpc = Cohttp_eio.Client - type t = unit + type t = Httpc.t - let create () : t = () + let create = Httpc.make let cleanup _self = () (* send the content to the remote endpoint/path *) - let send (_self : t) ~url ~decode (bod : string) : - ('a, error) result Promise.t = + let send (client : t) ~url ~decode (bod : string) : + ('a, error) result (* TODO: Does this need to return a promise? *)= + Switch.run @@ fun sw -> let uri = Uri.of_string url in let open Cohttp in @@ -108,7 +125,7 @@ end = struct let body = Cohttp_eio.Body.of_string bod in let r = try - let r = Httpc.post ~headers ~body uri in + let r = Httpc.post client ~sw ~headers ~body uri in Ok r with e -> Error e in @@ -121,7 +138,7 @@ end = struct in Error err | Ok (resp, body) -> - let body = Cohttp_lwt.Body.to_string body in + let body = Eio.Buf_read.(parse_exn take_all) body ~max_size:max_int in let code = Response.status resp |> Code.code_of_status in if not (Code.is_error code) then ( match decode with @@ -260,13 +277,15 @@ module type EMITTER = sig val cleanup : on_done:(unit -> unit) -> unit -> unit end +(* Used by Switch.fail to signal when the collector is stopped. *) +exception Collector_stopped + (* make an emitter. 
exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = +let mk_emitter ~stop ~(config : Config.t) ~(net:_ Eio.Net.t) () : (module EMITTER) = let open Proto in - let open Lwt.Syntax in (* local helpers *) let open struct let timeout = @@ -288,7 +307,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Promise.t = + let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in @@ -336,23 +355,25 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ~encode:Logs_service.encode_pb_export_logs_service_request x (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool Promise.t = + let emit_metrics_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_metrics with | None -> false - | Some l -> - let batch = !gc_metrics :: l in - gc_metrics := []; + | Some collected_metrics -> + (* We take a large but fixed number of gc_metrics so that we can't be trapped + forever waiting to exhaust a stream that is constantly being updated. 
*) + let gc_metrics = GC_metrics.drain () in + let batch = gc_metrics :: collected_metrics in let () = send_metrics_http httpc encoder batch in true - let emit_traces_maybe ~now ?force httpc encoder : bool Promise.t = + let emit_traces_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_traces with | None -> false | Some l -> let () = send_traces_http httpc encoder l in true - let emit_logs_maybe ~now ?force httpc encoder : bool Promise.t = + let emit_logs_maybe ~now ?force httpc encoder : bool = match Batch.pop_if_ready ?force ~now batch_logs with | None -> false | Some l -> @@ -367,16 +388,13 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where (Printexc.to_string e) bt - let emit_all_force (httpc : Httpc.t) encoder : unit Promise.t = + let emit_all_force (httpc : Httpc.t) encoder : unit = let now = Mtime_clock.now () in - let (_ : bool), ((_ : bool), (_ : bool)) = - Fiber.pair - (emit_logs_maybe ~now ~force:true httpc encoder) - (Fiber.pair - (emit_metrics_maybe ~now ~force:true httpc encoder) - (emit_traces_maybe ~now ~force:true httpc encoder)) - in - () + Fiber.all [ + (fun () -> ignore @@ emit_logs_maybe ~now ~force:true httpc encoder); + (fun () -> ignore @@ emit_metrics_maybe ~now ~force:true httpc encoder); + (fun () -> ignore @@ emit_traces_maybe ~now ~force:true httpc encoder) + ] let tick_common_ () = if !debug_ then Printf.eprintf "tick (from %d)\n%!" 
(tid ()); @@ -391,23 +409,26 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = () (* thread that calls [tick()] regularly, to help enforce timeouts *) - let setup_ticker_thread ~tick ~finally () = + let setup_ticker_thread ~tick ~sw () : unit = let rec tick_thread () = if Atomic.get stop then - finally () + Switch.fail sw Collector_stopped else ( let () = Eio_unix.sleep 0.5 in let () = tick () in tick_thread () ) in - Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) - tick_thread + Fiber.fork ~sw tick_thread end in - let httpc = Httpc.create () in + (* TODO: need https? *) + let httpc = Httpc.create ~https:None net in let encoder = Pbrt.Encoder.create () in + (* The entire module [M] shares the switch [sw]. This means that when the + switch is cancelled, every thread started within the module will be cancelled + and have its resources disposed of. *) + Switch.run @@ fun sw -> let module M = struct (* we make sure that this is thread-safe, even though we don't have a background thread. There can still be a ticker thread, and there @@ -419,7 +440,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_traces e; let now = Mtime_clock.now () in Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) (fun () -> let (_ : bool) = emit_traces_maybe ~now httpc encoder in ()) @@ -430,7 +450,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_metrics e; let now = Mtime_clock.now () in Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) (fun () -> let (_ : bool) = emit_metrics_maybe ~now httpc encoder in ()) @@ -440,7 +459,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_logs e; let now = Mtime_clock.now () in Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. 
*) (fun () -> let (_ : bool) = emit_logs_maybe ~now httpc encoder in ()) @@ -451,42 +469,45 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = tick_common_ (); sample_gc_metrics_if_needed (); let now = Mtime_clock.now () in - let (_ : bool), ((_ : bool), (_ : bool)) = - Fiber.pair - (emit_logs_maybe ~now httpc encoder) - (Fiber.pair - (emit_metrics_maybe ~now httpc encoder) - (emit_traces_maybe ~now httpc encoder)) - in - () + Fiber.all [ + (fun () -> ignore @@ emit_logs_maybe ~now httpc encoder); + (fun () -> ignore @@ emit_metrics_maybe ~now httpc encoder); + (fun () -> ignore @@ emit_traces_maybe ~now httpc encoder) + ] - let () = setup_ticker_thread ~tick:tick_ ~finally:ignore () + let () = setup_ticker_thread ~sw ~tick:tick_ () (* if called in a blocking context: work in the background *) let tick () = - Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) - tick_ + Fiber.fork ~sw tick_ let cleanup ~on_done () = if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; Fiber.fork ~sw - (* TODO: lwt-to-direct-style: [sw] must be propagated here. 
*) (fun () -> let () = emit_all_force httpc encoder in Httpc.cleanup httpc; on_done ()) end in - (module M) + (module M : EMITTER) + +(* Eio environment capabilities needed *) +(* TODO: not able to supply just this type for env due to weak :( *) +type 'a _env = 'a constraint 'a = < + net : _ Eio.Net.t; (** Network access *) + > as 'a module Backend (Arg : sig val stop : bool Atomic.t + (* Networking capability *) + val env : Eio_unix.Stdenv.base + val config : Config.t end) () : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) + include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ~net:Arg.env#net ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -578,48 +599,37 @@ module Backend } end -let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = +let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) (env : Eio_unix.Stdenv.base) = debug_ := config.debug; let module B = Backend (struct + let env = env let stop = stop - let config = config end) () in (module B : OT.Collector.BACKEND) -let setup_ ?stop ?config () : unit = - let backend = create_backend ?stop ?config () in +let setup_ ?stop ?config env : unit = + let backend = create_backend ?stop ?config env in OT.Collector.set_backend backend; () -let setup ?stop ?config ?(enable = true) () = - if enable then setup_ ?stop ?config () +let setup ?stop ?config ?(enable = true) env = + if enable then setup_ ?stop ?config env -let remove_backend () : unit Promise.t = - let done_fut, done_u - (* TODO: lwt-to-direct-style: Translation is incomplete, [Promise.await] must be called on the promise when it's part of control-flow. 
*) - = - Promise.create () - in - OT.Collector.remove_backend ~on_done:(fun () -> Promise.resolve done_u ()) (); - done_fut +let remove_backend () = + OT.Collector.remove_backend ~on_done:ignore () -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : - _ Promise.t = - if enable then ( - let open Lwt.Syntax in - setup_ ?stop ~config (); - try - let res = f () in - let () = remove_backend () in - res - with exn -> - let () = remove_backend () in - reraise exn - ) else +(* TODO: Work thru stop logic to make sure it is sensible *) +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) env f = + Switch.run begin fun sw -> + if enable then ( + setup_ ?stop ~config env; + Switch.on_release sw remove_backend + ); f () + end diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli index c1dd53ad..86710fbd 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli @@ -15,14 +15,14 @@ module Config = Config val create_backend : ?stop:bool Atomic.t -> ?config:Config.t -> - unit -> + Eio_unix.Stdenv.base -> (module Opentelemetry.Collector.BACKEND) (** Create a new backend using lwt and cohttp NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *) val setup : - ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit + ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> Eio_unix.Stdenv.base -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to @@ -40,7 +40,7 @@ val with_setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> - unit -> + Eio_unix.Stdenv.base -> (unit -> 'a) -> 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after