fix: wait for cleanup in cohttp client

in `Opentelemetry_client_cohttp_lwt.with_setup` we should now wait for
the cleanup to be done, by sneaking in a `unit Lwt.u` that is only
resolved after the cleanup is done.

close #41
This commit is contained in:
Simon Cruanes 2024-10-17 15:06:45 -04:00
parent e789ecf3da
commit 55977b13d8
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 43 additions and 14 deletions

View file

@ -258,7 +258,8 @@ end
exceptions inside should be caught, see exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *) https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t)
() : (module EMITTER) =
let open Proto in let open Proto in
let open Lwt.Syntax in let open Lwt.Syntax in
(* local helpers *) (* local helpers *)
@ -448,6 +449,8 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Lwt.async (fun () -> Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc; Httpc.cleanup httpc;
(* resolve [after_cleanup], if provided *)
Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup;
Lwt.return ()) Lwt.return ())
end in end in
(module M) (module M)
@ -457,9 +460,13 @@ module Backend
val stop : bool Atomic.t val stop : bool Atomic.t
val config : Config.t val config : Config.t
val after_cleanup : unit Lwt.u option
end) end)
() : Opentelemetry.Collector.BACKEND = struct () : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) include
(val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop
~config:Arg.config ())
open Opentelemetry.Proto open Opentelemetry.Proto
open Opentelemetry.Collector open Opentelemetry.Collector
@ -551,7 +558,8 @@ module Backend
} }
end end
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = let create_backend ?after_cleanup ?(stop = Atomic.make false)
?(config = Config.make ()) () =
debug_ := config.debug; debug_ := config.debug;
let module B = let module B =
@ -560,25 +568,43 @@ let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
let stop = stop let stop = stop
let config = config let config = config
let after_cleanup = after_cleanup
end) end)
() ()
in in
(module B : OT.Collector.BACKEND) (module B : OT.Collector.BACKEND)
let setup_ ?stop ?config () = let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t =
let backend = create_backend ?stop ?config () in let cleanup_done, cleanup_done_prom = Lwt.wait () in
let backend =
create_backend ~after_cleanup:cleanup_done_prom ?stop ?config ()
in
OT.Collector.set_backend backend; OT.Collector.set_backend backend;
OT.Collector.remove_backend
OT.Collector.remove_backend, cleanup_done
let setup ?stop ?config ?(enable = true) () = let setup ?stop ?config ?(enable = true) () =
if enable then ( if enable then (
let cleanup = setup_ ?stop ?config () in let cleanup, _lwt = setup_ ?stop ?config () in
at_exit cleanup at_exit cleanup
) )
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f = let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
if enable then ( =
let cleanup = setup_ ?stop ~config () in if enable then
Fun.protect ~finally:cleanup f let open Lwt.Syntax in
) else let cleanup, cleanup_done = setup_ ?stop ~config () in
Lwt.catch
(fun () ->
let* res = f () in
cleanup ();
let+ () = cleanup_done in
res)
(fun exn ->
cleanup ();
let* () = cleanup_done in
Lwt.reraise exn)
else
f () f ()

View file

@ -13,10 +13,13 @@ val set_headers : (string * string) list -> unit
module Config = Config module Config = Config
val create_backend : val create_backend :
?after_cleanup:unit Lwt.u ->
?stop:bool Atomic.t -> ?stop:bool Atomic.t ->
?config:Config.t -> ?config:Config.t ->
unit -> unit ->
(module Opentelemetry.Collector.BACKEND) (module Opentelemetry.Collector.BACKEND)
(** Create a new backend using lwt and cohttp
@param after_cleanup if provided, this is resolved into [()] after cleanup is done (since NEXT_RELEASE) *)
val setup : val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
@ -34,8 +37,8 @@ val with_setup :
?config:Config.t -> ?config:Config.t ->
?enable:bool -> ?enable:bool ->
unit -> unit ->
(unit -> 'a) -> (unit -> 'a Lwt.t) ->
'a 'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns after [f()] returns
See {!setup} for more details. *) See {!setup} for more details. *)