diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index d51192e3..e96ca664 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -5,6 +5,7 @@ module OT = Opentelemetry module Config = Config +module Signal = Opentelemetry_client.Signal open Opentelemetry open Common_ @@ -270,6 +271,7 @@ end let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in let open Lwt.Syntax in + let module Conv = Signal.Converter () in (* local helpers *) let open struct let timeout = @@ -291,10 +293,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in + let send_http_ (httpc : Httpc.t) ~url data : unit Lwt.t = let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with | Ok () -> Lwt.return () @@ -309,57 +308,41 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = (* avoid crazy error loop *) Lwt_unix.sleep 3. - let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) - = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () - in - let url = config.url_metrics in - send_http_ curl encoder ~url - ~encode:Metrics_service.encode_pb_export_metrics_service_request x + let send_metrics_http client (l : Metrics.resource_metrics list) = + Conv.metrics l |> send_http_ client ~url:config.url_metrics - let send_traces_http curl encoder (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - let url = config.url_traces in - send_http_ curl encoder ~url - ~encode:Trace_service.encode_pb_export_trace_service_request x + let send_traces_http client (l : Trace.resource_spans list) = + Conv.traces l |> send_http_ client ~url:config.url_traces - let send_logs_http curl encoder (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - let url = config.url_logs in - send_http_ curl encoder ~url - ~encode:Logs_service.encode_pb_export_logs_service_request x + let send_logs_http client (l : Logs.resource_logs list) = + Conv.logs l |> send_http_ client ~url:config.url_logs + + let maybe_pop ?force ~now batch = + Batch.pop_if_ready ?force ~now batch + |> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) []) (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_metrics with + let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> - let batch = !gc_metrics :: l in + let batch = !gc_metrics @ l in gc_metrics := []; - let+ () = send_metrics_http httpc encoder batch in + let+ () = send_metrics_http httpc batch in true - let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_traces with + let emit_traces_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_traces with | None -> Lwt.return false | Some l -> - let+ () = send_traces_http httpc encoder l in + let+ () = send_traces_http httpc l in true - let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_logs with + let emit_logs_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_logs with | None -> Lwt.return false | Some l -> - let+ () = send_logs_http httpc encoder l in + let+ () = send_logs_http httpc l in true let[@inline] guard_exn_ where f = @@ -370,11 +353,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where (Printexc.to_string e) bt - let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t = + let emit_all_force (httpc : Httpc.t) : unit Lwt.t = let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in + let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc + and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc + and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc in () (* thread that calls [tick()] regularly, to help enforce timeouts *) @@ -391,7 +374,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt.async tick_thread end in let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in let module M = struct (* we make sure that this is thread-safe, even though we don't have a @@ -404,7 +386,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_traces e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in + let+ (_ : bool) = emit_traces_maybe ~now httpc in ()) let push_metrics e = @@ -413,7 +395,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_metrics e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + let+ (_ : bool) = emit_metrics_maybe ~now httpc in ()) let push_logs e = @@ -421,7 +403,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_logs e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in + let+ (_ : bool) = emit_logs_maybe ~now httpc in ()) let set_on_tick_callbacks = set_on_tick_callbacks @@ -438,9 +420,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = (Printexc.to_string e)) (AList.get @@ Atomic.get on_tick_cbs_); let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + let+ (_ : bool) = emit_traces_maybe ~now httpc + and+ (_ : bool) = emit_logs_maybe ~now httpc + and+ (_ : bool) = emit_metrics_maybe ~now httpc in () let () = setup_ticker_thread ~tick:tick_ ~finally:ignore () @@ -452,7 +434,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: exiting…\n%!"; Lwt.async (fun () -> - let* () = emit_all_force httpc encoder in + let* () = emit_all_force httpc in Httpc.cleanup httpc; on_done (); Lwt.return ()) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 4b11e004..a62c750d 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -6,6 +6,7 @@ module OT = Opentelemetry module Config = Config module Self_trace = Opentelemetry_client.Self_trace +module Signal = Opentelemetry_client.Signal open Opentelemetry include Common_ @@ -130,21 +131,11 @@ end = struct mutable send_threads: Thread.t array; (** Threads that send data via http *) } - let send_http_ ~stop ~(config : Config.t) (client : Curl.t) encoder ~url - ~encode x : unit = + let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit = let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" in - let data = - let@ _sc = - Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" - in - Pbrt.Encoder.reset encoder; - encode x encoder; - Pbrt.Encoder.to_string encoder - in - if Config.Env.get_debug () then Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url (String.length data); @@ -194,68 +185,36 @@ end = struct (* avoid crazy error loop *) Thread.delay 3. - let send_logs_http ~stop ~config (client : Curl.t) encoder - (l : Logs.resource_logs list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-logs" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - send_http_ ~stop ~config client encoder ~url:config.Config.common.url_logs - ~encode:Logs_service.encode_pb_export_logs_service_request x - - let send_metrics_http ~stop ~config curl encoder - (l : Metrics.resource_metrics list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-metrics" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Metrics_service.default_export_metrics_service_request ~resource_metrics:l - () - in - send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_metrics - ~encode:Metrics_service.encode_pb_export_metrics_service_request x - - let send_traces_http ~stop ~config curl encoder - (l : Trace.resource_spans list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-traces" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_traces - ~encode:Trace_service.encode_pb_export_trace_service_request x - let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev (** Thread that, in a loop, reads from [q] to get the next message to send via http *) let bg_thread_loop (self : t) : unit = Ezcurl.with_client ?set_opts:None @@ fun client -> - let stop = self.stop in let config = self.config in - let encoder = Pbrt.Encoder.create () in + let stop = self.stop in + let send ~name ~url ~conv signals = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] signals in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer name + ~attrs:[ "n", `Int (List.length l) ] + in + conv l |> send_http_ ~stop ~config ~url client + in + let module Conv = Signal.Converter () in try while not (Atomic.get stop) do let msg = B_queue.pop self.send_q in match msg with | To_send.Send_trace tr -> - send_traces_http ~stop ~config client encoder tr + send ~name:"send-traces" ~conv:Conv.traces + ~url:config.common.url_traces tr | To_send.Send_metric ms -> - send_metrics_http ~stop ~config client encoder ms + send ~name:"send-metrics" ~conv:Conv.metrics + ~url:config.common.url_metrics ms | To_send.Send_logs logs -> - send_logs_http ~stop ~config client encoder logs + send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs + logs done with B_queue.Closed -> () diff --git a/src/client/dune b/src/client/dune index 2bc4e4a5..da204f79 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,4 +1,5 @@ (library (name opentelemetry_client) (public_name opentelemetry.client) + (libraries opentelemetry pbrt) (synopsis "Common types and logic shared between client implementations")) diff --git a/src/client/signal.ml b/src/client/signal.ml new file mode 100644 index 00000000..311889f8 --- /dev/null +++ b/src/client/signal.ml @@ -0,0 +1,40 @@ +module Trace_service = Opentelemetry.Proto.Trace_service +module Metrics_service = Opentelemetry.Proto.Metrics_service +module Logs_service = Opentelemetry.Proto.Logs_service +module Span = Opentelemetry.Span + +let ( let@ ) f x = f x + +module Converter () = struct + let encoder = Pbrt.Encoder.create () + + let resource_to_string ~ctor ~enc resource = + let x = ctor resource in + let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in + Pbrt.Encoder.reset encoder; + enc x encoder; + Pbrt.Encoder.to_string encoder + + let logs resource_logs = + resource_logs + |> resource_to_string + ~ctor:(fun r -> + Logs_service.default_export_logs_service_request ~resource_logs:r ()) + ~enc:Logs_service.encode_pb_export_logs_service_request + + let metrics resource_metrics = + resource_metrics + |> resource_to_string + ~ctor:(fun r -> + Metrics_service.default_export_metrics_service_request + ~resource_metrics:r ()) + ~enc:Metrics_service.encode_pb_export_metrics_service_request + + let traces resource_spans = + resource_spans + |> resource_to_string + ~ctor:(fun r -> + Trace_service.default_export_trace_service_request ~resource_spans:r + ()) + ~enc:Trace_service.encode_pb_export_trace_service_request +end diff --git a/src/client/signal.mli b/src/client/signal.mli new file mode 100644 index 00000000..b445184c --- /dev/null +++ b/src/client/signal.mli @@ -0,0 +1,17 @@ +(** Constructing and managing OTel + {{:https://opentelemetry.io/docs/concepts/signals/} signals} *) + +(** Convert signals to protobuf encoded strings, ready to be sent over the wire + + NOTE: The converters share an underlying stateful encoder, so each domain or + system thread should have its own [Converter] instance *) +module Converter : functor () -> sig + val logs : Opentelemetry_proto.Logs.resource_logs list -> string + (** [logs ls] is a protobuf encoded string of the logs [ls] *) + + val metrics : Opentelemetry_proto.Metrics.resource_metrics list -> string + (** [metrics ms] is a protobuf encoded string of the metrics [ms] *) + + val traces : Opentelemetry_proto.Trace.resource_spans list -> string + (** [metrics ts] is a protobuf encoded string of the traces [ts] *) +end