diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index ab8292d4..4800c2d8 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -55,10 +55,12 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct | `Failure of string ] + (* TODO: use Curl multi *) + (* send the content to the remote endpoint/path *) let send_ ~path ~decode (bod:string) : ('a, error) result = Curl.reset curl; - Curl.set_verbose curl true; + if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); Curl.set_httppost curl []; Curl.set_httpheader curl ["Content-Type: application/x-protobuf"]; @@ -106,29 +108,40 @@ module Backend() : Opentelemetry.Collector.BACKEND = struct Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code Status.pp_status status - let send_trace (tr:Trace_service.export_trace_service_request) : unit = - let@() = with_lock_ in - if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr; - Pbrt.Encoder.reset encoder; - Trace_service.encode_export_trace_service_request tr encoder; - match - send_ ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> report_err_ err + let send_trace : Trace_service.export_trace_service_request sender = { + send=fun tr ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send trace %a@." Trace_service.pp_export_trace_service_request tr; + Pbrt.Encoder.reset encoder; + Trace_service.encode_export_trace_service_request tr encoder; + begin match + send_ ~path:"/v1/traces" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder) + with + | Ok () -> () + | Error err -> report_err_ err + end; + over(); + ret() + } - let send_metrics (m:Metrics_service.export_metrics_service_request) : unit = - let@() = with_lock_ in - if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m; - Pbrt.Encoder.reset encoder; - Metrics_service.encode_export_metrics_service_request m encoder; - match - send_ ~path:"/v1/metrics" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder); - with - | Ok () -> () - | Error err -> report_err_ err + let send_metrics : Metrics_service.export_metrics_service_request sender = { + send=fun m ~over ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send metrics %a@." Metrics_service.pp_export_metrics_service_request m; + Pbrt.Encoder.reset encoder; + Metrics_service.encode_export_metrics_service_request m encoder; + begin + match + send_ ~path:"/v1/metrics" ~decode:(fun _ -> ()) + (Pbrt.Encoder.to_string encoder); + with + | Ok () -> () + | Error err -> report_err_ err + end; + over(); + ret() + } let rand_bytes_8 () : bytes = let@() = with_lock_ in diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 71701a7a..04765491 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -70,11 +70,18 @@ end module Collector = struct open Proto + (** Sender interface for a message of type [msg]. + Inspired from Logs' reporter + (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) *) + type 'msg sender = { + send: 'a. 'msg -> over:(unit -> unit) -> ret:(unit -> 'a) -> 'a; + } + (** Collector client interface. *) module type BACKEND = sig - val send_trace : Trace_service.export_trace_service_request -> unit + val send_trace : Trace_service.export_trace_service_request sender - val send_metrics : Metrics_service.export_metrics_service_request -> unit + val send_metrics : Metrics_service.export_metrics_service_request sender val rand_bytes_16 : unit -> bytes (** Generate 16 bytes of random data *) @@ -89,21 +96,21 @@ module Collector = struct let backend : backend option ref = ref None - let send_trace (l:Trace.resource_spans list) : unit = + let send_trace (l:Trace.resource_spans list) ~over ~ret = match !backend with - | None -> () + | None -> over(); ret() | Some (module B) -> let ev = Trace_service.default_export_trace_service_request ~resource_spans:l () in - B.send_trace ev + B.send_trace.send ev ~over ~ret - let send_metrics (l:Metrics.resource_metrics list) : unit = + let send_metrics (l:Metrics.resource_metrics list) ~over ~ret = match !backend with - | None -> () + | None -> over(); ret() | Some (module B) -> let ev = Metrics_service.default_export_metrics_service_request ~resource_metrics:l () in - B.send_metrics ev + B.send_metrics.send ev ~over ~ret let rand_bytes_16 () = match !backend with @@ -272,12 +279,14 @@ module Trace = struct type span = Span.t + (** Sync emitter *) let emit (spans:span list) : unit = let ils = default_instrumentation_library_spans ~spans () in let rs = default_resource_spans ~instrumentation_library_spans:[ils] () in - Collector.send_trace [rs] + Collector.send_trace [rs] ~over:(fun () -> ()) ~ret:(fun () -> ()) + (** Sync span guard *) let with_ ?trace_state ?service_name ?attrs ?kind ?(trace_id=Trace_id.create()) ?parent ?links @@ -351,17 +360,11 @@ module Metrics = struct (* TODO: summary *) (* TODO: exemplar *) - (** Emit a bunch of metrics to the collector. *) + (** Emit some metrics to the collector (sync). *) let emit (l:t list) : unit = let lm = default_instrumentation_library_metrics ~metrics:l () in let rm = default_resource_metrics ~instrumentation_library_metrics:[lm] () in - Collector.send_metrics [rm] + Collector.send_metrics [rm] ~over:ignore ~ret:ignore end - -(* -module Span = Span -module Timestamp = Timestamp - *) -