update client libraries, remove stop:bool atomic in favor of switches

This commit is contained in:
Simon Cruanes 2025-12-08 15:37:03 -05:00
parent 15268270df
commit 0bf561b586
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
8 changed files with 78 additions and 121 deletions

View file

@ -93,7 +93,7 @@ struct
(* send the content to the remote endpoint/path *) (* send the content to the remote endpoint/path *)
let send (client : t) ~url ~decode (body : string) : let send (client : t) ~url ~decode (body : string) :
('a, Export_error.t) result = ('a, Export_error.t) result =
Switch.run @@ fun sw -> Eio.Switch.run @@ fun sw ->
let uri = Uri.of_string url in let uri = Uri.of_string url in
let open Cohttp in let open Cohttp in
@ -158,18 +158,18 @@ struct
end end
end end
let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) ~sw let create_consumer ?(config = Config.make ()) ~sw ~env () :
~env () : Consumer.any_resource_builder = Consumer.any_resource_builder =
let module M = Make (struct let module M = Make (struct
let sw = sw let sw = sw
let env = env let env = env
end) in end) in
let module C = Generic_http_consumer.Make (M.IO) (M.Notifier) (M.Httpc) in let module C = Generic_http_consumer.Make (M.IO) (M.Notifier) (M.Httpc) in
C.consumer ~ticker_task:(Some 0.5) ~stop ~config () C.consumer ~ticker_task:(Some 0.5) ~config ()
let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () = let create_exporter ?(config = Config.make ()) ~sw ~env () =
let consumer = create_consumer ?stop ~config ~sw ~env () in let consumer = create_consumer ~config ~sw ~env () in
let bq = let bq =
Bounded_queue_sync.create Bounded_queue_sync.create
~high_watermark:Bounded_queue.Defaults.high_watermark () ~high_watermark:Bounded_queue.Defaults.high_watermark ()
@ -179,12 +179,12 @@ let create_exporter ?stop ?(config = Config.make ()) ~sw ~env () =
let create_backend = create_exporter let create_backend = create_exporter
let setup_ ~sw ?stop ?config env : unit = let setup_ ~sw ?config env : unit =
let exp = create_exporter ?stop ?config ~sw ~env () in let exp = create_exporter ?config ~sw ~env () in
Main_exporter.set exp Main_exporter.set exp
let setup ?stop ?config ?(enable = true) ~sw env = let setup ?config ?(enable = true) ~sw env =
if enable then setup_ ~sw ?stop ?config env if enable then setup_ ~sw ?config env
let remove_exporter () = let remove_exporter () =
let p, waker = Eio.Promise.create () in let p, waker = Eio.Promise.create () in
@ -193,12 +193,12 @@ let remove_exporter () =
let remove_backend = remove_exporter let remove_backend = remove_exporter
let with_setup ?stop ?config ?(enable = true) f env = let with_setup ?config ?(enable = true) f env =
if enable then if enable then
Switch.run @@ fun sw -> Eio.Switch.run @@ fun sw ->
snd snd
@@ Fiber.pair @@ Fiber.pair
(fun () -> setup_ ~sw ?stop ?config env) (fun () -> setup_ ~sw ?config env)
(fun () -> Fun.protect ~finally:(fun () -> remove_backend ()) f) (fun () -> Fun.protect ~finally:(fun () -> remove_backend ()) f)
else else
f () f ()

View file

@ -11,7 +11,6 @@ val set_headers : (string * string) list -> unit
module Config = Config module Config = Config
val create_consumer : val create_consumer :
?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
sw:Eio.Switch.t -> sw:Eio.Switch.t ->
env:Eio_unix.Stdenv.base -> env:Eio_unix.Stdenv.base ->
@ -20,7 +19,6 @@ val create_consumer :
(** Consumer that pulls from a queue *) (** Consumer that pulls from a queue *)
val create_exporter : val create_exporter :
?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
sw:Eio.Switch.t -> sw:Eio.Switch.t ->
env:Eio_unix.Stdenv.base -> env:Eio_unix.Stdenv.base ->
@ -29,7 +27,6 @@ val create_exporter :
(** NOTE [after_cleanup] optional parameter removed @since 0.12 *) (** NOTE [after_cleanup] optional parameter removed @since 0.12 *)
val create_backend : val create_backend :
?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
sw:Eio.Switch.t -> sw:Eio.Switch.t ->
env:Eio_unix.Stdenv.base -> env:Eio_unix.Stdenv.base ->
@ -38,7 +35,6 @@ val create_backend :
[@@deprecated "use create_exporter"] [@@deprecated "use create_exporter"]
val setup : val setup :
?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
?enable:bool -> ?enable:bool ->
sw:Eio.Switch.t -> sw:Eio.Switch.t ->
@ -58,11 +54,6 @@ val remove_backend : unit -> unit
@since 0.12 *) @since 0.12 *)
val with_setup : val with_setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> (unit -> 'a) -> Eio_unix.Stdenv.base -> 'a
?config:Config.t ->
?enable:bool ->
(unit -> 'a) ->
Eio_unix.Stdenv.base ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *) [f()] returns See {!setup} for more details. *)

