From ddbdc80d57960011848ba7444e9c808b2454eeb7 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Tue, 29 Jul 2025 23:37:26 -0400 Subject: [PATCH] make Eio collector thread safe The backend cannot take a switch, because switches cannot be shared across domains, but the backend is accessed across domains from a global variable. --- .../opentelemetry_client_cohttp_eio.ml | 249 ++++++++---------- .../opentelemetry_client_cohttp_eio.mli | 3 +- 2 files changed, 117 insertions(+), 135 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 3c5f6906..b6266ad9 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -136,7 +136,7 @@ end = struct let create net = Httpc.make ~https:(Some (https ~authenticator)) net (* send the content to the remote endpoint/path *) - let send (client : t) ~url ~decode (bod : string) : ('a, error) result = + let send (client : t) ~url ~decode (body : string) : ('a, error) result = Switch.run @@ fun sw -> let uri = Uri.of_string url in @@ -146,7 +146,7 @@ end = struct Header.(add headers "Content-Type" "application/x-protobuf") in - let body = Cohttp_eio.Body.of_string bod in + let body = Cohttp_eio.Body.of_string body in let r = try let r = Httpc.post client ~sw ~headers ~body uri in @@ -223,32 +223,16 @@ end exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~sw ~stop ~(config : Config.t) ~(net : _ Eio.Net.t) () : - (module EMITTER) = - let open Proto in +let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = (* local helpers *) let open struct - let timeout = - if config.batch_timeout_ms > 0 then - Some Mtime.Span.(config.batch_timeout_ms * ms) - else - None + let client = + (* Prime RNG state for TLS *) + Mirage_crypto_rng_unix.use_default (); + Httpc.create net - let batch_traces : Trace.resource_spans Batch.t = - Batch.make ?batch:config.batch_traces ?timeout () - - let batch_metrics : Metrics.resource_metrics Batch.t = - Batch.make ?batch:config.batch_metrics ?timeout () - - let batch_logs : Logs.resource_logs Batch.t = - Batch.make ?batch:config.batch_logs ?timeout () - - let on_tick_cbs_ = Atomic.make (AList.make ()) - - let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - - let send_http_ (httpc : Httpc.t) ~url data : unit = - let r = Httpc.send httpc ~url ~decode:(`Ret ()) data in + let send_http ~url data : unit = + let r = Httpc.send client ~url ~decode:(`Ret ()) data in match r with | Ok () -> () | Error `Sysbreak -> @@ -261,24 +245,25 @@ let mk_emitter ~sw ~stop ~(config : Config.t) ~(net : _ Eio.Net.t) () : (* avoid crazy error loop *) Eio_unix.sleep 3. - (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force client () = - Batch.pop_if_ready ?force ~now batch_metrics - |> Option.iter (fun collected_metrics -> - let gc_metrics = GC_metrics.drain () in - gc_metrics @ collected_metrics - |> Signal.Encode.metrics - |> send_http_ client ~url:config.url_metrics) + let timeout = + if config.batch_timeout_ms > 0 then + Some Mtime.Span.(config.batch_timeout_ms * ms) + else + None - let emit_traces_maybe ~now ?force client () = - Batch.pop_if_ready ?force ~now batch_traces - |> Option.iter (fun ts -> - Signal.Encode.traces ts |> send_http_ client ~url:config.url_traces) + let batch_traces : Proto.Trace.resource_spans Batch.t = + Batch.make ?batch:config.batch_traces ?timeout () - let emit_logs_maybe ~now ?force client () = - Batch.pop_if_ready ?force ~now batch_logs - |> Option.iter (fun ls -> - Signal.Encode.logs ls |> send_http_ client ~url:config.url_logs) + let batch_metrics : Proto.Metrics.resource_metrics Batch.t = + Batch.make ?batch:config.batch_metrics ?timeout () + + let batch_logs : Proto.Logs.resource_logs Batch.t = + Batch.make ?batch:config.batch_logs ?timeout () + + let push_to_batch b e = + match Batch.push b e with + | `Ok -> () + | `Dropped -> Atomic.incr n_errors let[@inline] guard_exn_ where f = try f () @@ -287,104 +272,84 @@ let mk_emitter ~sw ~stop ~(config : Config.t) ~(net : _ Eio.Net.t) () : Printf.eprintf "opentelemetry-eio: uncaught exception in %s: %s\n%s\n%!" where (Printexc.to_string e) bt - let emit_all_force (httpc : Httpc.t) : unit = - let now = Mtime_clock.now () in - Fiber.all - [ - emit_logs_maybe ~now ~force:true httpc; - emit_metrics_maybe ~now ~force:true httpc; - emit_traces_maybe ~now ~force:true httpc; - ] + let push_traces x = + let@ () = guard_exn_ "push trace" in + push_to_batch batch_traces x - let tick_common_ () = - if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (tid ()); + let push_metrics x = + let@ () = guard_exn_ "push metrics" in sample_gc_metrics_if_needed (); + push_to_batch batch_metrics x + + let push_logs x = + let@ () = guard_exn_ "push logs" in + push_to_batch batch_logs x + + let maybe_emit (batch : 'a Batch.t) url (f : 'a list -> string) ~now ~force + () : unit = + Batch.pop_if_ready ~force ~now batch + |> Option.iter (fun signals -> f signals |> send_http ~url) + + let emit_traces_maybe = + maybe_emit batch_traces config.url_traces Signal.Encode.traces + + let emit_metrics_maybe = + maybe_emit batch_metrics config.url_metrics (fun collected_metrics -> + let gc_metrics = GC_metrics.drain () in + gc_metrics @ collected_metrics |> Signal.Encode.metrics) + + let emit_logs_maybe = + maybe_emit batch_logs config.url_logs Signal.Encode.logs + + let emit_all ~force : unit = + Switch.run @@ fun sw -> + let now = Mtime_clock.now () in + Fiber.fork ~sw @@ emit_logs_maybe ~now ~force; + Fiber.fork ~sw @@ emit_metrics_maybe ~now ~force; + Fiber.fork ~sw @@ emit_traces_maybe ~now ~force + + let on_tick_cbs_ = Atomic.make (AList.make ()) + + let run_tick_callbacks () = List.iter (fun f -> try f () with e -> Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) - (AList.get @@ Atomic.get on_tick_cbs_); - () - - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let ticker_fiber ~tick : unit -> [ `Stop_daemon ] = - let rec loop () = - if Atomic.get stop then - `Stop_daemon - else ( - tick (); - Eio_unix.sleep 0.5; - loop () - ) - in - loop + (AList.get @@ Atomic.get on_tick_cbs_) end in - let httpc = - (* Prime RNG state for TLS *) - Mirage_crypto_rng_unix.use_default (); - Httpc.create net - in let module M = struct - let push_to_batch b e = - match Batch.push b e with - | `Ok -> () - | `Dropped -> Atomic.incr n_errors + let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let push_trace e = - let@ () = guard_exn_ "push trace" in - push_to_batch batch_traces e; - let now = Mtime_clock.now () in - Fiber.fork ~sw (emit_traces_maybe ~now httpc) + let push_trace e = push_traces e - let push_metrics e = - let@ () = guard_exn_ "push metrics" in + let push_metrics e = push_metrics e + + let push_logs e = push_logs e + + let tick () = + if Config.Env.get_debug () then + Printf.eprintf "tick (from %d)\n%!" (tid ()); + run_tick_callbacks (); sample_gc_metrics_if_needed (); - push_to_batch batch_metrics e; - let now = Mtime_clock.now () in - Fiber.fork ~sw (emit_metrics_maybe ~now httpc) - - let push_logs e = - let@ () = guard_exn_ "push logs" in - push_to_batch batch_logs e; - let now = Mtime_clock.now () in - Fiber.fork ~sw (emit_logs_maybe ~now httpc) - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick_ () = - tick_common_ (); - sample_gc_metrics_if_needed (); - let now = Mtime_clock.now () in - Fiber.all - [ - emit_logs_maybe ~now httpc; - emit_metrics_maybe ~now httpc; - emit_traces_maybe ~now httpc; - ] - - let () = Eio.Fiber.fork_daemon ~sw (ticker_fiber ~tick:tick_) - - let tick () = Fiber.fork ~sw tick_ + emit_all ~force:false let cleanup ~on_done () = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: exiting…\n%!"; - (* This must be in its own switch, because it MUST run even if the - surrounding switch in the environment has been cancelled. *) - Switch.run @@ fun sw -> - Fiber.fork ~sw (fun () -> - emit_all_force httpc; - on_done ()) + Atomic.set stop true; + run_tick_callbacks (); + sample_gc_metrics_if_needed (); + emit_all ~force:true; + on_done () end in (module M : EMITTER) module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct - include Emitter open Opentelemetry.Proto open Opentelemetry.Collector + open Emitter let send_trace : Trace.resource_spans list sender = { @@ -470,30 +435,46 @@ module Backend (Emitter : EMITTER) : Opentelemetry.Collector.BACKEND = struct push_logs m; ret ()); } + + let tick = Emitter.tick + + let cleanup = Emitter.cleanup + + let set_on_tick_callbacks = Emitter.set_on_tick_callbacks end let create_backend ~sw ?(stop = Atomic.make false) ?(config = Config.make ()) - (env : Eio_unix.Stdenv.base) : (module OT.Collector.BACKEND) = - let module E = (val mk_emitter ~sw ~stop ~config ~net:env#net ()) in - (module Backend (E)) + env : (module OT.Collector.BACKEND) = + let module E = (val mk_emitter ~stop ~net:env#net config) in + let module B = Backend (E) in + (* Run a background fiber to keep the backend ticking regularly. + + NOTE: This cannot be located inside the [Backend], because switches + are not thread safe, and cannot be used accross domains, but the + backend is accessed across domains. *) + Eio.Fiber.fork ~sw (fun () -> + while not @@ Atomic.get stop do + Eio.Time.sleep env#clock 0.5; + B.tick () + done); + + (module B) let setup_ ~sw ?stop ?config env : unit = - let backend = create_backend ~sw ?stop ?config env in - OT.Collector.set_backend backend; - () + let backend = create_backend ?stop ?config ~sw env in + OT.Collector.set_backend backend -let setup ?stop ?config ?(enable = true) env = - if enable then Switch.run @@ fun sw -> setup_ ~sw ?stop ?config env +let setup ?stop ?config ?(enable = true) ~sw env = + if enable then setup_ ~sw ?stop ?config env let remove_backend () = OT.Collector.remove_backend ~on_done:ignore () -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) f env = - (* NOTE: We must thread the switch [sw] through to all the forked threads in - the Backend's Emitter, to ensure that we can wait on all of them to - complete before before removing the backend during cleanup. *) - Switch.run (fun sw -> - if enable then ( - setup_ ~sw ?stop ~config env; - Switch.on_release sw remove_backend - ); - f env) +let with_setup ?stop ?config ?(enable = true) f env = + if enable then + Switch.run @@ fun sw -> + snd + @@ Fiber.pair + (fun () -> setup_ ~sw ?stop ?config env) + (fun () -> Fun.protect ~finally:(fun () -> remove_backend ()) f) + else + f () diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli index 9010e5be..40b26a8e 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli @@ -24,6 +24,7 @@ val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> + sw:Eio.Switch.t -> Eio_unix.Stdenv.base -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @@ -43,7 +44,7 @@ val with_setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> - (Eio_unix.Stdenv.base -> 'a) -> + (unit -> 'a) -> Eio_unix.Stdenv.base -> 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after