From 0bf561b5866f0b77f4a49611e8d0849a018ac7e7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Dec 2025 15:37:03 -0500 Subject: [PATCH] update client libraries, remove `stop:bool atomic` in favor of switches --- .../opentelemetry_client_cohttp_eio.ml | 26 +++++----- .../opentelemetry_client_cohttp_eio.mli | 11 +---- .../opentelemetry_client_cohttp_lwt.ml | 20 ++++---- .../opentelemetry_client_cohttp_lwt.mli | 23 ++------- .../opentelemetry_client_ocurl_lwt.ml | 20 ++++---- .../opentelemetry_client_ocurl_lwt.mli | 28 +++-------- .../opentelemetry_client_ocurl.ml | 47 ++++++++++++------- .../opentelemetry_client_ocurl.mli | 24 ++-------- 8 files changed, 78 insertions(+), 121 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 07e17b9b..890f65d8 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -93,7 +93,7 @@ struct (* send the content to the remote endpoint/path *) let send (client : t) ~url ~decode (body : string) : ('a, Export_error.t) result = - Switch.run @@ fun sw -> + Eio.Switch.run @@ fun sw -> let uri = Uri.of_string url in let open Cohttp in @@ -158,18 +158,18 @@ struct end end -let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) ~sw - ~env () : Consumer.any_resource_builder = +let create_consumer ?(config = Config.make ()) ~sw ~env () : + Consumer.any_resource_builder = let module M = Make (struct let sw = sw let env = env end) in let module C = Generic_http_consumer.Make (M.IO) (M.Notifier) (M.Httpc) in - C.consumer ~ticker_task:(Some 0.5) ~stop ~config () + C.consumer ~ticker_task:(Some 0.5) ~config () -let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () = - let consumer = create_consumer ?stop ~config ~sw ~env () in +let create_exporter ?(config = Config.make ()) ~sw ~env () = + let consumer = create_consumer ~config ~sw ~env () in let bq = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () @@ -179,12 +179,12 @@ let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () = let create_backend = create_exporter -let setup_ ~sw ?stop ?config env : unit = - let exp = create_exporter ?stop ?config ~sw ~env () in +let setup_ ~sw ?config env : unit = + let exp = create_exporter ?config ~sw ~env () in Main_exporter.set exp -let setup ?stop ?config ?(enable = true) ~sw env = - if enable then setup_ ~sw ?stop ?config env +let setup ?config ?(enable = true) ~sw env = + if enable then setup_ ~sw ?config env let remove_exporter () = let p, waker = Eio.Promise.create () in @@ -193,12 +193,12 @@ let remove_exporter () = let remove_backend = remove_exporter -let with_setup ?stop ?config ?(enable = true) f env = +let with_setup ?config ?(enable = true) f env = if enable then - Switch.run @@ fun sw -> + Eio.Switch.run @@ fun sw -> snd @@ Fiber.pair - (fun () -> setup_ ~sw ?stop ?config env) + (fun () -> setup_ ~sw ?config env) (fun () -> Fun.protect ~finally:(fun () -> remove_backend ()) f) else f () diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli index e3ccbe4e..1b8eb7e7 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli @@ -11,7 +11,6 @@ val set_headers : (string * string) list -> unit module Config = Config val create_consumer : - ?stop:bool Atomic.t -> ?config:Config.t -> sw:Eio.Switch.t -> env:Eio_unix.Stdenv.base -> @@ -20,7 +19,6 @@ val create_consumer : (** Consumer that pulls from a queue *) val create_exporter : - ?stop:bool Atomic.t -> ?config:Config.t -> sw:Eio.Switch.t -> env:Eio_unix.Stdenv.base -> @@ -29,7 +27,6 @@ val create_exporter : (** NOTE [after_cleanup] optional parameter removed @since 0.12 *) val create_backend : - ?stop:bool Atomic.t -> ?config:Config.t -> sw:Eio.Switch.t -> env:Eio_unix.Stdenv.base -> @@ -38,7 +35,6 @@ val create_backend : [@@deprecated "use create_exporter"] val setup : - ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> sw:Eio.Switch.t -> @@ -58,11 +54,6 @@ val remove_backend : unit -> unit @since 0.12 *) val with_setup : - ?stop:bool Atomic.t -> - ?config:Config.t -> - ?enable:bool -> - (unit -> 'a) -> - Eio_unix.Stdenv.base -> - 'a + ?config:Config.t -> ?enable:bool -> (unit -> 'a) -> Eio_unix.Stdenv.base -> 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 53cf515b..cab5fc3d 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -105,11 +105,11 @@ module Consumer_impl = Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) (Httpc) -let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = - Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () +let create_consumer ?(config = Config.make ()) () = + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config () -let create_exporter ?stop ?(config = Config.make ()) () = - let consumer = create_consumer ?stop ~config () in +let create_exporter ?(config = Config.make ()) () = + let consumer = create_consumer ~config () in let bq = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () @@ -119,13 +119,12 @@ let create_exporter ?stop ?(config = Config.make ()) () = let create_backend = create_exporter -let setup_ ?stop ?config () : unit = - let backend = create_backend ?stop ?config () in +let setup_ ?config () : unit = + let backend = create_backend ?config () in Main_exporter.set backend; () -let setup ?stop ?config ?(enable = true) () = - if enable then setup_ ?stop ?config () +let setup ?config ?(enable = true) () = if enable then setup_ ?config () let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in @@ -134,11 +133,10 @@ let remove_exporter () : unit Lwt.t = let remove_backend = remove_exporter -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t - = +let with_setup ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t = if enable then ( let open Lwt.Syntax in - setup_ ?stop ~config (); + setup_ ~config (); Lwt.catch (fun () -> diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index 2f12121f..556f0e0e 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -3,8 +3,6 @@ https://opentelemetry.io/docs/reference/specification/protocol/exporter/ *) -open Common_ - val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit @@ -13,22 +11,16 @@ val set_headers : (string * string) list -> unit module Config = Config val create_consumer : - ?stop:bool Atomic.t -> - ?config:Config.t -> - unit -> - Opentelemetry_client.Consumer.any_resource_builder + ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder (** Consumer that pulls from a queue *) -val create_exporter : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t (** Create a new backend using lwt and ezcurl-lwt *) -val create_backend : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t [@@deprecated "use create_exporter"] -val setup : - ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit +val setup : ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to @@ -43,11 +35,6 @@ val remove_backend : unit -> unit Lwt.t @since 0.12 *) val with_setup : - ?stop:bool Atomic.t -> - ?config:Config.t -> - ?enable:bool -> - unit -> - (unit -> 'a Lwt.t) -> - 'a Lwt.t + ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a Lwt.t) -> 'a Lwt.t (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *) diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml index 8db204aa..d860ae4a 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.ml @@ -76,11 +76,11 @@ module Consumer_impl = Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) (Httpc) -let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = - Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () +let create_consumer ?(config = Config.make ()) () = + Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config () -let create_exporter ?stop ?(config = Config.make ()) () = - let consumer = create_consumer ?stop ~config () in +let create_exporter ?(config = Config.make ()) () = + let consumer = create_consumer ~config () in let bq = Bounded_queue_sync.create ~high_watermark:Bounded_queue.Defaults.high_watermark () @@ -90,13 +90,12 @@ let create_exporter ?stop ?(config = Config.make ()) () = let create_backend = create_exporter -let setup_ ?stop ?config () : unit = - let exp = create_backend ?stop ?config () in +let setup_ ?config () : unit = + let exp = create_backend ?config () in Main_exporter.set exp; () -let setup ?stop ?config ?(enable = true) () = - if enable then setup_ ?stop ?config () +let setup ?config ?(enable = true) () = if enable then setup_ ?config () let remove_exporter () : unit Lwt.t = let done_fut, done_u = Lwt.wait () in @@ -105,11 +104,10 @@ let remove_exporter () : unit Lwt.t = let remove_backend = remove_exporter -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t - = +let with_setup ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t = if enable then ( let open Lwt.Syntax in - setup_ ?stop ~config (); + setup_ ~config (); Lwt.catch (fun () -> diff --git a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli index 713ea70a..a5ea4ff1 100644 --- a/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli +++ b/src/client-ocurl-lwt/opentelemetry_client_ocurl_lwt.mli @@ -3,8 +3,6 @@ https://opentelemetry.io/docs/reference/specification/protocol/exporter/ *) -open Common_ - val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit @@ -13,41 +11,27 @@ val set_headers : (string * string) list -> unit module Config = Config val create_consumer : - ?stop:bool Atomic.t -> - ?config:Config.t -> - unit -> - Opentelemetry_client.Consumer.any_resource_builder + ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder (** Consumer that pulls from a queue *) -val create_exporter : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t (** Create a new backend using lwt and ezcurl-lwt *) -val create_backend : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t [@@deprecated "use create_exporter"] -val setup : - ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit +val setup : ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to enable/disable the setup depending on CLI arguments or environment. - @param config configuration to use - @param stop - an atomic boolean. When it becomes true, background threads will all stop - after a little while. *) + @param config configuration to use *) val remove_backend : unit -> unit Lwt.t (** Shutdown current backend @since NEXT_RELEASE *) val with_setup : - ?stop:bool Atomic.t -> - ?config:Config.t -> - ?enable:bool -> - unit -> - (unit -> 'a Lwt.t) -> - 'a Lwt.t + ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a Lwt.t) -> 'a Lwt.t (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index de20c894..30248b54 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -5,8 +5,8 @@ module Config = Config module OTELC = Opentelemetry_client -open Common_ module OTEL = Opentelemetry +open Common_ let get_headers = Config.Env.get_headers @@ -81,7 +81,7 @@ end module Consumer_impl = OTELC.Generic_http_consumer.Make (IO) (Notifier) (Httpc) -let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () : +let consumer ?(config = Config.make ()) () : Opentelemetry_client.Consumer.any_resource_builder = let n_workers = max 2 (min 32 config.bg_threads) in let ticker_task = @@ -90,11 +90,11 @@ let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () : else None in - Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task ~stop + Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task ~config:config.common () -let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t = - let consumer = consumer ?stop ~config () in +let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t = + let consumer = consumer ~config () in let bq = OTELC.Bounded_queue_sync.create ~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark () @@ -105,9 +105,17 @@ let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t = let create_backend = create_exporter -let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () - : unit = - let exporter = create_exporter ~stop ~config () in +let shutdown_and_wait (self : OTEL.Exporter.t) : unit = + let open Opentelemetry_client in + let sq = Sync_queue.create () in + OTEL.Aswitch.on_turn_off (OTEL.Exporter.active self) (fun () -> + Printf.eprintf "ocurl: push queue\n%!"; + Sync_queue.push sq ()); + OTEL.Exporter.shutdown self; + Sync_queue.pop sq + +let setup_ ?(config : Config.t = Config.make ()) () : OTEL.Exporter.t = + let exporter = create_exporter ~config () in OTEL.Main_exporter.set exporter; OTELC.Self_trace.set_enabled config.common.self_trace; @@ -115,24 +123,29 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () if config.ticker_thread then ( (* at most a minute *) let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in + let active = OTEL.Exporter.active exporter in ignore - (OTELC.Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter () + (OTELC.Util_thread.setup_ticker_thread ~active ~sleep_ms exporter () : Thread.t) - ) + ); + exporter let remove_exporter () : unit = - (* we don't need the callback, this runs in the same thread *) - OTEL.Main_exporter.remove () ~on_done:ignore + let open Opentelemetry_client in + (* used to wait *) + let sq = Sync_queue.create () in + OTEL.Main_exporter.remove () ~on_done:(fun () -> Sync_queue.push sq ()); + Sync_queue.pop sq let remove_backend = remove_exporter -let setup ?stop ?config ?(enable = true) () = - if enable then setup_ ?stop ?config () +let setup ?config ?(enable = true) () = + if enable then ignore (setup_ ?config () : OTEL.Exporter.t) -let with_setup ?stop ?config ?(enable = true) () f = +let with_setup ?config ?(enable = true) () f = if enable then ( - setup_ ?stop ?config (); - Fun.protect ~finally:remove_backend f + let exp = setup_ ?config () in + Fun.protect f ~finally:(fun () -> shutdown_and_wait exp) ) else f () diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 5ac4fdd9..bebd486c 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -3,8 +3,6 @@ https://opentelemetry.io/docs/reference/specification/protocol/exporter/ *) -open Opentelemetry_atomic - val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit @@ -16,22 +14,16 @@ val n_bytes_sent : unit -> int (** Global counter of bytes sent (or attempted to be sent) *) val consumer : - ?stop:bool Atomic.t -> - ?config:Config.t -> - unit -> - Opentelemetry_client.Consumer.any_resource_builder + ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder (** Consumer that pulls from a queue *) -val create_exporter : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t (** @since NEXT_RELEASE *) -val create_backend : - ?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t +val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t [@@deprecated "use create_exporter"] -val setup : - ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit +val setup : ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. @param enable actually setup the backend (default true). This can be used to @@ -48,12 +40,6 @@ val remove_backend : unit -> unit [@@deprecated "use remove_exporter"] (** @since 0.12 *) -val with_setup : - ?stop:bool Atomic.t -> - ?config:Config.t -> - ?enable:bool -> - unit -> - (unit -> 'a) -> - 'a +val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *)