breaking: change Collector.cleanup so it takes a callback

The callback is invoked once cleanup is done. It can be used, for example,
to resolve an Lwt promise, so that we are sure to wait for cleanup to
complete before exiting.
This commit is contained in:
Simon Cruanes 2025-04-17 16:09:14 -04:00
parent 6a378e49ce
commit 5788492946
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 100 additions and 93 deletions

View file

@ -9,8 +9,8 @@ open Opentelemetry
include Common_ include Common_
external reraise : exn -> 'a = "%reraise" external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
to use Lwt's latest version *) use Lwt's latest version *)
let needs_gc_metrics = Atomic.make false let needs_gc_metrics = Atomic.make false
@ -133,7 +133,8 @@ end = struct
let bt = Printexc.get_backtrace () in let bt = Printexc.get_backtrace () in
Error Error
(`Failure (`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt)) (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e)
bt))
in in
Lwt.return r Lwt.return r
) else ( ) else (
@ -147,12 +148,12 @@ end = struct
let bt = Printexc.get_backtrace () in let bt = Printexc.get_backtrace () in
Error Error
(`Failure (`Failure
(spf (spf
"httpc: decoding of status (url=%S, code=%d) failed with:\n\ "httpc: decoding of status (url=%S, code=%d) failed with:\n\
%s\n\ %s\n\
status: %S\n\ status: %S\n\
%s" %s"
url code (Printexc.to_string e) body bt)) url code (Printexc.to_string e) body bt))
in in
Lwt.return r Lwt.return r
) )
@ -167,10 +168,10 @@ module Batch : sig
val push' : 'a t -> 'a -> unit val push' : 'a t -> 'a -> unit
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled, (** Is the batch ready to be emitted? If batching is disabled, this is true as
this is true as soon as {!is_empty} is false. If a timeout is provided soon as {!is_empty} is false. If a timeout is provided for this batch,
for this batch, then it will be ready if an element has been in it then it will be ready if an element has been in it for at least the
for at least the timeout. timeout.
@param now passed to implement timeout *) @param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
@ -255,15 +256,14 @@ module type EMITTER = sig
val tick : unit -> unit val tick : unit -> unit
val cleanup : unit -> unit val cleanup : on_done:(unit -> unit) -> unit -> unit
end end
(* make an emitter. (* make an emitter.
exceptions inside should be caught, see exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *) https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t) let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
() : (module EMITTER) =
let open Proto in let open Proto in
let open Lwt.Syntax in let open Lwt.Syntax in
(* local helpers *) (* local helpers *)
@ -448,13 +448,12 @@ let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t)
(* if called in a blocking context: work in the background *) (* if called in a blocking context: work in the background *)
let tick () = Lwt.async tick_ let tick () = Lwt.async tick_
let cleanup () = let cleanup ~on_done () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () -> Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc; Httpc.cleanup httpc;
(* resolve [after_cleanup], if provided *) on_done ();
Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup;
Lwt.return ()) Lwt.return ())
end in end in
(module M) (module M)
@ -464,13 +463,9 @@ module Backend
val stop : bool Atomic.t val stop : bool Atomic.t
val config : Config.t val config : Config.t
val after_cleanup : unit Lwt.u option
end) end)
() : Opentelemetry.Collector.BACKEND = struct () : Opentelemetry.Collector.BACKEND = struct
include include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
(val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop
~config:Arg.config ())
open Opentelemetry.Proto open Opentelemetry.Proto
open Opentelemetry.Collector open Opentelemetry.Collector
@ -562,8 +557,7 @@ module Backend
} }
end end
let create_backend ?after_cleanup ?(stop = Atomic.make false) let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
?(config = Config.make ()) () =
debug_ := config.debug; debug_ := config.debug;
let module B = let module B =
@ -572,43 +566,37 @@ let create_backend ?after_cleanup ?(stop = Atomic.make false)
let stop = stop let stop = stop
let config = config let config = config
let after_cleanup = after_cleanup
end) end)
() ()
in in
(module B : OT.Collector.BACKEND) (module B : OT.Collector.BACKEND)
let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t = let setup_ ?stop ?config () : unit =
let cleanup_done, cleanup_done_prom = Lwt.wait () in let backend = create_backend ?stop ?config () in
let backend =
create_backend ~after_cleanup:cleanup_done_prom ?stop ?config ()
in
OT.Collector.set_backend backend; OT.Collector.set_backend backend;
()
OT.Collector.remove_backend, cleanup_done
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then ( if enable then setup_ ?stop ?config ()
let cleanup, _lwt = setup_ ?stop ?config () in
at_exit cleanup let remove_backend () : unit Lwt.t =
) let done_fut, done_u = Lwt.wait () in
OT.Collector.remove_backend ~on_done:(fun () -> Lwt.wakeup_later done_u ()) ();
done_fut
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
= =
if enable then if enable then (
let open Lwt.Syntax in let open Lwt.Syntax in
let cleanup, cleanup_done = setup_ ?stop ~config () in setup_ ?stop ~config ();
Lwt.catch Lwt.catch
(fun () -> (fun () ->
let* res = f () in let* res = f () in
cleanup (); let+ () = remove_backend () in
let+ () = cleanup_done in
res) res)
(fun exn -> (fun exn ->
cleanup (); let* () = remove_backend () in
let* () = cleanup_done in
reraise exn) reraise exn)
else ) else
f () f ()

View file

@ -13,24 +13,29 @@ val set_headers : (string * string) list -> unit
module Config = Config module Config = Config
val create_backend : val create_backend :
?after_cleanup:unit Lwt.u ->
?stop:bool Atomic.t -> ?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
unit -> unit ->
(module Opentelemetry.Collector.BACKEND) (module Opentelemetry.Collector.BACKEND)
(** Create a new backend using lwt and cohttp (** Create a new backend using lwt and cohttp
@param after_cleanup if provided, this is resolved into [()] after cleanup is done (since 0.11) *) @param after_cleanup
if provided, this is resolved into [()] after cleanup is done (since 0.11)
*)
val setup : val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can @param enable
be used to enable/disable the setup depending on CLI arguments actually setup the backend (default true). This can be used to
or environment. enable/disable the setup depending on CLI arguments or environment.
@param config configuration to use @param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads @param stop
will all stop after a little while. an atomic boolean. When it becomes true, background threads will all stop
*) after a little while. *)
val remove_backend : unit -> unit Lwt.t
(** Shut down the current backend.
@since NEXT_RELEASE *)
val with_setup : val with_setup :
?stop:bool Atomic.t -> ?stop:bool Atomic.t ->
@ -39,6 +44,5 @@ val with_setup :
unit -> unit ->
(unit -> 'a Lwt.t) -> (unit -> 'a Lwt.t) ->
'a Lwt.t 'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
after [f()] returns. [f()] returns. See {!setup} for more details. *)
See {!setup} for more details. *)

View file

@ -39,9 +39,9 @@ module Self_trace = struct
) )
end end
(** capture current GC metrics if {!needs_gc_metrics} is true (** capture current GC metrics if {!needs_gc_metrics} is true or it has been a
or it has been a long time since the last GC metrics collection, long time since the last GC metrics collection, and push them into
and push them into {!gc_metrics} for later collection *) {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () = let sample_gc_metrics_if_needed () =
let now = Mtime_clock.now () in let now = Mtime_clock.now () in
let alarm = Atomic.exchange needs_gc_metrics false in let alarm = Atomic.exchange needs_gc_metrics false in
@ -102,7 +102,12 @@ let start_bg_thread (f : unit -> unit) : Thread.t =
f () f ()
in in
(* no signals on Windows *) (* no signals on Windows *)
let run () = if Sys.win32 then f () else unix_run () in let run () =
if Sys.win32 then
f ()
else
unix_run ()
in
Thread.create run () Thread.create run ()
let str_to_hex (s : string) : string = let str_to_hex (s : string) : string =
@ -128,7 +133,7 @@ module Backend_impl : sig
val send_event : t -> Event.t -> unit val send_event : t -> Event.t -> unit
val shutdown : t -> unit val shutdown : t -> on_done:(unit -> unit) -> unit
end = struct end = struct
open Opentelemetry.Proto open Opentelemetry.Proto
@ -250,8 +255,8 @@ end = struct
let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev
(** Thread that, in a loop, reads from [q] to get the (** Thread that, in a loop, reads from [q] to get the next message to send via
next message to send via http *) http *)
let bg_thread_loop (self : t) : unit = let bg_thread_loop (self : t) : unit =
Ezcurl.with_client ?set_opts:None @@ fun client -> Ezcurl.with_client ?set_opts:None @@ fun client ->
let stop = self.stop in let stop = self.stop in
@ -379,7 +384,7 @@ end = struct
self self
let shutdown self : unit = let shutdown self ~on_done : unit =
Atomic.set self.stop true; Atomic.set self.stop true;
if not (Atomic.exchange self.cleaned true) then ( if not (Atomic.exchange self.cleaned true) then (
(* empty batches *) (* empty batches *)
@ -392,7 +397,8 @@ end = struct
(* close send queues, then wait for all threads *) (* close send queues, then wait for all threads *)
B_queue.close self.send_q; B_queue.close self.send_q;
Array.iter Thread.join self.send_threads Array.iter Thread.join self.send_threads
) );
on_done ()
end end
let create_backend ?(stop = Atomic.make false) let create_backend ?(stop = Atomic.make false)
@ -480,7 +486,7 @@ let create_backend ?(stop = Atomic.make false)
Backend_impl.send_event backend Event.E_tick; Backend_impl.send_event backend Event.E_tick;
List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_) List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)
let cleanup () = Backend_impl.shutdown backend let cleanup ~on_done () = Backend_impl.shutdown backend ~on_done
end in end in
(module M) (module M)
@ -498,7 +504,7 @@ let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () =
start_bg_thread tick_loop start_bg_thread tick_loop
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
= : unit =
let backend = create_backend ~stop ~config () in let backend = create_backend ~stop ~config () in
Opentelemetry.Collector.set_backend backend; Opentelemetry.Collector.set_backend backend;
@ -508,18 +514,18 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
(* at most a minute *) (* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
); )
OT.Collector.remove_backend
let remove_backend () : unit =
(* we don't need the callback, this runs in the same thread *)
OT.Collector.remove_backend () ~on_done:ignore
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then ( if enable then setup_ ?stop ?config ()
let cleanup = setup_ ?stop ?config () in
at_exit cleanup
)
let with_setup ?stop ?config ?(enable = true) () f = let with_setup ?stop ?config ?(enable = true) () f =
if enable then ( if enable then (
let cleanup = setup_ ?stop ?config () in setup_ ?stop ?config ();
Fun.protect ~finally:cleanup f Fun.protect ~finally:remove_backend f
) else ) else
f () f ()

View file

@ -20,13 +20,16 @@ val create_backend :
val setup : val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can @param enable
be used to enable/disable the setup depending on CLI arguments actually setup the backend (default true). This can be used to
or environment. enable/disable the setup depending on CLI arguments or environment.
@param config configuration to use @param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads @param stop
will all stop after a little while. an atomic boolean. When it becomes true, background threads will all stop
*) after a little while. *)
val remove_backend : unit -> unit
(** @since NEXT_RELEASE *)
val with_setup : val with_setup :
?stop:bool Atomic.t -> ?stop:bool Atomic.t ->
@ -35,6 +38,5 @@ val with_setup :
unit -> unit ->
(unit -> 'a) -> (unit -> 'a) ->
'a 'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
after [f()] returns. [f()] returns. See {!setup} for more details. *)
See {!setup} for more details. *)