View file

@ -105,11 +105,11 @@ module Consumer_impl =
Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt)
(Httpc) (Httpc)
let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = let create_consumer ?(config = Config.make ()) () =
Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config ()
let create_exporter ?stop ?(config = Config.make ()) () = let create_exporter ?(config = Config.make ()) () =
let consumer = create_consumer ?stop ~config () in let consumer = create_consumer ~config () in
let bq = let bq =
Bounded_queue_sync.create Bounded_queue_sync.create
~high_watermark:Bounded_queue.Defaults.high_watermark () ~high_watermark:Bounded_queue.Defaults.high_watermark ()
@ -119,13 +119,12 @@ let create_exporter ?stop ?(config = Config.make ()) () =
let create_backend = create_exporter let create_backend = create_exporter
let setup_ ?stop ?config () : unit = let setup_ ?config () : unit =
let backend = create_backend ?stop ?config () in let backend = create_backend ?config () in
Main_exporter.set backend; Main_exporter.set backend;
() ()
let setup ?stop ?config ?(enable = true) () = let setup ?config ?(enable = true) () = if enable then setup_ ?config ()
if enable then setup_ ?stop ?config ()
let remove_exporter () : unit Lwt.t = let remove_exporter () : unit Lwt.t =
let done_fut, done_u = Lwt.wait () in let done_fut, done_u = Lwt.wait () in
@ -134,11 +133,10 @@ let remove_exporter () : unit Lwt.t =
let remove_backend = remove_exporter let remove_backend = remove_exporter
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t let with_setup ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t =
=
if enable then ( if enable then (
let open Lwt.Syntax in let open Lwt.Syntax in
setup_ ?stop ~config (); setup_ ~config ();
Lwt.catch Lwt.catch
(fun () -> (fun () ->

View file

@ -3,8 +3,6 @@
https://opentelemetry.io/docs/reference/specification/protocol/exporter/ https://opentelemetry.io/docs/reference/specification/protocol/exporter/
*) *)
open Common_
val get_headers : unit -> (string * string) list val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit val set_headers : (string * string) list -> unit
@ -13,22 +11,16 @@ val set_headers : (string * string) list -> unit
module Config = Config module Config = Config
val create_consumer : val create_consumer :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder
?config:Config.t ->
unit ->
Opentelemetry_client.Consumer.any_resource_builder
(** Consumer that pulls from a queue *) (** Consumer that pulls from a queue *)
val create_exporter : val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
(** Create a new backend using lwt and ezcurl-lwt *) (** Create a new backend using lwt and ezcurl-lwt *)
val create_backend : val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
[@@deprecated "use create_exporter"] [@@deprecated "use create_exporter"]
val setup : val setup : ?config:Config.t -> ?enable:bool -> unit -> unit
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable @param enable
actually setup the backend (default true). This can be used to actually setup the backend (default true). This can be used to
@ -43,11 +35,6 @@ val remove_backend : unit -> unit Lwt.t
@since 0.12 *) @since 0.12 *)
val with_setup : val with_setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a Lwt.t) -> 'a Lwt.t
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a Lwt.t) ->
'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *) [f()] returns See {!setup} for more details. *)

View file

