diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3a5c66ec..aebd8efa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -3,7 +3,7 @@ name: build on: pull_request: push: - branch: + branches: - master jobs: @@ -13,7 +13,7 @@ jobs: matrix: os: - ubuntu-latest - - windows-latest + #- windows-latest #- macos-latest ocaml-compiler: - 4.08.x diff --git a/dune b/dune index 1c6b45f6..575b5a09 100644 --- a/dune +++ b/dune @@ -1,3 +1,3 @@ (env (_ - (flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-70 -strict-sequence))) + (flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-48-70 -strict-sequence))) diff --git a/dune-project b/dune-project index 6d660768..ec02e11c 100644 --- a/dune-project +++ b/dune-project @@ -45,8 +45,9 @@ (opentelemetry (= :version)) (pbrt (>= 2.3)) (odoc :with-doc) + (ezcurl (>= 0.2.3)) ocurl) - (synopsis "Collector client for opentelemetry, using http + ocurl")) + (synopsis "Collector client for opentelemetry, using http + ezcurl")) (package (name opentelemetry-cohttp-lwt) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index 30305dc4..1f1b4c4d 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" version: "0.4" -synopsis: "Collector client for opentelemetry, using http + ocurl" +synopsis: "Collector client for opentelemetry, using http + ezcurl" maintainer: ["the Imandra team and contributors"] authors: ["the Imandra team and contributors"] license: "MIT" @@ -14,6 +14,7 @@ depends: [ "opentelemetry" {= version} "pbrt" {>= "2.3"} "odoc" {with-doc} + "ezcurl" {>= "0.2.3"} "ocurl" ] build: [ diff --git a/src/client-ocurl/b_queue.ml b/src/client-ocurl/b_queue.ml new file mode 100644 index 00000000..21d6012f --- /dev/null +++ b/src/client-ocurl/b_queue.ml @@ -0,0 +1,69 @@ +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + mutable closed: bool; +} + +exception Closed + +let create () : _ t = + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ); + Mutex.unlock self.mutex + +let push (self : _ t) x : unit = + Mutex.lock self.mutex; + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else ( + Queue.push x self.q; + Condition.signal self.cond; + Mutex.unlock self.mutex + ) + +let pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + let rec loop () = + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) else if Queue.is_empty self.q then ( + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + Mutex.unlock self.mutex; + x + ) + in + loop () + +let pop_all (self : 'a t) into : unit = + Mutex.lock self.mutex; + let rec loop () = + if Queue.is_empty self.q then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + Queue.transfer self.q into; + Mutex.unlock self.mutex + ) + in + loop () diff --git a/src/client-ocurl/b_queue.mli b/src/client-ocurl/b_queue.mli new file mode 100644 index 00000000..73bf3edc --- /dev/null +++ b/src/client-ocurl/b_queue.mli @@ -0,0 +1,23 @@ +(** Basic Blocking Queue *) + +type 'a t + +val create : unit -> _ t + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + +val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val pop_all : 'a t -> 'a Queue.t -> unit +(** [pop_all q into] pops all the elements of [q] + and moves them into [into]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/client-ocurl/batch.ml b/src/client-ocurl/batch.ml new file mode 100644 index 00000000..0be8b1b0 --- /dev/null +++ b/src/client-ocurl/batch.ml @@ -0,0 +1,24 @@ +type 'a t = { + mutable len: int; + mutable l: 'a list list; + mutable started: Mtime.t; +} + +let create () = { len = 0; l = []; started = Mtime_clock.now () } + +let push self l = + if l != [] then ( + if self.l == [] then self.started <- Mtime_clock.now (); + self.l <- l :: self.l; + self.len <- self.len + List.length l + ) + +let[@inline] len self = self.len + +let[@inline] time_started self = self.started + +let pop_all self = + let l = self.l in + self.l <- []; + self.len <- 0; + l diff --git a/src/client-ocurl/batch.mli b/src/client-ocurl/batch.mli new file mode 100644 index 00000000..2b867b88 --- /dev/null +++ b/src/client-ocurl/batch.mli @@ -0,0 +1,14 @@ +(** List of lists with length *) + +type 'a t + +val create : unit -> 'a t + +val push : 'a t -> 'a list -> unit + +val len : _ t -> int + +val time_started : _ t -> Mtime.t +(** Time at which the batch most recently became non-empty *) + +val pop_all : 'a t -> 'a list list diff --git a/src/client-ocurl/common_.ml b/src/client-ocurl/common_.ml index 592a4462..f688ff2b 100644 --- a/src/client-ocurl/common_.ml +++ b/src/client-ocurl/common_.ml @@ -1,10 +1,10 @@ module Atomic = Opentelemetry_atomic.Atomic include Opentelemetry.Lock -let[@inline] ( let@ ) f x = f x - let spf = Printf.sprintf +let ( let@ ) = ( @@ ) + let tid () = Thread.id @@ Thread.self () let debug_ = @@ -22,11 +22,6 @@ let get_url () = !url let set_url s = url := s -(** [with_mutex m f] calls [f()] in a section where [m] is locked. *) -let[@inline] with_mutex_ m f = - Mutex.lock m; - Fun.protect ~finally:(fun () -> Mutex.unlock m) f - let parse_headers s = let parse_header s = match String.split_on_char '=' s with diff --git a/src/client-ocurl/config.ml b/src/client-ocurl/config.ml index b86e6305..0abca7e1 100644 --- a/src/client-ocurl/config.ml +++ b/src/client-ocurl/config.ml @@ -4,58 +4,23 @@ type t = { debug: bool; url: string; headers: (string * string) list; - batch_traces: int option; - batch_metrics: int option; - batch_logs: int option; batch_timeout_ms: int; bg_threads: int; ticker_thread: bool; } let pp out self = - let ppiopt = Format.pp_print_option Format.pp_print_int in let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in let ppheaders = Format.pp_print_list pp_header in - let { - debug; - url; - headers; - batch_traces; - batch_metrics; - batch_logs; - batch_timeout_ms; - bg_threads; - ticker_thread; - } = + let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } = self in Format.fprintf out - "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ - batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}" - debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt - batch_logs batch_timeout_ms bg_threads ticker_thread + "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \ + ticker_thread=%B @]}" + debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) - ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) - ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads - ?(ticker_thread = true) () : t = - let bg_threads = - match bg_threads with - | Some n -> max n 0 - | None -> - if thread then - 4 - else - 0 - in - { - debug; - url; - headers; - batch_traces; - batch_metrics; - batch_timeout_ms; - batch_logs; - bg_threads; - ticker_thread; - } + ?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t = + let bg_threads = max 2 (min bg_threads 32) in + { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 957dba13..17a7711e 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -1,3 +1,5 @@ +(** Configuration for the ocurl backend *) + type t = private { debug: bool; url: string; @@ -6,23 +8,6 @@ type t = private { headers: (string * string) list; (** API headers sent to the endpoint. Default is none or "OTEL_EXPORTER_OTLP_HEADERS" if set. *) - batch_traces: int option; - (** Batch traces? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [Some 400]. - *) - batch_metrics: int option; - (** Batch metrics? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. - - Note that traces and metrics are batched separately. - Default [None]. - *) - batch_logs: int option; - (** Batch logs? See {!batch_metrics} for details. - Default [Some 400] *) batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even incomplete. @@ -31,10 +16,8 @@ type t = private { bg_threads: int; (** Are there background threads, and how many? Default [4] *) ticker_thread: bool; - (** Is there a ticker thread? Default [true]. - This thread will regularly call [tick()] on the backend, to make - sure it makes progress, and regularly send events to the collector. - This option is ignored if [bg_threads=0]. *) + (** If true, start a thread that regularly checks if signals should + be sent to the collector. Default [true] *) } (** Configuration. @@ -45,20 +28,12 @@ val make : ?debug:bool -> ?url:string -> ?headers:(string * string) list -> - ?batch_traces:int option -> - ?batch_metrics:int option -> - ?batch_logs:int option -> ?batch_timeout_ms:int -> - ?thread:bool -> ?bg_threads:int -> ?ticker_thread:bool -> unit -> t (** Make a configuration. - - @param thread if true and [bg_threads] is not provided, we will pick a number - of bg threads. Otherwise the number of [bg_threads] superseeds this option. - *) val pp : Format.formatter -> t -> unit diff --git a/src/client-ocurl/dune b/src/client-ocurl/dune index 8e5c873f..a8162b4a 100644 --- a/src/client-ocurl/dune +++ b/src/client-ocurl/dune @@ -2,4 +2,4 @@ (name opentelemetry_client_ocurl) (public_name opentelemetry-client-ocurl) (libraries opentelemetry opentelemetry.atomic curl pbrt threads mtime - mtime.clock.os)) + mtime.clock.os ezcurl ezcurl.core)) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index cb30f1ff..e0d6a596 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -4,6 +4,7 @@ *) module OT = Opentelemetry +module Config = Config open Opentelemetry include Common_ @@ -13,15 +14,15 @@ let last_gc_metrics = Atomic.make (Mtime_clock.now ()) let timeout_gc_metrics = Mtime.Span.(20 * s) +(** side channel for GC, appended to metrics batch data *) let gc_metrics = AList.make () -(* side channel for GC, appended to {!E_metrics}'s data *) -(* capture current GC metrics if {!needs_gc_metrics} is true - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) +(** capture current GC metrics if {!needs_gc_metrics} is true + or it has been a long time since the last GC metrics collection, + and push them into {!gc_metrics} for later collection *) let sample_gc_metrics_if_needed () = let now = Mtime_clock.now () in - let alarm = Atomic.compare_and_set needs_gc_metrics true false in + let alarm = Atomic.exchange needs_gc_metrics false in let timeout () = let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in Mtime.Span.compare elapsed timeout_gc_metrics > 0 @@ -36,663 +37,404 @@ let sample_gc_metrics_if_needed () = AList.add gc_metrics l ) -module Config = Config - -let _init_curl = - lazy - (Curl.global_init Curl.CURLINIT_GLOBALALL; - at_exit Curl.global_cleanup) - -type error = - [ `Status of int * Opentelemetry.Proto.Status.status - | `Failure of string - | `Sysbreak - ] - let n_errors = Atomic.make 0 let n_dropped = Atomic.make 0 -let report_err_ = function - | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" - | `Failure msg -> - Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg - | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details }) - -> - let pp_details out l = - List.iter - (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s)) - l - in - Format.eprintf - "@[<2>opentelemetry: export failed with@ http code=%d@ status \ - {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@." - code scode - (Bytes.unsafe_to_string message) - pp_details details - -module Httpc : sig - type t - - val create : unit -> t - - val send : - t -> - path:string -> - decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] -> - string -> - ('a, error) result - - val cleanup : t -> unit -end = struct +(** Something sent to the collector *) +module Event = struct open Opentelemetry.Proto - let () = Lazy.force _init_curl - - (* TODO: use Curl.Multi, etc. instead? *) - type t = { - buf_res: Buffer.t; - curl: Curl.t; - } - - let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () } - - let cleanup self = Curl.cleanup self.curl - - (* send the content to the remote endpoint/path *) - let send (self : t) ~path ~decode (bod : string) : ('a, error) result = - let { curl; buf_res } = self in - Curl.reset curl; - if !debug_ then Curl.set_verbose curl true; - let full_url = !url ^ path in - Curl.set_url curl full_url; - Curl.set_httppost curl []; - let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in - let http_headers = List.map to_http_header !headers in - Curl.set_httpheader curl - ("Content-Type: application/x-protobuf" :: http_headers); - (* write body *) - Curl.set_post curl true; - Curl.set_postfieldsize curl (String.length bod); - Curl.set_readfunction curl - (let i = ref 0 in - fun n -> - if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; - let len = min n (String.length bod - !i) in - let s = String.sub bod !i len in - if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; - i := !i + len; - s); - (* read result's body *) - Buffer.clear buf_res; - Curl.set_writefunction curl (fun s -> - Buffer.add_string buf_res s; - String.length s); - try - match Curl.perform curl with - | () -> - let code = Curl.get_responsecode curl in - if !debug_ then - Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); - if code >= 200 && code < 300 then ( - match decode with - | `Ret x -> Ok x - | `Dec f -> - let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in - (try Ok (f dec) - with e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) - bt))) - ) else ( - let str = Buffer.contents buf_res in - let dec = Pbrt.Decoder.of_string str in - - try - let status = Status.decode_status dec in - Error (`Status (code, status)) - with e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf - "httpc: decoding of status (url=%S, code=%d) failed with:\n\ - %s\n\ - status: %S\n\ - %s" - full_url code (Printexc.to_string e) str bt)) - ) - | exception Sys.Break -> Error `Sysbreak - | exception Curl.CurlException (_, code, msg) -> - let status = - Status.default_status ~code:(Int32.of_int code) - ~message:(Bytes.unsafe_of_string msg) - () - in - Error (`Status (code, status)) - with - | Sys.Break -> Error `Sysbreak - | e -> - let bt = Printexc.get_backtrace () in - Error - (`Failure - (spf "httpc: post on url=%S failed with:\n%s\n%s" full_url - (Printexc.to_string e) bt)) + type t = + | E_metric of Metrics.resource_metrics list + | E_trace of Trace.resource_spans list + | E_logs of Logs.resource_logs list + | E_tick + | E_flush_all (** Flush all batches *) end -(** Batch of resources to be pushed later. - - This type is thread-safe. *) -module Batch : sig - type 'a t - - val push : 'a t -> 'a -> bool - (** [push batch x] pushes [x] into the batch, and heuristically - returns [true] if the batch is ready to be emitted (to know if we should - wake up the sending thread, if any) *) - - val push' : 'a t -> 'a -> unit - - val is_ready : now:Mtime.t -> _ t -> bool - (** is the batch ready to be sent? This is heuristic. *) - - val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option - (** Is the batch ready to be emitted? If batching is disabled, - this is true as soon as {!is_empty} is false. If a timeout is provided - for this batch, then it will be ready if an element has been in it - for at least the timeout. - @param now passed to implement timeout *) - - val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t - (** Create a new batch *) -end = struct - type 'a t = { - lock: Mutex.t; - mutable size: int; - mutable q: 'a list; - batch: int option; - high_watermark: int; - timeout: Mtime.span option; - mutable start: Mtime.t; - } - - let make ?batch ?timeout () : _ t = - Option.iter (fun b -> assert (b > 0)) batch; - let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in - { - lock = Mutex.create (); - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } - - let timeout_expired_ ~now self : bool = - match self.timeout with - | Some t -> - let elapsed = Mtime.span now self.start in - Mtime.Span.compare elapsed t >= 0 - | None -> false - - let is_full_ self : bool = - match self.batch with - | None -> self.size > 0 - | Some b -> self.size >= b - - let is_ready ~now self : bool = - let@ () = with_mutex_ self.lock in - is_full_ self || timeout_expired_ ~now self - - let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - let@ () = with_mutex_ self.lock in - if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) - then ( - let l = self.q in - self.q <- []; - self.size <- 0; - assert (l <> []); - Some l - ) else - None - - let push (self : _ t) x : bool = - let@ () = with_mutex_ self.lock in - if self.size >= self.high_watermark then ( - (* drop this to prevent queue from growing too fast *) - Atomic.incr n_dropped; - true - ) else ( - if self.size = 0 && Option.is_some self.timeout then - (* current batch starts now *) - self.start <- Mtime_clock.now (); - - (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; - let ready = is_full_ self in - ready - ) - - let push' self x = ignore (push self x : bool) -end - -(** An emitter. This is used by {!Backend} below to forward traces/metrics/… - from the program to whatever collector client we have. *) -module type EMITTER = sig +(** Something to be sent via HTTP *) +module To_send = struct open Opentelemetry.Proto - val push_trace : Trace.resource_spans list -> unit - - val push_metrics : Metrics.resource_metrics list -> unit - - val push_logs : Logs.resource_logs list -> unit - - val set_on_tick_callbacks : (unit -> unit) list ref -> unit - - val tick : unit -> unit - - val cleanup : unit -> unit + type t = + | Send_metric of Metrics.resource_metrics list list + | Send_trace of Trace.resource_spans list list + | Send_logs of Logs.resource_logs list list end -(* start a thread in the background, running [f()] *) -let start_bg_thread (f : unit -> unit) : unit = +(** start a thread in the background, running [f()] *) +let start_bg_thread (f : unit -> unit) : Thread.t = let run () = (* block some signals: USR1 USR2 TERM PIPE ALARM STOP, see [$ kill -L] *) ignore (Thread.sigmask Unix.SIG_BLOCK [ 10; 12; 13; 14; 15; 19 ] : _ list); f () in - ignore (Thread.create run () : Thread.t) + Thread.create run () -(* make an emitter. +let str_to_hex (s : string) : string = + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + in - exceptions inside should be caught, see - https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = - let open Proto in - (* local helpers *) - let open struct - let timeout = - if config.batch_timeout_ms > 0 then - Some Mtime.Span.(config.batch_timeout_ms * ms) - else - None + let res = Bytes.create (2 * String.length s) in + for i = 0 to String.length s - 1 do + let n = Char.code (String.get s i) in + Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) + done; + Bytes.unsafe_to_string res - let batch_traces : Trace.resource_spans list Batch.t = - Batch.make ?batch:config.batch_traces ?timeout () +module Backend_impl : sig + type t - let batch_metrics : Metrics.resource_metrics list Batch.t = - Batch.make ?batch:config.batch_metrics ?timeout () + val create : stop:bool Atomic.t -> config:Config.t -> unit -> t - let batch_logs : Logs.resource_logs list Batch.t = - Batch.make ?batch:config.batch_logs ?timeout () + val send_event : t -> Event.t -> unit + + val shutdown : t -> unit +end = struct + open Opentelemetry.Proto + + type t = { + stop: bool Atomic.t; + cleaned: bool Atomic.t; (** True when we cleaned up after closing *) + config: Config.t; + q: Event.t B_queue.t; (** Queue to receive data from the user's code *) + mutable main_th: Thread.t option; (** Thread that listens on [q] *) + send_q: To_send.t B_queue.t; (** Queue for the send worker threads *) + mutable send_threads: Thread.t array; (** Threads that send data via http *) + } + + let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit + = + Pbrt.Encoder.reset encoder; + encode x encoder; + let data = Pbrt.Encoder.to_string encoder in + let url = config.Config.url ^ path in + if !debug_ || config.debug then + Printf.eprintf "opentelemetry: send http POST to %s (%dB)\n%!" url + (String.length data); + let headers = + ("Content-Type", "application/x-protobuf") :: config.headers + in + match + Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () + with + | Ok { code; _ } when code >= 200 && code < 300 -> () + | Ok { code; body; headers = _; info = _ } -> + Atomic.incr n_errors; + if !debug_ || config.debug then ( + let dec = Pbrt.Decoder.of_string body in + let body = + try + let status = Status.decode_status dec in + Format.asprintf "%a" Status.pp_status status + with _ -> + spf "(could not decode status)\nraw bytes: %s" (str_to_hex body) + in + Printf.eprintf "error while sending:\n code=%d\n %s\n%!" code body + ); + () + | exception Sys.Break -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + Atomic.set stop true + | Error (code, msg) -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + + Printf.eprintf "opentelemetry: export failed:\n %s\n curl code: %s\n%!" + msg (Curl.strerror code); + + (* avoid crazy error loop *) + Thread.delay 3. + + let send_logs_http ~stop ~config (client : Curl.t) encoder + (l : Logs.resource_logs list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request ~resource_logs:l () + in + send_http_ ~stop ~config client encoder ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x + + let send_metrics_http ~stop ~config curl encoder + (l : Metrics.resource_metrics list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request ~resource_metrics:l + () + in + send_http_ ~stop ~config curl encoder ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x + + let send_traces_http ~stop ~config curl encoder + (l : Trace.resource_spans list list) : unit = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request ~resource_spans:l () + in + send_http_ ~stop ~config curl encoder ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + + let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev + + (** Thread that, in a loop, reads from [q] to get the + next message to send via http *) + let bg_thread_loop (self : t) : unit = + Ezcurl.with_client ?set_opts:None @@ fun client -> + let stop = self.stop in + let config = self.config in + let encoder = Pbrt.Encoder.create () in + try + while not (Atomic.get stop) do + let msg = B_queue.pop self.send_q in + match msg with + | To_send.Send_trace tr -> + send_traces_http ~stop ~config client encoder tr + | To_send.Send_metric ms -> + send_metrics_http ~stop ~config client encoder ms + | To_send.Send_logs logs -> + send_logs_http ~stop ~config client encoder logs + done + with B_queue.Closed -> () + + type batches = { + traces: Proto.Trace.resource_spans Batch.t; + logs: Proto.Logs.resource_logs Batch.t; + metrics: Proto.Metrics.resource_metrics Batch.t; + } + + let batch_max_size_ = 200 + + let should_send_batch_ ~config ~now (b : _ Batch.t) : bool = + Batch.len b > 0 + && (Batch.len b >= batch_max_size_ + || + let timeout = Mtime.Span.(config.Config.batch_timeout_ms * ms) in + let elapsed = Mtime.span now (Batch.time_started b) in + Mtime.Span.compare elapsed timeout >= 0) + + let main_thread_loop (self : t) : unit = + let local_q = Queue.create () in + let config = self.config in + + (* keep track of batches *) + let batches = + { + traces = Batch.create (); + logs = Batch.create (); + metrics = Batch.create (); + } + in + + let send_metrics () = + B_queue.push self.send_q + (To_send.Send_metric (Batch.pop_all batches.metrics)) + in + + let send_logs () = + B_queue.push self.send_q (To_send.Send_logs (Batch.pop_all batches.logs)) + in + + let send_traces () = + B_queue.push self.send_q + (To_send.Send_trace (Batch.pop_all batches.traces)) + in + + try + while not (Atomic.get self.stop) do + (* read multiple events at once *) + B_queue.pop_all self.q local_q; + + let now = Mtime_clock.now () in + + (* how to process a single event *) + let process_ev (ev : Event.t) : unit = + match ev with + | Event.E_metric m -> + Batch.push batches.metrics m; + if should_send_batch_ ~config ~now batches.metrics then + send_metrics () + | Event.E_trace tr -> + Batch.push batches.traces tr; + if should_send_batch_ ~config ~now batches.traces then + send_traces () + | Event.E_logs logs -> + Batch.push batches.logs logs; + if should_send_batch_ ~config ~now batches.logs then send_logs () + | Event.E_tick -> + (* check for batches whose timeout expired *) + if should_send_batch_ ~config ~now batches.metrics then + send_metrics (); + if should_send_batch_ ~config ~now batches.logs then send_logs (); + if should_send_batch_ ~config ~now batches.traces then + send_traces () + | Event.E_flush_all -> + if Batch.len batches.metrics > 0 then send_metrics (); + if Batch.len batches.logs > 0 then send_logs (); + if Batch.len batches.traces > 0 then send_traces () + in + + while not (Queue.is_empty local_q) do + let ev = Queue.pop local_q in + process_ev ev + done + done + with B_queue.Closed -> () + + let create ~stop ~config () : t = + let n_send_threads = max 2 config.Config.bg_threads in + let self = + { + stop; + config; + q = B_queue.create (); + send_threads = [||]; + send_q = B_queue.create (); + cleaned = Atomic.make false; + main_th = None; + } + in + + let main_th = start_bg_thread (fun () -> main_thread_loop self) in + self.main_th <- Some main_th; + + self.send_threads <- + Array.init n_send_threads (fun _i -> + start_bg_thread (fun () -> bg_thread_loop self)); + + self + + let shutdown self : unit = + Atomic.set self.stop true; + if not (Atomic.exchange self.cleaned true) then ( + (* empty batches *) + send_event self Event.E_flush_all; + (* close the incoming queue, wait for the thread to finish + before we start cutting off the background threads, so that they + have time to receive the final batches *) + B_queue.close self.q; + Option.iter Thread.join self.main_th; + (* close send queues, then wait for all threads *) + B_queue.close self.send_q; + Array.iter Thread.join self.send_threads + ) +end + +let mk_backend ~stop ~config () : (module Collector.BACKEND) = + let module M = struct + open Opentelemetry.Proto + open Opentelemetry.Collector + + let backend = Backend_impl.create ~stop ~config () + + let send_trace : Trace.resource_spans list sender = + { + send = + (fun l ~ret -> + Backend_impl.send_event backend (Event.E_trace l); + ret ()); + } + + let last_sent_metrics = Atomic.make (Mtime_clock.now ()) + + (* send metrics from time to time *) + let timeout_sent_metrics = Mtime.Span.(5 * s) + + let signal_emit_gc_metrics () = + if !debug_ || config.debug then + Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; + Atomic.set needs_gc_metrics true + + let additional_metrics () : Metrics.resource_metrics list = + (* add exporter metrics to the lot? *) + let last_emit = Atomic.get last_sent_metrics in + let now = Mtime_clock.now () in + let add_own_metrics = + let elapsed = Mtime.span last_emit now in + Mtime.Span.compare elapsed timeout_sent_metrics > 0 + in + + (* there is a possible race condition here, as several threads might update + metrics at the same time. But that's harmless. *) + if add_own_metrics then ( + Atomic.set last_sent_metrics now; + let open OT.Metrics in + [ + make_resource_metrics + [ + sum ~name:"otel.export.dropped" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel.export.errors" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]; + ] + ) else + [] + + let send_metrics : Metrics.resource_metrics list sender = + { + send = + (fun m ~ret -> + let m = List.rev_append (additional_metrics ()) m in + Backend_impl.send_event backend (Event.E_metric m); + ret ()); + } + + let send_logs : Logs.resource_logs list sender = + { + send = + (fun m ~ret -> + Backend_impl.send_event backend (Event.E_logs m); + ret ()); + } let on_tick_cbs_ = Atomic.make (ref []) let set_on_tick_callbacks = Atomic.set on_tick_cbs_ - let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in - match Httpc.send httpc ~path ~decode:(`Ret ()) data with - | Ok () -> () - | Error `Sysbreak -> - Printf.eprintf "ctrl-c captured, stopping\n%!"; - Atomic.set stop true - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err; - (* avoid crazy error loop *) - Thread.delay 3. - - let send_metrics_http curl encoder (l : Metrics.resource_metrics list list) - = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () - in - send_http_ curl encoder ~path:"/v1/metrics" - ~encode:Metrics_service.encode_export_metrics_service_request x - - let send_traces_http curl encoder (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ curl encoder ~path:"/v1/traces" - ~encode:Trace_service.encode_export_trace_service_request x - - let send_logs_http curl encoder (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - in - send_http_ curl encoder ~path:"/v1/logs" - ~encode:Logs_service.encode_export_logs_service_request x - - (* emit metrics, if the batch is full or timeout lapsed *) - let emit_metrics_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_metrics with - | None -> false - | Some l -> - let batch = AList.pop_all gc_metrics :: l in - send_metrics_http httpc encoder batch; - true - - let emit_traces_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_traces with - | None -> false - | Some l -> - send_traces_http httpc encoder l; - true - - let emit_logs_maybe ~now ?force httpc encoder : bool = - match Batch.pop_if_ready ?force ~now batch_logs with - | None -> false - | Some l -> - send_logs_http httpc encoder l; - true - - let[@inline] guard_exn_ where f = - try f () - with e -> - let bt = Printexc.get_backtrace () in - Printf.eprintf - "opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where - (Printexc.to_string e) bt - - let emit_all_force (httpc : Httpc.t) encoder = - let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool); - ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool); - ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool) - - let tick_common_ () = - if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ()); - sample_gc_metrics_if_needed (); - List.iter - (fun f -> - try f () - with e -> - Printf.eprintf "on tick callback raised: %s\n" - (Printexc.to_string e)) - !(Atomic.get on_tick_cbs_); - () - - let setup_ticker_thread ~tick ~finally () = - (* thread that calls [tick()] regularly, to help enforce timeouts *) - let tick_thread () = - let@ () = - Fun.protect ~finally:(fun () -> - Atomic.set stop true; - finally ()) - in - while not @@ Atomic.get stop do - Thread.delay 0.5; - tick () - done - in - start_bg_thread tick_thread - end in - (* setup a global lock *) - (let global_lock_ = Mutex.create () in - Lock.set_mutex - ~lock:(fun () -> Mutex.lock global_lock_) - ~unlock:(fun () -> Mutex.unlock global_lock_)); - - if config.bg_threads > 0 then ( - (* lock+condition used for background threads to wait, and be woken up - when a batch is ready *) - let m = Mutex.create () in - let cond = Condition.create () in - - (* loop for the thread that processes events and sends them to collector *) - let bg_thread () = - let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - while not @@ Atomic.get stop do - let@ () = guard_exn_ (spf "bg thread[%d] (main loop)" @@ tid ()) in - - let now = Mtime_clock.now () in - let do_metrics = emit_metrics_maybe ~now httpc encoder in - let do_traces = emit_traces_maybe ~now httpc encoder in - let do_logs = emit_logs_maybe ~now httpc encoder in - if (not do_metrics) && (not do_traces) && not do_logs then ( - let@ () = guard_exn_ (spf "bg thread[%d] (waiting)" @@ tid ()) in - (* wait for something to happen *) - Mutex.lock m; - Condition.wait cond m; - Mutex.unlock m - ) - done; - (* flush remaining events once we exit *) - let@ () = guard_exn_ "bg thread (cleanup)" in - emit_all_force httpc encoder; - Httpc.cleanup httpc - in - - for _i = 1 to config.bg_threads do - start_bg_thread bg_thread - done; - - (* if the bg thread waits, this will wake it up so it can send batches *) - let wakeup ~all () = - with_mutex_ m (fun () -> - if all then - Condition.broadcast cond - else - Condition.signal cond); - Thread.yield () - in - let tick () = - tick_common_ (); + sample_gc_metrics_if_needed (); + Backend_impl.send_event backend Event.E_tick; + let l = Atomic.get on_tick_cbs_ in + List.iter (fun f -> f ()) !l - let now = Mtime_clock.now () in - if Atomic.get stop then - wakeup ~all:true () - else if - Batch.is_ready ~now batch_metrics - || Batch.is_ready ~now batch_traces - || Batch.is_ready ~now batch_logs - then - wakeup ~all:false () - in + let cleanup () = Backend_impl.shutdown backend + end in + (module M) - if config.ticker_thread then - setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) (); - - let module M = struct - let push_trace e = if Batch.push batch_traces e then wakeup ~all:false () - - let push_metrics e = - if Batch.push batch_metrics e then wakeup ~all:false () - - let push_logs e = if Batch.push batch_logs e then wakeup ~all:false () - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick = tick - - let cleanup () = - Atomic.set stop true; - if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - wakeup ~all:true () - end in - (module M) - ) else ( - let httpc = Httpc.create () in - let encoder = Pbrt.Encoder.create () in - - let module M = struct - (* we make sure that this is thread-safe, even though we don't have a - background thread. There can still be a ticker thread, and there - can also be several user threads that produce spans and call - the emit functions. *) - - let push_trace e = - let@ () = guard_exn_ "push trace" in - Batch.push' batch_traces e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_traces_maybe ~now httpc encoder : bool) - - let push_metrics e = - let@ () = guard_exn_ "push metrics" in - sample_gc_metrics_if_needed (); - Batch.push' batch_metrics e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_metrics_maybe ~now httpc encoder : bool) - - let push_logs e = - let@ () = guard_exn_ "push logs" in - Batch.push' batch_logs e; - let now = Mtime_clock.now () in - let@ () = Lock.with_lock in - ignore (emit_logs_maybe ~now httpc encoder : bool) - - let set_on_tick_callbacks = set_on_tick_callbacks - - let tick () = - tick_common_ (); - sample_gc_metrics_if_needed (); - let@ () = Lock.with_lock in - let now = Mtime_clock.now () in - ignore (emit_traces_maybe ~now httpc encoder : bool); - ignore (emit_metrics_maybe ~now httpc encoder : bool); - ignore (emit_logs_maybe ~now httpc encoder : bool); - () - - (* make sure we have a ticker thread, if required *) - let () = - if config.ticker_thread then - setup_ticker_thread ~tick ~finally:ignore () - - let cleanup () = - if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; - emit_all_force httpc encoder; - Httpc.cleanup httpc - end in - (module M) - ) - -module Backend (Arg : sig - val stop : bool Atomic.t - - val config : Config.t -end) -() : Opentelemetry.Collector.BACKEND = struct - include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) - - open Opentelemetry.Proto - open Opentelemetry.Collector - - let send_trace : Trace.resource_spans list sender = - { - send = - (fun l ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send spans %a@." - (Format.pp_print_list Trace.pp_resource_spans) - l); - push_trace l; - ret ()); - } - - let last_sent_metrics = Atomic.make (Mtime_clock.now ()) - - let timeout_sent_metrics = Mtime.Span.(5 * s) - (* send metrics from time to time *) - - let signal_emit_gc_metrics () = - if !debug_ then - Printf.eprintf "opentelemetry: emit GC metrics requested\n%!"; - Atomic.set needs_gc_metrics true - - let additional_metrics () : Metrics.resource_metrics list = - (* add exporter metrics to the lot? *) - let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now () in - let add_own_metrics = - let elapsed = Mtime.span last_emit now in - Mtime.Span.compare elapsed timeout_sent_metrics > 0 - in - - (* there is a possible race condition here, as several threads might update - metrics at the same time. But that's harmless. *) - if add_own_metrics then ( - Atomic.set last_sent_metrics now; - let open OT.Metrics in - [ - make_resource_metrics - [ - sum ~name:"otel.export.dropped" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel.export.errors" ~is_monotonic:true - [ - int - ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]; - ] - ) else - [] - - let send_metrics : Metrics.resource_metrics list sender = - { - send = - (fun m ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send metrics %a@." - (Format.pp_print_list Metrics.pp_resource_metrics) - m); - - let m = List.rev_append (additional_metrics ()) m in - push_metrics m; - ret ()); - } - - let send_logs : Logs.resource_logs list sender = - { - send = - (fun m ~ret -> - (if !debug_ then - let@ () = Lock.with_lock in - Format.eprintf "send logs %a@." - (Format.pp_print_list Logs.pp_resource_logs) - m); - - push_logs m; - ret ()); - } -end +(** thread that calls [tick()] regularly, to help enforce timeouts *) +let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () = + let sleep_s = float sleep_ms /. 1000. in + let tick_loop () = + while not @@ Atomic.get stop do + Thread.delay sleep_s; + B.tick () + done + in + start_bg_thread tick_loop let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () = - debug_ := config.debug; - let module B = - Backend - (struct - let stop = stop + let ((module B) as backend) = mk_backend ~stop ~config () in + Opentelemetry.Collector.set_backend backend; + + if config.ticker_thread then ( + let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in + ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) + ); - let config = config - end) - () - in - Opentelemetry.Collector.set_backend (module B); B.cleanup let setup ?stop ?(config = Config.make ()) ?(enable = true) () = diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 4292586f..7d3ccc0a 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -14,10 +14,6 @@ val get_headers : unit -> (string * string) list val set_headers : (string * string) list -> unit (** Set http headers that are sent on every http query to the collector. *) -val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit -(** Set a lock/unlock pair to protect the critical sections - of {!Opentelemetry.Collector.BACKEND} *) - module Atomic = Opentelemetry_atomic.Atomic module Config = Config diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index faaeca12..35f16423 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -3,7 +3,7 @@ module Atomic = Opentelemetry_atomic.Atomic let spf = Printf.sprintf -let ( let@ ) f x = f x +let ( let@ ) = ( @@ ) let sleep_inner = ref 0.1 @@ -98,21 +98,13 @@ let () = let ts_start = Unix.gettimeofday () in let debug = ref false in - let thread = ref true in let n_bg_threads = ref 0 in - let batch_traces = ref 400 in - let batch_metrics = ref 3 in let opts = [ "--debug", Arg.Bool (( := ) debug), " enable debug output"; - "--thread", Arg.Bool (( := ) thread), " use a background thread"; ( "--stress-alloc", Arg.Bool (( := ) stress_alloc_), " perform heavy allocs in inner loop" ); - "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; - ( "--batch-metrics", - Arg.Int (( := ) batch_metrics), - " size of metrics batch" ); "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "-j", Arg.Set_int n_jobs, " number of parallel jobs"; @@ -123,17 +115,8 @@ let () = Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; - let some_if_nzero r = - if !r > 0 then - Some !r - else - None - in let config = Opentelemetry_client_ocurl.Config.make ~debug:!debug - ~batch_traces:(some_if_nzero batch_traces) - ~batch_metrics:(some_if_nzero batch_metrics) - ~thread:!thread ?bg_threads: (let n = !n_bg_threads in if n = 0 then