diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index bd78c983..b5d0e4ff 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -1,3 +1,5 @@ +open Eio.Std + (* https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md @@ -79,7 +81,7 @@ module Httpc : sig url:string -> decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> string -> - ('a, error) result Lwt.t + ('a, error) result val cleanup : t -> unit end = struct @@ -94,7 +96,8 @@ end = struct let cleanup _self = () (* send the content to the remote endpoint/path *) - let send (_self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t = + let send (_self : t) ~url ~decode (bod : string) : + ('a, error) result Promise.t = let uri = Uri.of_string url in let open Cohttp in @@ -104,12 +107,11 @@ end = struct in let body = Cohttp_lwt.Body.of_string bod in - - let* r = + let r = try%lwt - let+ r = Httpc.post ~headers ~body uri in + let r = Httpc.post ~headers ~body uri in Ok r - with e -> Lwt.return @@ Error e + with e -> Error e in match r with | Error e -> @@ -118,13 +120,13 @@ end = struct (spf "sending signals via http POST to %S\nfailed with:\n%s" url (Printexc.to_string e)) in - Lwt.return @@ Error err + Error err | Ok (resp, body) -> - let* body = Cohttp_lwt.Body.to_string body in + let body = Cohttp_lwt.Body.to_string body in let code = Response.status resp |> Code.code_of_status in if not (Code.is_error code) then ( match decode with - | `Ret x -> Lwt.return @@ Ok x + | `Ret x -> Ok x | `Dec f -> let dec = Pbrt.Decoder.of_string body in let r = @@ -136,7 +138,7 @@ end = struct (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt)) in - Lwt.return r + r ) else ( let dec = Pbrt.Decoder.of_string body in @@ -155,7 +157,7 @@ end = struct %s" url code (Printexc.to_string e) body bt)) in - Lwt.return r + r ) end @@ -287,23 +289,23 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t = + let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Promise.t = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in - let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in + let r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with - | Ok () -> Lwt.return () + | Ok () -> () | Error `Sysbreak -> Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set stop true; - Lwt.return () + Atomic.set stop true | Error err -> (* TODO: log error _via_ otel? *) Atomic.incr n_errors; report_err_ err; - (* avoid crazy error loop *) - Lwt_unix.sleep 3. + Eio_unix.sleep + (* avoid crazy error loop *) + 3. let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) = @@ -335,27 +337,27 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = ~encode:Logs_service.encode_pb_export_logs_service_request x (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t = + let emit_metrics_maybe ~now ?force httpc encoder : bool Promise.t = match Batch.pop_if_ready ?force ~now batch_metrics with - | None -> Lwt.return false + | None -> false | Some l -> let batch = !gc_metrics :: l in gc_metrics := []; - let+ () = send_metrics_http httpc encoder batch in + let () = send_metrics_http httpc encoder batch in true - let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t = + let emit_traces_maybe ~now ?force httpc encoder : bool Promise.t = match Batch.pop_if_ready ?force ~now batch_traces with - | None -> Lwt.return false + | None -> false | Some l -> - let+ () = send_traces_http httpc encoder l in + let () = send_traces_http httpc encoder l in true - let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t = + let emit_logs_maybe ~now ?force httpc encoder : bool Promise.t = match Batch.pop_if_ready ?force ~now batch_logs with - | None -> Lwt.return false + | None -> false | Some l -> - let+ () = send_logs_http httpc encoder l in + let () = send_logs_http httpc encoder l in true let[@inline] guard_exn_ where f = @@ -366,11 +368,15 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where (Printexc.to_string e) bt - let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t = + let emit_all_force (httpc : Httpc.t) encoder : unit Promise.t = let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in + let (_ : bool), ((_ : bool), (_ : bool)) = + Fiber.pair + (emit_logs_maybe ~now ~force:true httpc encoder) + (Fiber.pair + (emit_metrics_maybe ~now ~force:true httpc encoder) + (emit_traces_maybe ~now ~force:true httpc encoder)) + in () let tick_common_ () = @@ -388,15 +394,17 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = (* thread that calls [tick()] regularly, to help enforce timeouts *) let setup_ticker_thread ~tick ~finally () = let rec tick_thread () = - if Atomic.get stop then ( - finally (); - Lwt.return () - ) else - let* () = Lwt_unix.sleep 0.5 in - let* () = tick () in + if Atomic.get stop then + finally () + else ( + let () = Eio_unix.sleep 0.5 in + let () = tick () in tick_thread () + ) in - Lwt.async tick_thread + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + tick_thread end in let httpc = Httpc.create () in let encoder = Pbrt.Encoder.create () in @@ -411,8 +419,10 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let@ () = guard_exn_ "push trace" in Batch.push' batch_traces e; let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + (fun () -> + let (_ : bool) = emit_traces_maybe ~now httpc encoder in ()) let push_metrics e = @@ -420,16 +430,20 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = sample_gc_metrics_if_needed (); Batch.push' batch_metrics e; let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + (fun () -> + let (_ : bool) = emit_metrics_maybe ~now httpc encoder in ()) let push_logs e = let@ () = guard_exn_ "push logs" in Batch.push' batch_logs e; let now = Mtime_clock.now () in - Lwt.async (fun () -> - let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + (fun () -> + let (_ : bool) = emit_logs_maybe ~now httpc encoder in ()) let set_on_tick_callbacks = set_on_tick_callbacks @@ -438,23 +452,31 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = tick_common_ (); sample_gc_metrics_if_needed (); let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + let (_ : bool), ((_ : bool), (_ : bool)) = + Fiber.pair + (emit_logs_maybe ~now httpc encoder) + (Fiber.pair + (emit_metrics_maybe ~now httpc encoder) + (emit_traces_maybe ~now httpc encoder)) + in () let () = setup_ticker_thread ~tick:tick_ ~finally:ignore () (* if called in a blocking context: work in the background *) - let tick () = Lwt.async tick_ + let tick () = + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + tick_ let cleanup ~on_done () = if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - Lwt.async (fun () -> - let* () = emit_all_force httpc encoder in + Fiber.fork ~sw + (* TODO: lwt-to-direct-style: [sw] must be propagated here. *) + (fun () -> + let () = emit_all_force httpc encoder in Httpc.cleanup httpc; - on_done (); - Lwt.return ()) + on_done ()) end in (module M) @@ -579,24 +601,26 @@ let setup_ ?stop ?config () : unit = let setup ?stop ?config ?(enable = true) () = if enable then setup_ ?stop ?config () -let remove_backend () : unit Lwt.t = - let done_fut, done_u = Lwt.wait () in - OT.Collector.remove_backend ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); +let remove_backend () : unit Promise.t = + let done_fut, done_u + (* TODO: lwt-to-direct-style: Translation is incomplete, [Promise.await] must be called on the promise when it's part of control-flow. *) + = + Promise.create () + in + OT.Collector.remove_backend ~on_done:(fun () -> Promise.resolve done_u ()) (); done_fut -let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t - = +let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : + _ Promise.t = if enable then ( let open Lwt.Syntax in setup_ ?stop ~config (); - - Lwt.catch - (fun () -> - let* res = f () in - let+ () = remove_backend () in - res) - (fun exn -> - let* () = remove_backend () in - reraise exn) + try + let res = f () in + let () = remove_backend () in + res + with exn -> + let () = remove_backend () in + reraise exn ) else f () diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli index 675dbd52..c1dd53ad 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.mli @@ -32,7 +32,7 @@ val setup : an atomic boolean. When it becomes true, background threads will all stop after a little while. *) -val remove_backend : unit -> unit Lwt.t +val remove_backend : unit -> unit (** Shutdown current backend @since NEXT_RELEASE *) @@ -41,7 +41,7 @@ val with_setup : ?config:Config.t -> ?enable:bool -> unit -> - (unit -> 'a Lwt.t) -> - 'a Lwt.t + (unit -> 'a) -> + 'a (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after [f()] returns See {!setup} for more details. *) diff --git a/tests/bin/cohttp_client_eio.ml b/tests/bin/cohttp_client_eio.ml index 3286f1bf..70b60cda 100644 --- a/tests/bin/cohttp_client_eio.ml +++ b/tests/bin/cohttp_client_eio.ml @@ -18,12 +18,12 @@ let run () = let@ scope = Otel_lwt.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" in - let* () = Lwt_unix.sleep !sleep_outer in + let () = Eio_unix.sleep !sleep_outer in let module C = (val mk_client ~scope) in - let* _res, body = + let _res, body = C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") in - let* () = Cohttp_lwt.Body.drain_body body in + let () = Cohttp_lwt.Body.drain_body body in go () in go ()