make collector async-compatible (inspired from Logs)

This commit is contained in:
Simon Cruanes 2022-03-18 11:05:59 -04:00
parent c8d33aa611
commit 097436f907
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 56 additions and 40 deletions

View file

@ -55,10 +55,12 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct
| `Failure of string | `Failure of string
] ]
(* TODO: use Curl multi *)
(* send the content to the remote endpoint/path *) (* send the content to the remote endpoint/path *)
let send_ ~path ~decode (bod:string) : ('a, error) result = let send_ ~path ~decode (bod:string) : ('a, error) result =
Curl.reset curl; Curl.reset curl;
Curl.set_verbose curl true; if !debug_ then Curl.set_verbose curl true;
Curl.set_url curl (!url ^ path); Curl.set_url curl (!url ^ path);
Curl.set_httppost curl []; Curl.set_httppost curl [];
Curl.set_httpheader curl ["Content-Type: application/x-protobuf"]; Curl.set_httpheader curl ["Content-Type: application/x-protobuf"];
@ -106,29 +108,40 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct
Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@."
code Status.pp_status status code Status.pp_status status
let send_trace (tr:Trace_service.export_trace_service_request) : unit = let send_trace : Trace_service.export_trace_service_request sender = {
let@() = with_lock_ in send=fun tr ~over ~ret ->
if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr; let@() = with_lock_ in
Pbrt.Encoder.reset encoder; if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr;
Trace_service.encode_export_trace_service_request tr encoder; Pbrt.Encoder.reset encoder;
match Trace_service.encode_export_trace_service_request tr encoder;
send_ ~path:"/v1/traces" ~decode:(fun _ -> ()) begin match
(Pbrt.Encoder.to_string encoder) send_ ~path:"/v1/traces" ~decode:(fun _ -> ())
with (Pbrt.Encoder.to_string encoder)
| Ok () -> () with
| Error err -> report_err_ err | Ok () -> ()
| Error err -> report_err_ err
end;
over();
ret()
}
let send_metrics (m:Metrics_service.export_metrics_service_request) : unit = let send_metrics : Metrics_service.export_metrics_service_request sender = {
let@() = with_lock_ in send=fun m ~over ~ret ->
if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m; let@() = with_lock_ in
Pbrt.Encoder.reset encoder; if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m;
Metrics_service.encode_export_metrics_service_request m encoder; Pbrt.Encoder.reset encoder;
match Metrics_service.encode_export_metrics_service_request m encoder;
send_ ~path:"/v1/metrics" ~decode:(fun _ -> ()) begin
(Pbrt.Encoder.to_string encoder); match
with send_ ~path:"/v1/metrics" ~decode:(fun _ -> ())
| Ok () -> () (Pbrt.Encoder.to_string encoder);
| Error err -> report_err_ err with
| Ok () -> ()
| Error err -> report_err_ err
end;
over();
ret()
}
let rand_bytes_8 () : bytes = let rand_bytes_8 () : bytes =
let@() = with_lock_ in let@() = with_lock_ in

View file

@ -70,11 +70,18 @@ end
module Collector = struct module Collector = struct
open Proto open Proto
(** Sender interface for a message of type [msg].
Inspired from Logs' reporter
(see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) *)
type 'msg sender = {
send: 'a. 'msg -> over:(unit -> unit) -> ret:(unit -> 'a) -> 'a;
}
(** Collector client interface. *) (** Collector client interface. *)
module type BACKEND = sig module type BACKEND = sig
val send_trace : Trace_service.export_trace_service_request -> unit val send_trace : Trace_service.export_trace_service_request sender
val send_metrics : Metrics_service.export_metrics_service_request -> unit val send_metrics : Metrics_service.export_metrics_service_request sender
val rand_bytes_16 : unit -> bytes val rand_bytes_16 : unit -> bytes
(** Generate 16 bytes of random data *) (** Generate 16 bytes of random data *)
@ -89,21 +96,21 @@ module Collector = struct
let backend : backend option ref = ref None let backend : backend option ref = ref None
let send_trace (l:Trace.resource_spans list) : unit = let send_trace (l:Trace.resource_spans list) ~over ~ret =
match !backend with match !backend with
| None -> () | None -> over(); ret()
| Some (module B) -> | Some (module B) ->
let ev = Trace_service.default_export_trace_service_request let ev = Trace_service.default_export_trace_service_request
~resource_spans:l () in ~resource_spans:l () in
B.send_trace ev B.send_trace.send ev ~over ~ret
let send_metrics (l:Metrics.resource_metrics list) : unit = let send_metrics (l:Metrics.resource_metrics list) ~over ~ret =
match !backend with match !backend with
| None -> () | None -> over(); ret()
| Some (module B) -> | Some (module B) ->
let ev = Metrics_service.default_export_metrics_service_request let ev = Metrics_service.default_export_metrics_service_request
~resource_metrics:l () in ~resource_metrics:l () in
B.send_metrics ev B.send_metrics.send ev ~over ~ret
let rand_bytes_16 () = let rand_bytes_16 () =
match !backend with match !backend with
@ -272,12 +279,14 @@ module Trace = struct
type span = Span.t type span = Span.t
(** Sync emitter *)
let emit (spans:span list) : unit = let emit (spans:span list) : unit =
let ils = let ils =
default_instrumentation_library_spans ~spans () in default_instrumentation_library_spans ~spans () in
let rs = default_resource_spans ~instrumentation_library_spans:[ils] () in let rs = default_resource_spans ~instrumentation_library_spans:[ils] () in
Collector.send_trace [rs] Collector.send_trace [rs] ~over:(fun () -> ()) ~ret:(fun () -> ())
(** Sync span guard *)
let with_ let with_
?trace_state ?service_name ?attrs ?trace_state ?service_name ?attrs
?kind ?(trace_id=Trace_id.create()) ?parent ?links ?kind ?(trace_id=Trace_id.create()) ?parent ?links
@ -351,17 +360,11 @@ module Metrics = struct
(* TODO: summary *) (* TODO: summary *)
(* TODO: exemplar *) (* TODO: exemplar *)
(** Emit a bunch of metrics to the collector. *) (** Emit some metrics to the collector (sync). *)
let emit (l:t list) : unit = let emit (l:t list) : unit =
let lm = let lm =
default_instrumentation_library_metrics ~metrics:l () in default_instrumentation_library_metrics ~metrics:l () in
let rm = default_resource_metrics let rm = default_resource_metrics
~instrumentation_library_metrics:[lm] () in ~instrumentation_library_metrics:[lm] () in
Collector.send_metrics [rm] Collector.send_metrics [rm] ~over:ignore ~ret:ignore
end end
(*
module Span = Span
module Timestamp = Timestamp
*)