@ -76,11 +76,11 @@ module Consumer_impl =
Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt) Generic_http_consumer.Make (IO) (Opentelemetry_client_lwt.Notifier_lwt)
(Httpc) (Httpc)
let create_consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () = let create_consumer ?(config = Config.make ()) () =
Consumer_impl.consumer ~ticker_task:(Some 0.5) ~stop ~config () Consumer_impl.consumer ~ticker_task:(Some 0.5) ~config ()
let create_exporter ?stop ?(config = Config.make ()) () = let create_exporter ?(config = Config.make ()) () =
let consumer = create_consumer ?stop ~config () in let consumer = create_consumer ~config () in
let bq = let bq =
Bounded_queue_sync.create Bounded_queue_sync.create
~high_watermark:Bounded_queue.Defaults.high_watermark () ~high_watermark:Bounded_queue.Defaults.high_watermark ()
@ -90,13 +90,12 @@ let create_exporter ?stop ?(config = Config.make ()) () =
let create_backend = create_exporter let create_backend = create_exporter
let setup_ ?stop ?config () : unit = let setup_ ?config () : unit =
let exp = create_backend ?stop ?config () in let exp = create_backend ?config () in
Main_exporter.set exp; Main_exporter.set exp;
() ()
let setup ?stop ?config ?(enable = true) () = let setup ?config ?(enable = true) () = if enable then setup_ ?config ()
if enable then setup_ ?stop ?config ()
let remove_exporter () : unit Lwt.t = let remove_exporter () : unit Lwt.t =
let done_fut, done_u = Lwt.wait () in let done_fut, done_u = Lwt.wait () in
@ -105,11 +104,10 @@ let remove_exporter () : unit Lwt.t =
let remove_backend = remove_exporter let remove_backend = remove_exporter
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t let with_setup ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t =
=
if enable then ( if enable then (
let open Lwt.Syntax in let open Lwt.Syntax in
setup_ ?stop ~config (); setup_ ~config ();
Lwt.catch Lwt.catch
(fun () -> (fun () ->

View file

@ -3,8 +3,6 @@
https://opentelemetry.io/docs/reference/specification/protocol/exporter/ https://opentelemetry.io/docs/reference/specification/protocol/exporter/
*) *)
open Common_
val get_headers : unit -> (string * string) list val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit val set_headers : (string * string) list -> unit
@ -13,41 +11,27 @@ val set_headers : (string * string) list -> unit
module Config = Config module Config = Config
val create_consumer : val create_consumer :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder
?config:Config.t ->
unit ->
Opentelemetry_client.Consumer.any_resource_builder
(** Consumer that pulls from a queue *) (** Consumer that pulls from a queue *)
val create_exporter : val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
(** Create a new backend using lwt and ezcurl-lwt *) (** Create a new backend using lwt and ezcurl-lwt *)
val create_backend : val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
[@@deprecated "use create_exporter"] [@@deprecated "use create_exporter"]
val setup : val setup : ?config:Config.t -> ?enable:bool -> unit -> unit
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable @param enable
actually setup the backend (default true). This can be used to actually setup the backend (default true). This can be used to
enable/disable the setup depending on CLI arguments or environment. enable/disable the setup depending on CLI arguments or environment.
@param config configuration to use @param config configuration to use *)
@param stop
an atomic boolean. When it becomes true, background threads will all stop
after a little while. *)
val remove_backend : unit -> unit Lwt.t val remove_backend : unit -> unit Lwt.t
(** Shutdown current backend (** Shutdown current backend
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
val with_setup : val with_setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a Lwt.t) -> 'a Lwt.t
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a Lwt.t) ->
'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *) [f()] returns See {!setup} for more details. *)

View file

