Factor out signal encoding logic

This commit is contained in:
Shon Feder 2025-06-15 23:36:39 -04:00
parent 0045a97e34
commit 52377b0a03
No known key found for this signature in database
5 changed files with 110 additions and 111 deletions

View file

@ -5,6 +5,7 @@
module OT = Opentelemetry
module Config = Config
module Signal = Opentelemetry_client.Signal
open Opentelemetry
open Common_
@ -270,6 +271,7 @@ end
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
let module Conv = Signal.Converter () in
(* local helpers *)
let open struct
let timeout =
@ -291,10 +293,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let set_on_tick_callbacks = Atomic.set on_tick_cbs_
let send_http_ (httpc : Httpc.t) encoder ~url ~encode x : unit Lwt.t =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
let send_http_ (httpc : Httpc.t) ~url data : unit Lwt.t =
let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
match r with
| Ok () -> Lwt.return ()
@ -309,57 +308,41 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
(* avoid crazy error loop *)
Lwt_unix.sleep 3.
let send_metrics_http curl encoder (l : Metrics.resource_metrics list list)
=
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l ()
in
let url = config.url_metrics in
send_http_ curl encoder ~url
~encode:Metrics_service.encode_pb_export_metrics_service_request x
let send_metrics_http client (l : Metrics.resource_metrics list) =
Conv.metrics l |> send_http_ client ~url:config.url_metrics
let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
let url = config.url_traces in
send_http_ curl encoder ~url
~encode:Trace_service.encode_pb_export_trace_service_request x
let send_traces_http client (l : Trace.resource_spans list) =
Conv.traces l |> send_http_ client ~url:config.url_traces
let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
let url = config.url_logs in
send_http_ curl encoder ~url
~encode:Logs_service.encode_pb_export_logs_service_request x
let send_logs_http client (l : Logs.resource_logs list) =
Conv.logs l |> send_http_ client ~url:config.url_logs
let maybe_pop ?force ~now batch =
Batch.pop_if_ready ?force ~now batch
|> Option.map (List.fold_left (fun acc l -> List.rev_append l acc) [])
(* emit metrics, if the batch is full or timeout lapsed *)
let emit_metrics_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_metrics with
let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_metrics with
| None -> Lwt.return false
| Some l ->
let batch = !gc_metrics :: l in
let batch = !gc_metrics @ l in
gc_metrics := [];
let+ () = send_metrics_http httpc encoder batch in
let+ () = send_metrics_http httpc batch in
true
let emit_traces_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_traces with
let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_traces with
| None -> Lwt.return false
| Some l ->
let+ () = send_traces_http httpc encoder l in
let+ () = send_traces_http httpc l in
true
let emit_logs_maybe ~now ?force httpc encoder : bool Lwt.t =
match Batch.pop_if_ready ?force ~now batch_logs with
let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
match maybe_pop ?force ~now batch_logs with
| None -> Lwt.return false
| Some l ->
let+ () = send_logs_http httpc encoder l in
let+ () = send_logs_http httpc l in
true
let[@inline] guard_exn_ where f =
@ -370,11 +353,11 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
"opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where
(Printexc.to_string e) bt
let emit_all_force (httpc : Httpc.t) encoder : unit Lwt.t =
let emit_all_force (httpc : Httpc.t) : unit Lwt.t =
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc encoder in
let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc
and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc
and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc in
()
(* thread that calls [tick()] regularly, to help enforce timeouts *)
@ -391,7 +374,6 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Lwt.async tick_thread
end in
let httpc = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
let module M = struct
(* we make sure that this is thread-safe, even though we don't have a
@ -404,7 +386,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Batch.push' batch_traces e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder in
let+ (_ : bool) = emit_traces_maybe ~now httpc in
())
let push_metrics e =
@ -413,7 +395,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
let+ (_ : bool) = emit_metrics_maybe ~now httpc in
())
let push_logs e =
@ -421,7 +403,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
Batch.push' batch_logs e;
let now = Mtime_clock.now () in
Lwt.async (fun () ->
let+ (_ : bool) = emit_logs_maybe ~now httpc encoder in
let+ (_ : bool) = emit_logs_maybe ~now httpc in
())
let set_on_tick_callbacks = set_on_tick_callbacks
@ -438,9 +420,9 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
(Printexc.to_string e))
(AList.get @@ Atomic.get on_tick_cbs_);
let now = Mtime_clock.now () in
let+ (_ : bool) = emit_traces_maybe ~now httpc encoder
and+ (_ : bool) = emit_logs_maybe ~now httpc encoder
and+ (_ : bool) = emit_metrics_maybe ~now httpc encoder in
let+ (_ : bool) = emit_traces_maybe ~now httpc
and+ (_ : bool) = emit_logs_maybe ~now httpc
and+ (_ : bool) = emit_metrics_maybe ~now httpc in
()
let () = setup_ticker_thread ~tick:tick_ ~finally:ignore ()
@ -452,7 +434,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in
let* () = emit_all_force httpc in
Httpc.cleanup httpc;
on_done ();
Lwt.return ())

View file

