diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index 3ea9c317..fcdc611a 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -5,6 +5,7 @@ module OT = Opentelemetry module Config = Config +module Signal = Opentelemetry_client.Signal open Opentelemetry open Common_ @@ -270,6 +271,7 @@ end let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in let open Lwt.Syntax in + let module Conv = Signal.Converter in (* local helpers *) let open struct let timeout = @@ -291,10 +293,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in + let send_http_ (httpc : Httpc.t) ~url data : unit Lwt.t = let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in match r with | Ok () -> Lwt.return () @@ -309,57 +308,41 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = (* avoid crazy error loop *) Lwt_unix.sleep 3. - let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) - = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () - in - let url = config.url_metrics in - send_http_ curl encoder ~url - ~encode:Metrics_service.encode_pb_export_metrics_service_request x + let send_metrics_http client (l : Metrics.resource_metrics list) = + Conv.metrics l |> send_http_ client ~url:config.url_metrics - let send_traces_http curl encoder (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - let url = config.url_traces in - send_http_ curl encoder ~url - ~encode:Trace_service.encode_pb_export_trace_service_request x + let send_traces_http client (l : Trace.resource_spans list) = + Conv.traces l |> send_http_ client ~url:config.url_traces - let send_logs_http curl encoder (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - let url = config.url_logs in - send_http_ curl encoder ~url - ~encode:Logs_service.encode_pb_export_logs_service_request x + let send_logs_http client (l : Logs.resource_logs list) = + Conv.logs l |> send_http_ client ~url:config.url_logs + + let maybe_pop ?force ~now batch = + Batch.pop_if_ready ?force ~now batch + |> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) []) (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_metrics with + let emit_metrics_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_metrics with | None -> Lwt.return false | Some l -> - let batch = !gc_metrics :: l in + let batch = !gc_metrics @ l in gc_metrics := []; - let+ () = send_metrics_http httpc encoder batch in + let+ () = send_metrics_http httpc batch in true - let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_traces with + let emit_traces_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_traces with | None -> Lwt.return false | Some l -> - let+ () = send_traces_http httpc encoder l in + let+ () = send_traces_http httpc l in true - let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t = - match Batch.pop_if_ready ?force ~now batch_logs with + let emit_logs_maybe ~now ?force httpc : bool Lwt.t = + match maybe_pop ?force ~now batch_logs with | None -> Lwt.return false | Some l -> - let+ () = send_logs_http httpc encoder l in + let+ () = send_logs_http httpc l in true let[@inline] guard_exn_ where f = @@ -370,24 +353,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where (Printexc.to_string e) bt - let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t = + let emit_all_force (httpc : Httpc.t) : unit Lwt.t = let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in - () - - let tick_common_ () = - if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - (Printexc.to_string e)) - (AList.get @@ Atomic.get on_tick_cbs_); + let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc + and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc + and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc in () (* thread that calls [tick()] regularly, to help enforce timeouts *) @@ -404,7 +374,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Lwt.async tick_thread end in let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in let module M = struct (* we make sure that this is thread-safe, even though we don't have a @@ -417,7 +386,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_traces e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in + let+ (_ : bool) = emit_traces_maybe ~now httpc in ()) let push_metrics e = @@ -426,7 +395,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_metrics e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + let+ (_ : bool) = emit_metrics_maybe ~now httpc in ()) let push_logs e = @@ -434,18 +403,26 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = Batch.push' batch_logs e; let now = Mtime_clock.now () in Lwt.async (fun () -> - let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in + let+ (_ : bool) = emit_logs_maybe ~now httpc in ()) let set_on_tick_callbacks = set_on_tick_callbacks let tick_ () = - tick_common_ (); + if Config.Env.get_debug () then + Printf.eprintf "tick (from %d)\n%!" (tid ()); sample_gc_metrics_if_needed (); + List.iter + (fun f -> + try f () + with e -> + Printf.eprintf "on tick callback raised: %s\n" + (Printexc.to_string e)) + (AList.get @@ Atomic.get on_tick_cbs_); let now = Mtime_clock.now () in - let+ (_ : bool) = emit_traces_maybe ~now httpc encoder - and+ (_ : bool) = emit_logs_maybe ~now httpc encoder - and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in + let+ (_ : bool) = emit_traces_maybe ~now httpc + and+ (_ : bool) = emit_logs_maybe ~now httpc + and+ (_ : bool) = emit_metrics_maybe ~now httpc in () let () = setup_ticker_thread ~tick:tick_ ~finally:ignore () @@ -457,7 +434,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = if Config.Env.get_debug () then Printf.eprintf "opentelemetry: exiting…\n%!"; Lwt.async (fun () -> - let* () = emit_all_force httpc encoder in + let* () = emit_all_force httpc in Httpc.cleanup httpc; on_done (); Lwt.return ()) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 1c4e400e..c0e5eaaa 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -5,6 +5,8 @@ module OT = Opentelemetry module Config = Config +module Self_trace = Opentelemetry_client.Self_trace +module Signal = Opentelemetry_client.Signal open Opentelemetry include Common_ @@ -21,28 +23,6 @@ let timeout_gc_metrics = Mtime.Span.(20 * s) (** side channel for GC, appended to metrics batch data *) let gc_metrics = AList.make () -(** Mini tracing module (disabled if [config.self_trace=false]) *) -module Self_trace = struct - let enabled = Atomic.make true - - let add_event (scope : Scope.t) ev = Scope.add_event scope (fun () -> ev) - - let dummy_trace_id_ = Trace_id.create () - - let dummy_span_id = Span_id.create () - - let with_ ?kind ?attrs name f = - if Atomic.get enabled then - Opentelemetry.Trace.with_ ?kind ?attrs name f - else ( - (* do nothing *) - let scope = - Scope.make ~trace_id:dummy_trace_id_ ~span_id:dummy_span_id () - in - f scope - ) -end - (** capture current GC metrics if {!needs_gc_metrics} is true or it has been a long time since the last GC metrics collection, and push them into {!gc_metrics} for later collection *) @@ -151,21 +131,11 @@ end = struct mutable send_threads: Thread.t array; (** Threads that send data via http *) } - let send_http_ ~stop ~(config : Config.t) (client : Curl.t) encoder ~url - ~encode x : unit = + let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit = let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http" in - let data = - let@ _sc = - Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" - in - Pbrt.Encoder.reset encoder; - encode x encoder; - Pbrt.Encoder.to_string encoder - in - if Config.Env.get_debug () then Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url (String.length data); @@ -215,68 +185,36 @@ end = struct (* avoid crazy error loop *) Thread.delay 3. - let send_logs_http ~stop ~config (client : Curl.t) encoder - (l : Logs.resource_logs list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-logs" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - send_http_ ~stop ~config client encoder ~url:config.Config.common.url_logs - ~encode:Logs_service.encode_pb_export_logs_service_request x - - let send_metrics_http ~stop ~config curl encoder - (l : Metrics.resource_metrics list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-metrics" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Metrics_service.default_export_metrics_service_request ~resource_metrics:l - () - in - send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_metrics - ~encode:Metrics_service.encode_pb_export_metrics_service_request x - - let send_traces_http ~stop ~config curl encoder - (l : Trace.resource_spans list list) : unit = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let@ _sp = - Self_trace.with_ ~kind:Span_kind_producer "send-traces" - ~attrs:[ "n", `Int (List.length l) ] - in - - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_traces - ~encode:Trace_service.encode_pb_export_trace_service_request x - let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev (** Thread that, in a loop, reads from [q] to get the next message to send via http *) let bg_thread_loop (self : t) : unit = Ezcurl.with_client ?set_opts:None @@ fun client -> - let stop = self.stop in let config = self.config in - let encoder = Pbrt.Encoder.create () in + let stop = self.stop in + let send ~name ~url ~conv signals = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] signals in + let@ _sp = + Self_trace.with_ ~kind:Span_kind_producer name + ~attrs:[ "n", `Int (List.length l) ] + in + conv l |> send_http_ ~stop ~config ~url client + in + let module Conv = Signal.Converter in try while not (Atomic.get stop) do let msg = B_queue.pop self.send_q in match msg with | To_send.Send_trace tr -> - send_traces_http ~stop ~config client encoder tr + send ~name:"send-traces" ~conv:Conv.traces + ~url:config.common.url_traces tr | To_send.Send_metric ms -> - send_metrics_http ~stop ~config client encoder ms + send ~name:"send-metrics" ~conv:Conv.metrics + ~url:config.common.url_metrics ms | To_send.Send_logs logs -> - send_logs_http ~stop ~config client encoder logs + send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs + logs done with B_queue.Closed -> () @@ -513,7 +451,7 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () let backend = create_backend ~stop ~config () in Opentelemetry.Collector.set_backend backend; - Atomic.set Self_trace.enabled config.common.self_trace; + Self_trace.set_enabled config.common.self_trace; if config.ticker_thread then ( (* at most a minute *) diff --git a/src/client/client.ml b/src/client/client.ml index 7a911ad0..fa69c983 100644 --- a/src/client/client.ml +++ b/src/client/client.ml @@ -4,3 +4,5 @@ and [opentelemetry-client-ocurl] packages package. *) module Config = Config +module Signal = Signal +module Self_trace = Self_trace diff --git a/src/client/dune b/src/client/dune index 2bc4e4a5..da204f79 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,4 +1,5 @@ (library (name opentelemetry_client) (public_name opentelemetry.client) + (libraries opentelemetry pbrt) (synopsis "Common types and logic shared between client implementations")) diff --git a/src/client/self_trace.ml b/src/client/self_trace.ml new file mode 100644 index 00000000..62d04cae --- /dev/null +++ b/src/client/self_trace.ml @@ -0,0 +1,22 @@ +module OT = Opentelemetry + +let enabled = Atomic.make true + +let add_event (scope : OT.Scope.t) ev = OT.Scope.add_event scope (fun () -> ev) + +let dummy_trace_id_ = OT.Trace_id.dummy + +let dummy_span_id = OT.Span_id.dummy + +let with_ ?kind ?attrs name f = + if Atomic.get enabled then + OT.Trace.with_ ?kind ?attrs name f + else ( + (* A new scope is needed here because it might be modified *) + let scope = + OT.Scope.make ~trace_id:dummy_trace_id_ ~span_id:dummy_span_id () + in + f scope + ) + +let set_enabled b = Atomic.set enabled b diff --git a/src/client/self_trace.mli b/src/client/self_trace.mli new file mode 100644 index 00000000..d0690f02 --- /dev/null +++ b/src/client/self_trace.mli @@ -0,0 +1,12 @@ +(** Mini tracing module (disabled if [config.self_trace=false]) *) + +val add_event : Opentelemetry.Scope.t -> Opentelemetry.Event.t -> unit + +val with_ : + ?kind:Opentelemetry.Span_kind.t -> + ?attrs:(string * Opentelemetry.value) list -> + string -> + (Opentelemetry.Scope.t -> 'a) -> + 'a + +val set_enabled : bool -> unit diff --git a/src/client/signal.ml b/src/client/signal.ml new file mode 100644 index 00000000..691cc6f8 --- /dev/null +++ b/src/client/signal.ml @@ -0,0 +1,43 @@ +module Trace_service = Opentelemetry.Proto.Trace_service +module Metrics_service = Opentelemetry.Proto.Metrics_service +module Logs_service = Opentelemetry.Proto.Logs_service +module Span = Opentelemetry.Span + +let ( let@ ) = ( @@ ) + +module Converter = struct + let resource_to_string ~encoder ~ctor ~enc resource = + let encoder = + match encoder with + | Some e -> e + | None -> Pbrt.Encoder.create () + in + let x = ctor resource in + let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in + Pbrt.Encoder.reset encoder; + enc x encoder; + Pbrt.Encoder.to_string encoder + + let logs ?encoder resource_logs = + resource_logs + |> resource_to_string ~encoder + ~ctor:(fun r -> + Logs_service.default_export_logs_service_request ~resource_logs:r ()) + ~enc:Logs_service.encode_pb_export_logs_service_request + + let metrics ?encoder resource_metrics = + resource_metrics + |> resource_to_string ~encoder + ~ctor:(fun r -> + Metrics_service.default_export_metrics_service_request + ~resource_metrics:r ()) + ~enc:Metrics_service.encode_pb_export_metrics_service_request + + let traces ?encoder resource_spans = + resource_spans + |> resource_to_string ~encoder + ~ctor:(fun r -> + Trace_service.default_export_trace_service_request ~resource_spans:r + ()) + ~enc:Trace_service.encode_pb_export_trace_service_request +end diff --git a/src/client/signal.mli b/src/client/signal.mli new file mode 100644 index 00000000..ef2a6e24 --- /dev/null +++ b/src/client/signal.mli @@ -0,0 +1,31 @@ +(** Constructing and managing OTel + {{:https://opentelemetry.io/docs/concepts/signals/} signals} *) + +(** Convert signals to protobuf encoded strings, ready to be sent over the wire + + NOTE: The converters share an underlying stateful encoder, so each domain or + system thread should have its own [Converter] instance *) +module Converter : sig + val logs : + ?encoder:Pbrt.Encoder.t -> + Opentelemetry_proto.Logs.resource_logs list -> + string + (** [logs ls] is a protobuf encoded string of the logs [ls] + + @param encoder provide an encoder state to reuse *) + + val metrics : + ?encoder:Pbrt.Encoder.t -> + Opentelemetry_proto.Metrics.resource_metrics list -> + string + (** [metrics ms] is a protobuf encoded string of the metrics [ms] + @param encoder provide an encoder state to reuse *) + + val traces : + ?encoder:Pbrt.Encoder.t -> + Opentelemetry_proto.Trace.resource_spans list -> + string + (** [metrics ts] is a protobuf encoded string of the traces [ts] + + @param encoder provide an encoder state to reuse *) +end