diff --git a/dune b/dune index f2eef302..815c0f49 100644 --- a/dune +++ b/dune @@ -1,3 +1,3 @@ (env (_ - (flags :standard -warn-error -a+8))) + (flags :standard -warn-error -a+8 -strict-sequence))) diff --git a/src/client/FQueue.ml b/src/client/FQueue.ml deleted file mode 100644 index ce04dad3..00000000 --- a/src/client/FQueue.ml +++ /dev/null @@ -1,27 +0,0 @@ -type 'a t = { - arr: 'a array; - mutable i: int; -} - -let create ~dummy n : _ t = - assert (n >= 1); - { arr = Array.make n dummy; i = 0 } - -let[@inline] size self = self.i - -let[@inline] is_full self = self.i = Array.length self.arr - -let push (self : _ t) x : bool = - if is_full self then - false - else ( - self.arr.(self.i) <- x; - self.i <- 1 + self.i; - true - ) - -let pop_iter_all (self : _ t) f = - for j = 0 to self.i - 1 do - f self.arr.(j) - done; - self.i <- 0 diff --git a/src/client/FQueue.mli b/src/client/FQueue.mli deleted file mode 100644 index 0a03b00d..00000000 --- a/src/client/FQueue.mli +++ /dev/null @@ -1,11 +0,0 @@ -(** queue of fixed size *) - -type 'a t - -val create : dummy:'a -> int -> 'a t - -val size : _ t -> int - -val push : 'a t -> 'a -> bool (* true iff it could write element *) - -val pop_iter_all : 'a t -> ('a -> unit) -> unit diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index c69c4245..9ea214bf 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -33,6 +33,7 @@ let _init_curl = type error = [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string + | `Sysbreak ] let n_errors = Atomic.make 0 @@ -40,6 +41,7 @@ let n_errors = Atomic.make 0 let n_dropped = Atomic.make 0 let report_err_ = function + | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!" | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." 
msg | `Status (code, status) -> @@ -112,6 +114,7 @@ module Curl () : CURL = struct let status = Status.decode_status dec in Error (`Status (code, status)) ) + | exception Sys.Break -> Error `Sysbreak | exception Curl.CurlException (_, code, msg) -> let status = Status.default_status ~code:(Int32.of_int code) @@ -119,19 +122,95 @@ module Curl () : CURL = struct () in Error (`Status (code, status)) - with e -> Error (`Failure (Printexc.to_string e)) + with + | Sys.Break -> Error `Sysbreak + | e -> Error (`Failure (Printexc.to_string e)) end -module type PUSH = sig - type elt +module type BATCH = sig end - val push : elt -> unit +module Batch : sig + type 'a t - val is_empty : unit -> bool + val push : 'a t -> 'a -> bool + (** [push batch x] pushes [x] into the batch, and heuristically + returns [true] if the batch is ready to be emitted (to know if we should + wake up the sending thread, if any) *) - val is_big_enough : unit -> bool + val push' : 'a t -> 'a -> unit - val pop_iter_all : (elt -> unit) -> unit + val is_ready : now:Mtime.t -> _ t -> bool + (** is the batch ready to be sent? This is heuristic. *) + + val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option + (** Pop the whole batch if it is non-empty and ready to be emitted. + If batching is disabled, it is ready as soon as it holds one element. + If a timeout is provided, it is ready once an element has been in it + for at least the timeout. [~force:true] pops any non-empty batch. 
+ @param now passed to implement timeout *) + + val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t +end = struct + type 'a t = { + lock: Mutex.t; + mutable size: int; + mutable q: 'a list; + batch: int option; + timeout: Mtime.span option; + mutable start: Mtime.t; + } + + let make ?batch ?timeout () : _ t = + { + lock = Mutex.create (); + size = 0; + start = Mtime_clock.now (); + q = []; + batch; + timeout; + } + + let is_empty_ self = self.size = 0 + + let timeout_expired_ ~now self : bool = + match self.timeout with + | Some t -> + let elapsed = Mtime.span now self.start in + Mtime.Span.compare elapsed t >= 0 + | None -> false + + let is_full_ self : bool = + match self.batch with + | None -> self.size > 0 + | Some b -> self.size >= b + + let is_ready ~now self : bool = + let@ () = with_mutex_ self.lock in + (not (is_empty_ self)) && (is_full_ self || timeout_expired_ ~now self) + + let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + let@ () = with_mutex_ self.lock in + if (* an empty batch is never ready, even once the timeout has lapsed *) + (not (is_empty_ self)) + && (force || is_full_ self || timeout_expired_ ~now self) + then ( + let l = self.q in + self.q <- []; + self.size <- 0; + Some l + ) else + None + + let push (self : _ t) x : bool = + let@ () = with_mutex_ self.lock in + if self.size = 0 && Option.is_some self.timeout then + self.start <- Mtime_clock.now (); + self.size <- 1 + self.size; + self.q <- x :: self.q; + let ready = is_full_ self in + ready + + let push' self x = ignore (push self x : bool) end (** An emitter. 
This is used by {!Backend} below to forward traces/metrics/… @@ -152,57 +231,6 @@ module type EMITTER = sig val cleanup : unit -> unit end -type 'a push = (module PUSH with type elt = 'a) - -type on_full_cb = unit -> unit - -(* make a "push" object, along with a setter for a callback to call when - it's ready to emit a batch *) -let mk_push (type a) ?batch () : - (module PUSH with type elt = a) * (on_full_cb -> unit) = - let on_full : on_full_cb ref = ref ignore in - let push = - match batch with - | None -> - let r = ref None in - let module M = struct - type elt = a - - let is_empty () = !r == None - - let is_big_enough () = !r != None - - let push x = - r := Some x; - !on_full () - - let pop_iter_all f = - Option.iter f !r; - r := None - end in - (module M : PUSH with type elt = a) - | Some n -> - let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in - let module M = struct - type elt = a - - let is_empty () = FQueue.size q = 0 - - let is_big_enough () = FQueue.size q >= n - - let push x = - if (not (FQueue.push q x)) || FQueue.size q > n then ( - !on_full (); - if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *) - ) - - let pop_iter_all f = FQueue.pop_iter_all q f - end in - (module M : PUSH with type elt = a) - in - - push, ( := ) on_full - (* start a thread in the background, running [f()] *) let start_bg_thread (f : unit -> unit) : unit = let run () = @@ -224,169 +252,145 @@ let batch_is_empty = List.for_all l_is_empty https://opentelemetry.io/docs/reference/specification/error-handling/ *) let mk_emitter ~(config : Config.t) () : (module EMITTER) = let open Proto in - let continue = ref true in + (* local helpers *) + let open struct + let continue = Atomic.make true - let ((module E_trace) : Trace.resource_spans list push), on_trace_full = - mk_push ?batch:config.batch_traces () - in - let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full - = - mk_push ?batch:config.batch_metrics () - in - let ((module E_logs) : 
Logs.resource_logs list push), on_logs_full = - mk_push ?batch:config.batch_logs () - in + let timeout = + if config.batch_timeout_ms > 0 then + Some Mtime.Span.(config.batch_timeout_ms * ms) + else + None - let encoder = Pbrt.Encoder.create () in + let batch_traces : Trace.resource_spans list Batch.t = + Batch.make ?batch:config.batch_traces ?timeout () - let ((module C) as curl) = (module Curl () : CURL) in + let batch_metrics : Metrics.resource_metrics list Batch.t = + Batch.make ?batch:config.batch_metrics ?timeout () - let on_tick_cbs_ = ref (ref []) in - let set_on_tick_callbacks = ( := ) on_tick_cbs_ in + let batch_logs : Logs.resource_logs list Batch.t = + Batch.make ?batch:config.batch_logs ?timeout () - let send_http_ ~path ~encode x : unit = - Pbrt.Encoder.reset encoder; - encode x encoder; - let data = Pbrt.Encoder.to_string encoder in - match C.send ~path ~decode:(fun _ -> ()) data with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - in + let encoder = Pbrt.Encoder.create () - let send_metrics_http (l : Metrics.resource_metrics list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Metrics_service.default_export_metrics_service_request ~resource_metrics:l - () - in - send_http_ ~path:"/v1/metrics" - ~encode:Metrics_service.encode_export_metrics_service_request x - in + let curl = (module Curl () : CURL) - let send_traces_http (l : Trace.resource_spans list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Trace_service.default_export_trace_service_request ~resource_spans:l () - in - send_http_ ~path:"/v1/traces" - ~encode:Trace_service.encode_export_trace_service_request x - in + module C = (val curl) - let send_logs_http (l : Logs.resource_logs list list) = - let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - let x = - Logs_service.default_export_logs_service_request ~resource_logs:l () - 
in - send_http_ ~path:"/v1/logs" - ~encode:Logs_service.encode_export_logs_service_request x - in + let on_tick_cbs_ = ref (ref []) - let last_wakeup = Atomic.make (Mtime_clock.now ()) in - let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - let batch_timeout () : bool = - let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in - Mtime.Span.compare elapsed timeout >= 0 - in + let set_on_tick_callbacks = ( := ) on_tick_cbs_ - let emit_metrics ?(force = false) () : bool = - if force || ((not force) && E_metrics.is_big_enough ()) then ( - let batch = ref [ AList.pop_all gc_metrics ] in - E_metrics.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_metrics_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in - let emit_traces ?(force = false) () : bool = - if force || ((not force) && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_traces_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in - let emit_logs ?(force = false) () : bool = - if force || ((not force) && E_logs.is_big_enough ()) then ( - let batch = ref [] in - E_logs.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_logs_http !batch; - Atomic.set last_wakeup (Mtime_clock.now ()) - ); - do_something - ) else - false - in + let send_http_ ~path ~encode x : unit = + Pbrt.Encoder.reset encoder; + encode x encoder; + let data = Pbrt.Encoder.to_string encoder in + match C.send ~path ~decode:(fun _ -> ()) data with + | Ok () -> () + | Error `Sysbreak -> + Printf.eprintf "ctrl-c captured, stopping\n%!"; + Atomic.set continue false + | Error err -> + (* TODO: log error _via_ otel? 
*) + Atomic.incr n_errors; + report_err_ err - let[@inline] guard f = - try f () - with e -> - Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" - (Printexc.to_string e) - in + let send_metrics_http (l : Metrics.resource_metrics list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request + ~resource_metrics:l () + in + send_http_ ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x - let emit_all_force () = - ignore (emit_traces ~force:true () : bool); - ignore (emit_logs ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool) - in + let send_traces_http (l : Trace.resource_spans list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request ~resource_spans:l () + in + send_http_ ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + let send_logs_http (l : Logs.resource_logs list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request ~resource_logs:l () + in + send_http_ ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x + + (* emit metrics, if the batch is full or timeout lapsed *) + let emit_metrics_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_metrics with + | None -> false + | Some l -> + let batch = AList.pop_all gc_metrics :: l in + send_metrics_http batch; + true + + let emit_traces_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_traces with + | None -> false + | Some l -> + send_traces_http l; + true + + let emit_logs_maybe ~now ?force () : bool = + match Batch.pop_if_ready ?force ~now batch_logs with + | None -> false + | Some l -> + send_logs_http l; + true + + let[@inline] guard_exn_ f = + try f () + with e -> + Printf.eprintf 
"opentelemetry-curl: uncaught exception: %s\n%!" + (Printexc.to_string e) + + let emit_all_force () = + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now ~force:true () : bool); + ignore (emit_logs_maybe ~now ~force:true () : bool); + ignore (emit_metrics_maybe ~now ~force:true () : bool) + end in if config.thread then ( (let m = Mutex.create () in Lock.set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m)); - let ((module C) as curl) = (module Curl () : CURL) in - let m = Mutex.create () in let cond = Condition.create () in (* loop for the thread that processes events and sends them to collector *) let bg_thread () = - while !continue do - let@ () = guard in - let timeout = batch_timeout () in + while Atomic.get continue do + let@ () = guard_exn_ in - let do_metrics = emit_metrics ~force:timeout () in - let do_traces = emit_traces ~force:timeout () in - let do_logs = emit_logs ~force:timeout () in + let now = Mtime_clock.now () in + let do_metrics = emit_metrics_maybe ~now () in + let do_traces = emit_traces_maybe ~now () in + let do_logs = emit_logs_maybe ~now () in if (not do_metrics) && (not do_traces) && not do_logs then - (* wait *) + (* wait for something to happen *) let@ () = with_mutex_ m in Condition.wait cond m done; - (* flush remaining events *) - let@ () = guard in - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); - ignore (emit_logs ~force:true () : bool); + (* flush remaining events once we exit *) + let@ () = guard_exn_ in + emit_all_force (); C.cleanup () in start_bg_thread bg_thread; + (* if the bg thread waits, this will wake it up so it can send batches *) let wakeup () = - with_mutex_ m (fun () -> Condition.signal cond); + with_mutex_ m (fun () -> Condition.broadcast cond); Thread.yield () in - (* wake up if a batch is full *) - on_metrics_full wakeup; - on_trace_full wakeup; - let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); List.iter @@ 
-396,50 +400,47 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = Printf.eprintf "on tick callback raised: %s\n" (Printexc.to_string e)) !(!on_tick_cbs_); - if batch_timeout () then wakeup () + + let now = Mtime_clock.now () in + if + (not (Atomic.get continue)) + || Batch.is_ready ~now batch_metrics + || Batch.is_ready ~now batch_traces + || Batch.is_ready ~now batch_logs + then + wakeup () in if config.ticker_thread then ( (* thread that calls [tick()] regularly, to help enforce timeouts *) let tick_thread () = - while true do + while Atomic.get continue do Thread.delay 0.5; tick () - done + done; + wakeup () in start_bg_thread tick_thread ); let module M = struct - let push_trace e = - E_trace.push e; - if batch_timeout () then wakeup () + let push_trace e = if Batch.push batch_traces e then wakeup () - let push_metrics e = - E_metrics.push e; - if batch_timeout () then wakeup () + let push_metrics e = if Batch.push batch_metrics e then wakeup () - let push_logs e = - E_logs.push e; - if batch_timeout () then wakeup () + let push_logs e = if Batch.push batch_logs e then wakeup () let set_on_tick_callbacks = set_on_tick_callbacks let tick = tick let cleanup () = - continue := false; + Atomic.set continue false; with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - on_metrics_full (fun () -> - if Atomic.get needs_gc_metrics then sample_gc_metrics (); - ignore (emit_metrics () : bool)); - on_trace_full (fun () -> ignore (emit_traces () : bool)); - on_logs_full (fun () -> ignore (emit_logs () : bool)); - let cleanup () = emit_all_force (); C.cleanup () @@ -447,25 +448,33 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) = let module M = struct let push_trace e = - let@ () = guard in - E_trace.push e; - if batch_timeout () then emit_all_force () + let@ () = guard_exn_ in + Batch.push' batch_traces e; + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now () : bool) let push_metrics e = - let@ () = guard in 
- E_metrics.push e; - if batch_timeout () then emit_all_force () + let@ () = guard_exn_ in + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + Batch.push' batch_metrics e; + let now = Mtime_clock.now () in + ignore (emit_metrics_maybe ~now () : bool) let push_logs e = - let@ () = guard in - E_logs.push e; - if batch_timeout () then emit_all_force () + let@ () = guard_exn_ in + Batch.push' batch_logs e; + let now = Mtime_clock.now () in + ignore (emit_logs_maybe ~now () : bool) let set_on_tick_callbacks = set_on_tick_callbacks let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics (); - if batch_timeout () then emit_all_force () + let now = Mtime_clock.now () in + ignore (emit_traces_maybe ~now () : bool); + ignore (emit_metrics_maybe ~now () : bool); + ignore (emit_logs_maybe ~now () : bool); + () let cleanup = cleanup end in @@ -516,13 +525,13 @@ end) [ make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true + sum ~name:"otel.export.dropped" ~is_monotonic:true [ int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); ]; - sum ~name:"otel-export.errors" ~is_monotonic:true + sum ~name:"otel.export.errors" ~is_monotonic:true [ int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) @@ -537,12 +546,13 @@ end) { send = (fun m ~ret -> - let@ () = Lock.with_lock in if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + let@ () = Lock.with_lock in + let m = List.rev_append (additional_metrics ()) m in push_metrics m; ret ()); @@ -552,11 +562,12 @@ end) { send = (fun m ~ret -> - let@ () = Lock.with_lock in if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m; + + let@ () = Lock.with_lock in push_logs m; ret ()); }