View file

@ -89,7 +89,11 @@ module Collector = struct
the collector's implementation, it might be called from a thread that is the collector's implementation, it might be called from a thread that is
not the one that called [on_tick]. *) not the one that called [on_tick]. *)
val cleanup : unit -> unit val cleanup : on_done:(unit -> unit) -> unit -> unit
(** [cleanup ~on_done ()] is called when the collector is shut down, and is
responsible for sending remaining batches, flushing sockets, etc.
@param on_done
callback invoked after the cleanup is done. since NEXT_RELEASE *)
end end
type backend = (module BACKEND) type backend = (module BACKEND)
@ -110,7 +114,9 @@ module Collector = struct
let set_on_tick_callbacks _cbs = () let set_on_tick_callbacks _cbs = ()
let cleanup () = () let cleanup ~on_done () =
on_done ();
()
end end
module Debug_backend (B : BACKEND) : BACKEND = struct module Debug_backend (B : BACKEND) : BACKEND = struct
@ -152,7 +158,7 @@ module Collector = struct
let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs
let cleanup () = B.cleanup () let cleanup ~on_done () = B.cleanup ~on_done ()
end end
let debug_backend : backend = (module Debug_backend (Noop_backend)) let debug_backend : backend = (module Debug_backend (Noop_backend))
@ -171,13 +177,14 @@ module Collector = struct
Atomic.set backend (Some b) Atomic.set backend (Some b)
(** Remove current backend, if any. (** Remove current backend, if any.
@since 0.11 *) @since 0.11
let remove_backend () : unit = @param on_done see {!BACKEND.cleanup}, since NEXT_RELEASE *)
let remove_backend ~on_done () : unit =
match Atomic.exchange backend None with match Atomic.exchange backend None with
| None -> () | None -> ()
| Some (module B) -> | Some (module B) ->
B.tick (); B.tick ();
B.cleanup () B.cleanup ~on_done ()
(** Is there a configured backend? *) (** Is there a configured backend? *)
let[@inline] has_backend () : bool = Atomic.get backend != None let[@inline] has_backend () : bool = Atomic.get backend != None
@ -213,11 +220,11 @@ module Collector = struct
| None -> () | None -> ()
| Some (module B) -> B.tick () | Some (module B) -> B.tick ()
let with_setup_debug_backend b ?(enable = true) () f = let with_setup_debug_backend ?(on_done = ignore) b ?(enable = true) () f =
let (module B : BACKEND) = b in let (module B : BACKEND) = b in
if enable then ( if enable then (
set_backend b; set_backend b;
Fun.protect ~finally:B.cleanup f Fun.protect ~finally:(B.cleanup ~on_done) f
) else ) else
f () f ()
end end