@ -6,6 +6,7 @@
module OT = Opentelemetry
module Config = Config
module Self_trace = Opentelemetry_client.Self_trace
module Signal = Opentelemetry_client.Signal
open Opentelemetry
include Common_
@ -130,21 +131,11 @@ end = struct
mutable send_threads: Thread.t array; (** Threads that send data via http *)
}
let send_http_ ~stop ~(config : Config.t) (client : Curl.t) encoder ~url
~encode x : unit =
let send_http_ ~stop ~(config : Config.t) (client : Curl.t) ~url data : unit =
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
in
let data =
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
in
Pbrt.Encoder.reset encoder;
encode x encoder;
Pbrt.Encoder.to_string encoder
in
if Config.Env.get_debug () then
Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url
(String.length data);
@ -194,68 +185,36 @@ end = struct
(* avoid crazy error loop *)
Thread.delay 3.
let send_logs_http ~stop ~config (client : Curl.t) encoder
(l : Logs.resource_logs list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-logs"
~attrs:[ "n", `Int (List.length l) ]
in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ ~stop ~config client encoder ~url:config.Config.common.url_logs
~encode:Logs_service.encode_pb_export_logs_service_request x
let send_metrics_http ~stop ~config curl encoder
(l : Metrics.resource_metrics list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-metrics"
~attrs:[ "n", `Int (List.length l) ]
in
let x =
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
()
in
send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_metrics
~encode:Metrics_service.encode_pb_export_metrics_service_request x
let send_traces_http ~stop ~config curl encoder
(l : Trace.resource_spans list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-traces"
~attrs:[ "n", `Int (List.length l) ]
in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ ~stop ~config curl encoder ~url:config.Config.common.url_traces
~encode:Trace_service.encode_pb_export_trace_service_request x
let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev
(** Thread that, in a loop, reads from [q] to get the next message to send via
http *)
let bg_thread_loop (self : t) : unit =
Ezcurl.with_client ?set_opts:None @@ fun client ->
let stop = self.stop in
let config = self.config in
let encoder = Pbrt.Encoder.create () in
let stop = self.stop in
let send ~name ~url ~conv signals =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] signals in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer name
~attrs:[ "n", `Int (List.length l) ]
in
conv l |> send_http_ ~stop ~config ~url client
in
let module Conv = Signal.Converter () in
try
while not (Atomic.get stop) do
let msg = B_queue.pop self.send_q in
match msg with
| To_send.Send_trace tr ->
send_traces_http ~stop ~config client encoder tr
send ~name:"send-traces" ~conv:Conv.traces
~url:config.common.url_traces tr
| To_send.Send_metric ms ->
send_metrics_http ~stop ~config client encoder ms
send ~name:"send-metrics" ~conv:Conv.metrics
~url:config.common.url_metrics ms
| To_send.Send_logs logs ->
send_logs_http ~stop ~config client encoder logs
send ~name:"send-logs" ~conv:Conv.logs ~url:config.common.url_logs
logs
done
with B_queue.Closed -> ()

View file

@ -1,4 +1,5 @@
(library
(name opentelemetry_client)
(public_name opentelemetry.client)
(libraries opentelemetry pbrt)
(synopsis "Common types and logic shared between client implementations"))

40
src/client/signal.ml Normal file
View file

@ -0,0 +1,40 @@
module Trace_service = Opentelemetry.Proto.Trace_service
module Metrics_service = Opentelemetry.Proto.Metrics_service
module Logs_service = Opentelemetry.Proto.Logs_service
module Span = Opentelemetry.Span
let ( let@ ) f x = f x
module Converter () = struct
let encoder = Pbrt.Encoder.create ()
let resource_to_string ~ctor ~enc resource =
let x = ctor resource in
let@ _sc = Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto" in
Pbrt.Encoder.reset encoder;
enc x encoder;
Pbrt.Encoder.to_string encoder
let logs resource_logs =
resource_logs
|> resource_to_string
~ctor:(fun r ->
Logs_service.default_export_logs_service_request ~resource_logs:r ())
~enc:Logs_service.encode_pb_export_logs_service_request
let metrics resource_metrics =
resource_metrics
|> resource_to_string
~ctor:(fun r ->
Metrics_service.default_export_metrics_service_request
~resource_metrics:r ())
~enc:Metrics_service.encode_pb_export_metrics_service_request
let traces resource_spans =
resource_spans
|> resource_to_string
~ctor:(fun r ->
Trace_service.default_export_trace_service_request ~resource_spans:r
())
~enc:Trace_service.encode_pb_export_trace_service_request
end

17
src/client/signal.mli Normal file
View file

@ -0,0 +1,17 @@
(** Constructing and managing OTel
{{:https://opentelemetry.io/docs/concepts/signals/} signals} *)
(** Convert signals to protobuf encoded strings, ready to be sent over the wire
NOTE: The converters share an underlying stateful encoder, so each domain or
system thread should have its own [Converter] instance *)
module Converter : functor () -> sig
val logs : Opentelemetry_proto.Logs.resource_logs list -> string
(** [logs ls] is a protobuf encoded string of the logs [ls] *)
val metrics : Opentelemetry_proto.Metrics.resource_metrics list -> string
(** [metrics ms] is a protobuf encoded string of the metrics [ms] *)
val traces : Opentelemetry_proto.Trace.resource_spans list -> string
(** [metrics ts] is a protobuf encoded string of the traces [ts] *)
end