@ -5,8 +5,8 @@
module Config = Config module Config = Config
module OTELC = Opentelemetry_client module OTELC = Opentelemetry_client
open Common_
module OTEL = Opentelemetry module OTEL = Opentelemetry
open Common_
let get_headers = Config.Env.get_headers let get_headers = Config.Env.get_headers
@ -81,7 +81,7 @@ end
module Consumer_impl = OTELC.Generic_http_consumer.Make (IO) (Notifier) (Httpc) module Consumer_impl = OTELC.Generic_http_consumer.Make (IO) (Notifier) (Httpc)
let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () : let consumer ?(config = Config.make ()) () :
Opentelemetry_client.Consumer.any_resource_builder = Opentelemetry_client.Consumer.any_resource_builder =
let n_workers = max 2 (min 32 config.bg_threads) in let n_workers = max 2 (min 32 config.bg_threads) in
let ticker_task = let ticker_task =
@ -90,11 +90,11 @@ let consumer ?(stop = Atomic.make false) ?(config = Config.make ()) () :
else else
None None
in in
Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task ~stop Consumer_impl.consumer ~override_n_workers:n_workers ~ticker_task
~config:config.common () ~config:config.common ()
let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t = let create_exporter ?(config = Config.make ()) () : OTEL.Exporter.t =
let consumer = consumer ?stop ~config () in let consumer = consumer ~config () in
let bq = let bq =
OTELC.Bounded_queue_sync.create OTELC.Bounded_queue_sync.create
~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark () ~high_watermark:OTELC.Bounded_queue.Defaults.high_watermark ()
@ -105,9 +105,17 @@ let create_exporter ?stop ?(config = Config.make ()) () : OTEL.Exporter.t =
let create_backend = create_exporter let create_backend = create_exporter
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () let shutdown_and_wait (self : OTEL.Exporter.t) : unit =
: unit = let open Opentelemetry_client in
let exporter = create_exporter ~stop ~config () in let sq = Sync_queue.create () in
OTEL.Aswitch.on_turn_off (OTEL.Exporter.active self) (fun () ->
Printf.eprintf "ocurl: push queue\n%!";
Sync_queue.push sq ());
OTEL.Exporter.shutdown self;
Sync_queue.pop sq
let setup_ ?(config : Config.t = Config.make ()) () : OTEL.Exporter.t =
let exporter = create_exporter ~config () in
OTEL.Main_exporter.set exporter; OTEL.Main_exporter.set exporter;
OTELC.Self_trace.set_enabled config.common.self_trace; OTELC.Self_trace.set_enabled config.common.self_trace;
@ -115,24 +123,29 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
if config.ticker_thread then ( if config.ticker_thread then (
(* at most a minute *) (* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
let active = OTEL.Exporter.active exporter in
ignore ignore
(OTELC.Util_thread.setup_ticker_thread ~stop ~sleep_ms exporter () (OTELC.Util_thread.setup_ticker_thread ~active ~sleep_ms exporter ()
: Thread.t) : Thread.t)
) );
exporter
let remove_exporter () : unit = let remove_exporter () : unit =
(* we don't need the callback, this runs in the same thread *) let open Opentelemetry_client in
OTEL.Main_exporter.remove () ~on_done:ignore (* used to wait *)
let sq = Sync_queue.create () in
OTEL.Main_exporter.remove () ~on_done:(fun () -> Sync_queue.push sq ());
Sync_queue.pop sq
let remove_backend = remove_exporter let remove_backend = remove_exporter
let setup ?stop ?config ?(enable = true) () = let setup ?config ?(enable = true) () =
if enable then setup_ ?stop ?config () if enable then ignore (setup_ ?config () : OTEL.Exporter.t)
let with_setup ?stop ?config ?(enable = true) () f = let with_setup ?config ?(enable = true) () f =
if enable then ( if enable then (
setup_ ?stop ?config (); let exp = setup_ ?config () in
Fun.protect ~finally:remove_backend f Fun.protect f ~finally:(fun () -> shutdown_and_wait exp)
) else ) else
f () f ()

View file

@ -3,8 +3,6 @@
https://opentelemetry.io/docs/reference/specification/protocol/exporter/ https://opentelemetry.io/docs/reference/specification/protocol/exporter/
*) *)
open Opentelemetry_atomic
val get_headers : unit -> (string * string) list val get_headers : unit -> (string * string) list
val set_headers : (string * string) list -> unit val set_headers : (string * string) list -> unit
@ -16,22 +14,16 @@ val n_bytes_sent : unit -> int
(** Global counter of bytes sent (or attempted to be sent) *) (** Global counter of bytes sent (or attempted to be sent) *)
val consumer : val consumer :
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_resource_builder
?config:Config.t ->
unit ->
Opentelemetry_client.Consumer.any_resource_builder
(** Consumer that pulls from a queue *) (** Consumer that pulls from a queue *)
val create_exporter : val create_exporter : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
(** @since NEXT_RELEASE *) (** @since NEXT_RELEASE *)
val create_backend : val create_backend : ?config:Config.t -> unit -> Opentelemetry.Exporter.t
?stop:bool Atomic.t -> ?config:Config.t -> unit -> Opentelemetry.Exporter.t
[@@deprecated "use create_exporter"] [@@deprecated "use create_exporter"]
val setup : val setup : ?config:Config.t -> ?enable:bool -> unit -> unit
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable @param enable
actually setup the backend (default true). This can be used to actually setup the backend (default true). This can be used to
@ -48,12 +40,6 @@ val remove_backend : unit -> unit
[@@deprecated "use remove_exporter"] [@@deprecated "use remove_exporter"]
(** @since 0.12 *) (** @since 0.12 *)
val with_setup : val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a
?stop:bool Atomic.t ->
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *) [f()] returns See {!setup} for more details. *)