diff --git a/src/client-ocurl/batch.ml b/src/client-ocurl/batch.ml new file mode 100644 index 00000000..122013a8 --- /dev/null +++ b/src/client-ocurl/batch.ml @@ -0,0 +1,24 @@ +type 'a t = { + mutable len: int; + mutable l: 'a list list; + mutable started: Mtime.t; +} + +let create () = { len = 0; l = []; started = Mtime_clock.now () } + +let push self l = + if l != [] then ( + if self.l == [] then self.started <- Mtime_clock.now (); + self.l <- l :: self.l; + self.len <- self.len + List.length l + ) + +let len self = self.len + +let time_started self = self.started + +let pop_all self = + let l = self.l in + self.l <- []; + self.len <- 0; + l diff --git a/src/client-ocurl/batch.mli b/src/client-ocurl/batch.mli new file mode 100644 index 00000000..2b867b88 --- /dev/null +++ b/src/client-ocurl/batch.mli @@ -0,0 +1,14 @@ +(** List of lists with length *) + +type 'a t + +val create : unit -> 'a t + +val push : 'a t -> 'a list -> unit + +val len : _ t -> int + +val time_started : _ t -> Mtime.t +(** Time at which the batch most recently became non-empty *) + +val pop_all : 'a t -> 'a list list diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index 1a790654..3dfea160 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -18,10 +18,6 @@ let pp out self = debug url ppheaders headers batch_timeout_ms bg_threads let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) - ?(batch_timeout_ms = 500) ?bg_threads () : t = - let bg_threads = - match bg_threads with - | Some n -> max n 2 - | None -> 4 - in + ?(batch_timeout_ms = 500) ?(bg_threads = 4) () : t = + let bg_threads = max 2 (min bg_threads 32) in { debug; url; headers; batch_timeout_ms; bg_threads } diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 46e6a1c9..79b0e4cf 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -1,3 +1,5 @@ +(** Configuration for the ocurl backend *) + type t = private { debug: bool; url: string; diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index cb30f1ff..00aea025 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -4,6 +4,7 @@ *) module OT = Opentelemetry +module Config = Config open Opentelemetry include Common_ @@ -13,15 +14,15 @@ let last_gc_metrics = Atomic.make (Mtime_clock.now ()) let timeout_gc_metrics = Mtime.Span.(20 * s) +(** side channel for GC, appended to metrics batch data *) let gc_metrics = AList.make () -(* side channel for GC, appended to {!E_metrics}'s data *) -(* capture current GC metrics if {!needs_gc_metrics} is true - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) +(** capture current GC metrics if {!needs_gc_metrics} is true + or it has been a long time since the last GC metrics collection, + and push them into {!gc_metrics} for later collection *) let sample_gc_metrics_if_needed () = let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in + let alarm = Atomic.exchange needs_gc_metrics false in let timeout () = let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in Mtime.Span.compare elapsed timeout_gc_metrics > 0 @@ -36,663 +37,370 @@ let sample_gc_metrics_if_needed () = AList.add gc_metrics l ) -module Config = Config - -let _init_curl = - lazy - (Curl.global_init Curl.CURLINIT_GLOBALALL; - at_exit Curl.global_cleanup) - -type error = - [ `Status of int * Opentelemetry.Proto.Status.status - | `Failure of string - | `Sysbreak - ] - let n_errors = Atomic.make 0 let n_dropped = Atomic.make 0 -let report_err_ = function - | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details }) - -> - let pp_details out l = - List.iter - (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) - l - in - Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status \ - {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." - code scode - (Bytes.unsafe_to_string message) - pp_details details - -module Httpc : sig - type t - - val create : unit -> t - - val send : - t -> - path:string -> - decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> - string -> - ('a, error) result - - val cleanup : t -> unit -end = struct +(** Something sent to the collector *) +module Event = struct open Opentelemetry.Proto - let () = Lazy.force _init_curl - - (* TODO: use Curl.Multi, etc. instead? *) - type t = { - buf_res: Buffer.t; - curl: Curl.t; - } - - let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () } - - let cleanup self = Curl.cleanup self.curl - - (* send the content to the remote endpoint/path *) - let send (self : t) ~path ~decode (bod : string) : ('a, error) result = - let { curl; buf_res } = self in - Curl.reset curl; - if !debug_ then Curl.set_verbose curl true; - let full_url = !url ^ path in - Curl.set_url curl full_url; - Curl.set_httppost curl []; - let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in - let http_headers = List.map to_http_header !headers in - Curl.set_httpheader curl - ("Content-Type: application/x-protobuf" :: http_headers); - (* write body *) - Curl.set_post curl true; - Curl.set_postfieldsize curl (String.length bod); - Curl.set_readfunction curl - (let i = ref 0 in - fun n -> - if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; - let len = min n (String.length bod - !i) in - let s = String.sub bod !i len in - if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; - i := !i + len; - s); - (* read result's body *) - Buffer.clear buf_res; - Curl.set_writefunction curl (fun s -> - Buffer.add_string buf_res s; - String.length s); - try - match Curl.perform curl with - | () -> - let code = Curl.get_responsecode curl in - if !debug_ then - Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); - if code >= 200 && code < 300 then ( - match decode with - | `Ret x -> Ok x - | `Dec f -> - let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in - (try Ok (f dec) - with e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) - bt))) - ) else ( - let str = Buffer.contents buf_res in - let dec = Pbrt.Decoder.of_string str in - - try - let status = Status.decode_status dec in - Error (`Status (code, status)) - with e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf - "httpc: decoding of status (url=%S, code=%d) failed with:\n\ - %s\n\ - status: %S\n\ - %s" - full_url code (Printexc.to_string e) str bt)) - ) - | exception Sys.Break -> Error `Sysbreak - | exception Curl.CurlException (_, code, msg) -> - let status = - Status.default_status ~code:(Int32.of_int code) - ~message:(Bytes.unsafe_of_string msg) - () - in - Error (`Status (code, status)) - with - | Sys.Break -> Error `Sysbreak - | e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf "httpc: post on url=%S failed with:\n%s\n%s" full_url - (Printexc.to_string e) bt)) + type t = + | E_metric of Metrics.resource_metrics list + | E_trace of Trace.resource_spans list + | E_logs of Logs.resource_logs list + | E_tick + | E_flush_all (** Flush all batches *) end -(** Batch of resources to be pushed later. - - This type is thread-safe. *) -module Batch : sig - type 'a t - - val push : 'a t -> 'a -> bool - (** [push batch x] pushes [x] into the batch, and heuristically - returns [true] if the batch is ready to be emitted (to know if we should - wake up the sending thread, if any) *) - - val push' : 'a t -> 'a -> unit - - val is_ready : now:Mtime.t -> _ t -> bool - (** is the batch ready to be sent? This is heuristic. *) - - val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option - (** Is the batch ready to be emitted? If batching is disabled, - this is true as soon as {!is_empty} is false. If a timeout is provided - for this batch, then it will be ready if an element has been in it - for at least the timeout. - @param now passed to implement timeout *) - - val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t - (** Create a new batch *) -end = struct - type 'a t = { - lock: Mutex.t; - mutable size: int; - mutable q: 'a list; - batch: int option; - high_watermark: int; - timeout: Mtime.span option; - mutable start: Mtime.t; - } - - let make ?batch ?timeout () : _ t = - Option.iter (fun b -> assert (b > 0)) batch; - let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in - { - lock = Mutex.create (); - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } - - let timeout_expired_ ~now self : bool = - match self.timeout with - | Some t -> - let elapsed = Mtime.span now self.start in - Mtime.Span.compare elapsed t >= 0 - | None -> false - - let is_full_ self : bool = - match self.batch with - | None -> self.size > 0 - | Some b -> self.size >= b - - let is_ready ~now self : bool = - let@ () = with_mutex_ self.lock in - is_full_ self || timeout_expired_ ~now self - - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - let@ () = with_mutex_ self.lock in - if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) - then ( - let l = self.q in - self.q <- []; - self.size <- 0; - assert (l <> []); - Some l - ) else - None - - let push (self : _ t) x : bool = - let@ () = with_mutex_ self.lock in - if self.size >= self.high_watermark then ( - (* drop this to prevent queue from growing too fast *) - Atomic.incr n_dropped; - true - ) else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); - - (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready - ) - - let push' self x = ignore (push self x : bool) -end - -(** An emitter. This is used by {!Backend} below to forward traces/metrics/… - from the program to whatever collector client we have. *) -module type EMITTER = sig +(** Something to be sent via HTTP *) +module To_send = struct open Opentelemetry.Proto - val push_trace : Trace.resource_spans list -> unit - - val push_metrics : Metrics.resource_metrics list -> unit - - val push_logs : Logs.resource_logs list -> unit - - val set_on_tick_callbacks : (unit -> unit) list ref -> unit - - val tick : unit -> unit - - val cleanup : unit -> unit + type t = + | Send_metric of Metrics.resource_metrics list list + | Send_trace of Trace.resource_spans list list + | Send_logs of Logs.resource_logs list list end -(* start a thread in the background, running [f()] *) -let start_bg_thread (f : unit -> unit) : unit = +(** start a thread in the background, running [f()] *) +let start_bg_thread (f : unit -> unit) : Thread.t = let run () = (* block some signals: USR1 USR2 TERM PIPE ALARM STOP, see [$ kill -L] *) ignore (Thread.sigmask Unix.SIG_BLOCK [ 10; 12; 13; 14; 15; 19 ] : _ list); f () in - ignore (Thread.create run () : Thread.t) + Thread.create run () -(* make an emitter. +let str_to_hex (s : string) : string = + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + in - exceptions inside should be caught, see - https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = - let open Proto in - (* local helpers *) - let open struct - let timeout = - if config.batch_timeout_ms > 0 then - Some Mtime.Span.(config.batch_timeout_ms * ms) - else - None + let res = Bytes.create (2 * String.length s) in + for i = 0 to String.length s - 1 do + let n = Char.code (String.get s i) in + Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) + done; + Bytes.unsafe_to_string res - let batch_traces : Trace.resource_spans list Batch.t = - Batch.make ?batch:config.batch_traces ?timeout () +module Backend_impl : sig + type t - let batch_metrics : Metrics.resource_metrics list Batch.t = - Batch.make ?batch:config.batch_metrics ?timeout () + val create : stop:bool Atomic.t -> config:Config.t -> unit -> t - let batch_logs : Logs.resource_logs list Batch.t = - Batch.make ?batch:config.batch_logs ?timeout () + val send_event : t -> Event.t -> unit + + val shutdown : t -> unit +end = struct + open Opentelemetry.Proto + + type t = { + stop: bool Atomic.t; + cleaned: bool Atomic.t; (** True when we cleaned up after closing *) + config: Config.t; + q: Event.t B_queue.t; (** Queue to receive data from the user's code *) + mutable main_th: Thread.t option; (** Thread that listens on [q] *) + send_q: To_send.t B_queue.t; (** Queue for the send worker threads *) + mutable send_threads: Thread.t array; (** Threads that send data via http *) + } + + let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit + = + Pbrt.Encoder.reset encoder; + encode x encoder; + let data = Pbrt.Encoder.to_string encoder in + let url = config.Config.url ^ path in + match + Ezcurl.post ~headers:config.headers ~client ~params:[] ~url + ~content:(`String data) () + with + | Ok { code; _ } when code >= 200 && code < 300 -> () + | Ok { code; body; headers = _; info = _ } -> + Atomic.incr n_errors; + if config.debug then ( + let dec = Pbrt.Decoder.of_string body in + let body = + try + let status = Status.decode_status dec in + Format.asprintf "%a" Status.pp_status status + with _ -> + spf "(could not decode status)\nraw bytes: %s" (str_to_hex body) + in + Printf.eprintf "error while sending:\n code=%d\n %s\n%!" code body + ); + () + | exception Sys.Break -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + Atomic.set stop true + | Error (code, msg) -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + + Printf.eprintf "opentelemetry: export failed:\n %s\n curl code: %s\n%!" + msg (Curl.strerror code); + + (* avoid crazy error loop *) + Thread.delay 3. + + let send_logs_http ~stop ~config (client : Curl.t) encoder + (l : Logs.resource_logs list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request ~resource_logs:l () + in + send_http_ ~stop ~config client encoder ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x + + let send_metrics_http ~stop ~config curl encoder + (l : Metrics.resource_metrics list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request ~resource_metrics:l + () + in + send_http_ ~stop ~config curl encoder ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x + + let send_traces_http ~stop ~config curl encoder + (l : Trace.resource_spans list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request ~resource_spans:l () + in + send_http_ ~stop ~config curl encoder ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + + let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev + + (** Thread that, in a loop, reads from [q] to get the + next message to send via http *) + let bg_thread_loop (self : t) : unit = + Ezcurl.with_client @@ fun client -> + let stop = self.stop in + let config = self.config in + let encoder = Pbrt.Encoder.create () in + try + while not (Atomic.get stop) do + let msg = B_queue.pop self.send_q in + match msg with + | To_send.Send_trace tr -> + send_traces_http ~stop ~config client encoder tr + | To_send.Send_metric ms -> + send_metrics_http ~stop ~config client encoder ms + | To_send.Send_logs logs -> + send_logs_http ~stop ~config client encoder logs + done + with B_queue.Closed -> () + + type batches = { + traces: Proto.Trace.resource_spans Batch.t; + logs: Proto.Logs.resource_logs Batch.t; + metrics: Proto.Metrics.resource_metrics Batch.t; + } + + let batch_timeout_expired_ ~config ~now (b : _ Batch.t) : bool = + Batch.len b > 0 + && + let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in + let elapsed = Mtime.span now (Batch.time_started b) in + Mtime.Span.compare elapsed timeout >= 0 + + let main_thread_loop (self : t) : unit = + let local_q = Queue.create () in + let config = self.config in + + (* keep track of batches *) + let batches = + { + traces = Batch.create (); + logs = Batch.create (); + metrics = Batch.create (); + } + in + + let send_metrics () = + B_queue.push self.send_q + (To_send.Send_metric (Batch.pop_all batches.metrics)) + in + + let send_logs () = + B_queue.push self.send_q (To_send.Send_logs (Batch.pop_all batches.logs)) + in + + let send_traces () = + B_queue.push self.send_q + (To_send.Send_trace (Batch.pop_all batches.traces)) + in + + try + while not (Atomic.get self.stop) do + (* read multiple events at once *) + B_queue.pop_all self.q local_q; + + (* how to process a single event *) + let process_ev (ev : Event.t) : unit = + match ev with + | Event.E_metric _ | Event.E_trace _ | Event.E_logs _ -> () + | Event.E_tick -> + (* check for batches whose timeout expired *) + let now = Mtime_clock.now () in + if batch_timeout_expired_ ~config ~now batches.metrics then + send_metrics (); + if batch_timeout_expired_ ~config ~now batches.logs then + send_logs (); + if batch_timeout_expired_ ~config ~now batches.traces then + send_traces () + | Event.E_flush_all -> + if Batch.len batches.metrics > 0 then send_metrics (); + if Batch.len batches.logs > 0 then send_logs (); + if Batch.len batches.traces > 0 then send_traces () + in + + while not (Queue.is_empty local_q) do + let ev = Queue.pop local_q in + process_ev ev + done + done + with B_queue.Closed -> () + + let create ~stop ~config () : t = + let n_send_threads = max 2 config.Config.bg_threads in + let self = + { + stop; + config; + q = B_queue.create (); + send_threads = [||]; + send_q = B_queue.create (); + cleaned = Atomic.make false; + main_th = None; + } + in + + let main_th = start_bg_thread (fun () -> main_thread_loop self) in + self.main_th <- Some main_th; + + self.send_threads <- + Array.init n_send_threads (fun _i -> + start_bg_thread (fun () -> bg_thread_loop self)); + + self + + let shutdown self : unit = + Atomic.set self.stop true; + if not (Atomic.exchange self.cleaned true) then ( + (* empty batches *) + send_event self Event.E_flush_all; + (* close the incoming queue, wait for the thread to finish + before we start cutting off the background threads, so that they + have time to receive the final batches *) + B_queue.close self.q; + Option.iter Thread.join self.main_th; + (* close send queues, then wait for all threads *) + B_queue.close self.send_q; + Array.iter Thread.join self.send_threads + ) +end + +let mk_backend ~stop ~config () : (module Collector.BACKEND) = + let module M = struct + open Opentelemetry.Proto + open Opentelemetry.Collector + + let backend = Backend_impl.create ~stop ~config () + + let send_trace : Trace.resource_spans list sender = + { + send = + (fun l ~ret -> + Backend_impl.send_event backend (Event.E_trace l); + ret ()); + } + + let last_sent_metrics = Atomic.make (Mtime_clock.now ()) + + (* send metrics from time to time *) + let timeout_sent_metrics = Mtime.Span.(5 * s) + + let signal_emit_gc_metrics () = + if !debug_ then + Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; + Atomic.set needs_gc_metrics true + + let additional_metrics () : Metrics.resource_metrics list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now () in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + (* there is a possible race condition here, as several threads might update + metrics at the same time. But that's harmless. *) + if add_own_metrics then ( + Atomic.set last_sent_metrics now; + let open OT.Metrics in + [ + make_resource_metrics + [ + sum ~name:"otel.export.dropped" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel.export.errors" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]; + ] + ) else + [] + + let send_metrics : Metrics.resource_metrics list sender = + { + send = + (fun m ~ret -> + let m = List.rev_append (additional_metrics ()) m in + Backend_impl.send_event backend (Event.E_metric m); + ret ()); + } + + let send_logs : Logs.resource_logs list sender = + { + send = + (fun m ~ret -> + Backend_impl.send_event backend (Event.E_logs m); + ret ()); + } let on_tick_cbs_ = Atomic.make (ref []) let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in - match Httpc.send httpc ~path ~decode:(`Ret ()) data with - | Ok () -> () - | Error `Sysbreak -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set stop true - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err; - (* avoid crazy error loop *) - Thread.delay 3. - - let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) - = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () - in - send_http_ curl encoder ~path:"/v1/metrics" - ~encode:Metrics_service.encode_export_metrics_service_request x - - let send_traces_http curl encoder (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ curl encoder ~path:"/v1/traces" - ~encode:Trace_service.encode_export_trace_service_request x - - let send_logs_http curl encoder (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - send_http_ curl encoder ~path:"/v1/logs" - ~encode:Logs_service.encode_export_logs_service_request x - - (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_metrics with - | None -> false - | Some l -> - let batch = AList.pop_all gc_metrics :: l in - send_metrics_http httpc encoder batch; - true - - let emit_traces_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_traces with - | None -> false - | Some l -> - send_traces_http httpc encoder l; - true - - let emit_logs_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_logs with - | None -> false - | Some l -> - send_logs_http httpc encoder l; - true - - let[@inline] guard_exn_ where f = - try f () - with e -> - let bt = Printexc.get_backtrace () in - Printf.eprintf - "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where - (Printexc.to_string e) bt - - let emit_all_force (httpc : Httpc.t) encoder = - let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); - ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); - ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) - - let tick_common_ () = - if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - (Printexc.to_string e)) - !(Atomic.get on_tick_cbs_); - () - - let setup_ticker_thread ~tick ~finally () = - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let tick_thread () = - let@ () = - Fun.protect ~finally:(fun () -> - Atomic.set stop true; - finally ()) - in - while not @@ Atomic.get stop do - Thread.delay 0.5; - tick () - done - in - start_bg_thread tick_thread - end in - (* setup a global lock *) - (let global_lock_ = Mutex.create () in - Lock.set_mutex - ~lock:(fun () -> Mutex.lock global_lock_) - ~unlock:(fun () -> Mutex.unlock global_lock_)); - - if config.bg_threads > 0 then ( - (* lock+condition used for background threads to wait, and be woken up - when a batch is ready *) - let m = Mutex.create () in - let cond = Condition.create () in - - (* loop for the thread that processes events and sends them to collector *) - let bg_thread () = - let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - while not @@ Atomic.get stop do - let@ () = guard_exn_ (spf "bg thread[%d] (main loop)" @@ tid ()) in - - let now = Mtime_clock.now () in - let do_metrics = emit_metrics_maybe ~now httpc encoder in - let do_traces = emit_traces_maybe ~now httpc encoder in - let do_logs = emit_logs_maybe ~now httpc encoder in - if (not do_metrics) && (not do_traces) && not do_logs then ( - let@ () = guard_exn_ (spf "bg thread[%d] (waiting)" @@ tid ()) in - (* wait for something to happen *) - Mutex.lock m; - Condition.wait cond m; - Mutex.unlock m - ) - done; - (* flush remaining events once we exit *) - let@ () = guard_exn_ "bg thread (cleanup)" in - emit_all_force httpc encoder; - Httpc.cleanup httpc - in - - for _i = 1 to config.bg_threads do - start_bg_thread bg_thread - done; - - (* if the bg thread waits, this will wake it up so it can send batches *) - let wakeup ~all () = - with_mutex_ m (fun () -> - if all then - Condition.broadcast cond - else - Condition.signal cond); - Thread.yield () - in - let tick () = - tick_common_ (); + sample_gc_metrics_if_needed (); + Backend_impl.send_event backend Event.E_tick; + let l = Atomic.get on_tick_cbs_ in + List.iter (fun f -> f ()) !l - let now = Mtime_clock.now () in - if Atomic.get stop then - wakeup ~all:true () - else if - Batch.is_ready ~now batch_metrics - || Batch.is_ready ~now batch_traces - || Batch.is_ready ~now batch_logs - then - wakeup ~all:false () - in - - if config.ticker_thread then - setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) (); - - let module M = struct - let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () - - let push_metrics e = - if Batch.push batch_metrics e then wakeup ~all:false () - - let push_logs e = if Batch.push batch_logs e then wakeup ~all:false () - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick = tick - - let cleanup () = - Atomic.set stop true; - if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - wakeup ~all:true () - end in - (module M) - ) else ( - let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - - let module M = struct - (* we make sure that this is thread-safe, even though we don't have a - background thread. There can still be a ticker thread, and there - can also be several user threads that produce spans and call - the emit functions. *) - - let push_trace e = - let@ () = guard_exn_ "push trace" in - Batch.push' batch_traces e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_traces_maybe ~now httpc encoder : bool) - - let push_metrics e = - let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); - Batch.push' batch_metrics e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_metrics_maybe ~now httpc encoder : bool) - - let push_logs e = - let@ () = guard_exn_ "push logs" in - Batch.push' batch_logs e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_logs_maybe ~now httpc encoder : bool) - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick () = - tick_common_ (); - sample_gc_metrics_if_needed (); - let@ () = Lock.with_lock in - let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now httpc encoder : bool); - ignore (emit_metrics_maybe ~now httpc encoder : bool); - ignore (emit_logs_maybe ~now httpc encoder : bool); - () - - (* make sure we have a ticker thread, if required *) - let () = - if config.ticker_thread then - setup_ticker_thread ~tick ~finally:ignore () - - let cleanup () = - if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - emit_all_force httpc encoder; - Httpc.cleanup httpc - end in - (module M) - ) - -module Backend (Arg : sig - val stop : bool Atomic.t - - val config : Config.t -end) -() : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) - - open Opentelemetry.Proto - open Opentelemetry.Collector - - let send_trace : Trace.resource_spans list sender = - { - send = - (fun l ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send spans %a@." - (Format.pp_print_list Trace.pp_resource_spans) - l); - push_trace l; - ret ()); - } - - let last_sent_metrics = Atomic.make (Mtime_clock.now ()) - - let timeout_sent_metrics = Mtime.Span.(5 * s) - (* send metrics from time to time *) - - let signal_emit_gc_metrics () = - if !debug_ then - Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true - - let additional_metrics () : Metrics.resource_metrics list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now () in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in - - (* there is a possible race condition here, as several threads might update - metrics at the same time. But that's harmless. *) - if add_own_metrics then ( - Atomic.set last_sent_metrics now; - let open OT.Metrics in - [ - make_resource_metrics - [ - sum ~name:"otel.export.dropped" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel.export.errors" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]; - ] - ) else - [] - - let send_metrics : Metrics.resource_metrics list sender = - { - send = - (fun m ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send metrics %a@." - (Format.pp_print_list Metrics.pp_resource_metrics) - m); - - let m = List.rev_append (additional_metrics ()) m in - push_metrics m; - ret ()); - } - - let send_logs : Logs.resource_logs list sender = - { - send = - (fun m ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send logs %a@." - (Format.pp_print_list Logs.pp_resource_logs) - m); - - push_logs m; - ret ()); - } -end + let cleanup () = Backend_impl.shutdown backend + end in + (module M) let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () = debug_ := config.debug; - let module B = - Backend - (struct - let stop = stop - - let config = config - end) - () - in - Opentelemetry.Collector.set_backend (module B); + let ((module B) as backend) = mk_backend ~stop ~config () in + Opentelemetry.Collector.set_backend backend; B.cleanup let setup ?stop ?(config = Config.make ()) ?(enable = true) () =