Merge pull request #24 from imandra-ai/wip-better-client

improve client code
This commit is contained in:
Simon Cruanes 2022-07-07 10:42:39 -04:00 committed by GitHub
commit 586e16e1c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 501 additions and 365 deletions

2
dune
View file

@ -1,3 +1,3 @@
(env (env
(_ (_
(flags :standard -warn-error -a+8))) (flags :standard -warn-error -a+8 -strict-sequence)))

View file

@ -1,27 +0,0 @@
type 'a t = {
arr: 'a array;
mutable i: int;
}
let create ~dummy n : _ t =
assert (n >= 1);
{ arr = Array.make n dummy; i = 0 }
let[@inline] size self = self.i
let[@inline] is_full self = self.i = Array.length self.arr
let push (self : _ t) x : bool =
if is_full self then
false
else (
self.arr.(self.i) <- x;
self.i <- 1 + self.i;
true
)
let pop_iter_all (self : _ t) f =
for j = 0 to self.i - 1 do
f self.arr.(j)
done;
self.i <- 0

View file

@ -1,11 +0,0 @@
(** queue of fixed size *)
type 'a t
val create : dummy:'a -> int -> 'a t
val size : _ t -> int
val push : 'a t -> 'a -> bool (* true iff it could write element *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit

View file

@ -3,6 +3,10 @@ include Opentelemetry.Lock
let[@inline] ( let@ ) f x = f x let[@inline] ( let@ ) f x = f x
let spf = Printf.sprintf
let tid () = Thread.id @@ Thread.self ()
let debug_ = let debug_ =
ref ref
(match Sys.getenv_opt "OTEL_OCAML_DEBUG" with (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with

View file

@ -8,7 +8,7 @@ type t = {
batch_metrics: int option; batch_metrics: int option;
batch_logs: int option; batch_logs: int option;
batch_timeout_ms: int; batch_timeout_ms: int;
thread: bool; bg_threads: int;
ticker_thread: bool; ticker_thread: bool;
} }
@ -24,20 +24,30 @@ let pp out self =
batch_metrics; batch_metrics;
batch_logs; batch_logs;
batch_timeout_ms; batch_timeout_ms;
thread; bg_threads;
ticker_thread; ticker_thread;
} = } =
self self
in in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \ "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_traces=%a;@ batch_metrics=%a;@ \
batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" batch_logs=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ ticker_thread=%B @]}"
debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt debug url ppheaders headers ppiopt batch_traces ppiopt batch_metrics ppiopt
batch_logs batch_timeout_ms thread ticker_thread batch_logs batch_timeout_ms bg_threads ticker_thread
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400) ?(batch_traces = Some 400) ?(batch_metrics = None) ?(batch_logs = Some 400)
?(batch_timeout_ms = 500) ?(thread = true) ?(ticker_thread = true) () : t = ?(batch_timeout_ms = 500) ?(thread = true) ?bg_threads
?(ticker_thread = true) () : t =
let bg_threads =
match bg_threads with
| Some n -> max n 0
| None ->
if thread then
4
else
0
in
{ {
debug; debug;
url; url;
@ -46,6 +56,6 @@ let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
batch_metrics; batch_metrics;
batch_timeout_ms; batch_timeout_ms;
batch_logs; batch_logs;
thread; bg_threads;
ticker_thread; ticker_thread;
} }

View file

@ -28,12 +28,13 @@ type t = private {
incomplete. incomplete.
Note that the batch might take longer than that, because this is Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *) only checked when a new event occurs. Default 500. *)
thread: bool; (** Is there a background thread? Default [true] *) bg_threads: int;
(** Are there background threads, and how many? Default [4] *)
ticker_thread: bool; ticker_thread: bool;
(** Is there a ticker thread? Default [true]. (** Is there a ticker thread? Default [true].
This thread will regularly call [tick()] on the backend, to make This thread will regularly call [tick()] on the backend, to make
sure it makes progress, and regularly send events to the collector. sure it makes progress, and regularly send events to the collector.
This option is ignored if [thread=false]. *) This option is ignored if [bg_threads=0]. *)
} }
(** Configuration. (** Configuration.
@ -49,9 +50,15 @@ val make :
?batch_logs:int option -> ?batch_logs:int option ->
?batch_timeout_ms:int -> ?batch_timeout_ms:int ->
?thread:bool -> ?thread:bool ->
?bg_threads:int ->
?ticker_thread:bool -> ?ticker_thread:bool ->
unit -> unit ->
t t
(** Make a configuration *) (** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
*)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit

View file

@ -12,16 +12,18 @@ let needs_gc_metrics = Atomic.make false
let gc_metrics = AList.make () let gc_metrics = AList.make ()
(* side channel for GC, appended to {!E_metrics}'s data *) (* side channel for GC, appended to {!E_metrics}'s data *)
(* capture current GC metrics and push them into {!gc_metrics} for later (* capture current GC metrics if {!needs_gc_metrics} is true,
and push them into {!gc_metrics} for later
collection *) collection *)
let sample_gc_metrics () = let sample_gc_metrics_if_needed () =
Atomic.set needs_gc_metrics false; if Atomic.compare_and_set needs_gc_metrics true false then (
let l = let l =
OT.Metrics.make_resource_metrics OT.Metrics.make_resource_metrics
~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
@@ Opentelemetry.GC_metrics.get_metrics () @@ Opentelemetry.GC_metrics.get_metrics ()
in in
AList.add gc_metrics l AList.add gc_metrics l
)
module Config = Config module Config = Config
@ -33,6 +35,7 @@ let _init_curl =
type error = type error =
[ `Status of int * Opentelemetry.Proto.Status.status [ `Status of int * Opentelemetry.Proto.Status.status
| `Failure of string | `Failure of string
| `Sysbreak
] ]
let n_errors = Atomic.make 0 let n_errors = Atomic.make 0
@ -40,42 +43,58 @@ let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0 let n_dropped = Atomic.make 0
let report_err_ = function let report_err_ = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg -> | `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, status) -> | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details })
->
let pp_details out l =
List.iter
(fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
l
in
Format.eprintf Format.eprintf
"@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code "@[<2>opentelemetry: export failed with@ http code=%d@ status \
Proto.Status.pp_status status {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
code scode
(Bytes.unsafe_to_string message)
pp_details details
module Httpc : sig
type t
val create : unit -> t
module type CURL = sig
val send : val send :
path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result t ->
path:string ->
decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
string ->
('a, error) result
val cleanup : unit -> unit val cleanup : t -> unit
end end = struct
(* create a curl client *)
module Curl () : CURL = struct
open Opentelemetry.Proto open Opentelemetry.Proto
let () = Lazy.force _init_curl let () = Lazy.force _init_curl
let buf_res = Buffer.create 256
(* TODO: use Curl.Multi, etc. instead? *) (* TODO: use Curl.Multi, etc. instead? *)
type t = {
buf_res: Buffer.t;
curl: Curl.t;
}
(* http client *) let create () : t = { buf_res = Buffer.create 256; curl = Curl.init () }
let curl : Curl.t = Curl.init ()
let cleanup () = Curl.cleanup curl let cleanup self = Curl.cleanup self.curl
(* TODO: use Curl multi *)
(* send the content to the remote endpoint/path *) (* send the content to the remote endpoint/path *)
let send ~path ~decode (bod : string) : ('a, error) result = let send (self : t) ~path ~decode (bod : string) : ('a, error) result =
let { curl; buf_res } = self in
Curl.reset curl; Curl.reset curl;
if !debug_ then Curl.set_verbose curl true; if !debug_ then Curl.set_verbose curl true;
Curl.set_url curl (!url ^ path); let full_url = !url ^ path in
Curl.set_url curl full_url;
Curl.set_httppost curl []; Curl.set_httppost curl [];
let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in let to_http_header (k, v) = Printf.sprintf "%s: %s" k v in
let http_headers = List.map to_http_header !headers in let http_headers = List.map to_http_header !headers in
@ -104,14 +123,37 @@ module Curl () : CURL = struct
let code = Curl.get_responsecode curl in let code = Curl.get_responsecode curl in
if !debug_ then if !debug_ then
Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res);
let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in
if code >= 200 && code < 300 then ( if code >= 200 && code < 300 then (
let res = decode dec in match decode with
Ok res | `Ret x -> Ok x
| `Dec f ->
let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in
(try Ok (f dec)
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e)
bt)))
) else ( ) else (
let status = Status.decode_status dec in let str = Buffer.contents buf_res in
Error (`Status (code, status)) let dec = Pbrt.Decoder.of_string str in
try
let status = Status.decode_status dec in
Error (`Status (code, status))
with e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf
"httpc: decoding of status (url=%S, code=%d) failed with:\n\
%s\n\
status: %S\n\
%s"
full_url code (Printexc.to_string e) str bt))
) )
| exception Sys.Break -> Error `Sysbreak
| exception Curl.CurlException (_, code, msg) -> | exception Curl.CurlException (_, code, msg) ->
let status = let status =
Status.default_status ~code:(Int32.of_int code) Status.default_status ~code:(Int32.of_int code)
@ -119,19 +161,114 @@ module Curl () : CURL = struct
() ()
in in
Error (`Status (code, status)) Error (`Status (code, status))
with e -> Error (`Failure (Printexc.to_string e)) with
| Sys.Break -> Error `Sysbreak
| e ->
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf "httpc: post on url=%S failed with:\n%s\n%s" full_url
(Printexc.to_string e) bt))
end end
module type PUSH = sig (** Batch of resources to be pushed later.
type elt
val push : elt -> unit This type is thread-safe. *)
module Batch : sig
type 'a t
val is_empty : unit -> bool val push : 'a t -> 'a -> bool
(** [push batch x] pushes [x] into the batch, and heuristically
returns [true] if the batch is ready to be emitted (to know if we should
wake up the sending thread, if any) *)
val is_big_enough : unit -> bool val push' : 'a t -> 'a -> unit
val pop_iter_all : (elt -> unit) -> unit val is_ready : now:Mtime.t -> _ t -> bool
(** is the batch ready to be sent? This is heuristic. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled,
this is true as soon as {!is_empty} is false. If a timeout is provided
for this batch, then it will be ready if an element has been in it
for at least the timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *)
end = struct
type 'a t = {
lock: Mutex.t;
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
}
let make ?batch ?timeout () : _ t =
Option.iter (fun b -> assert (b > 0)) batch;
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
{
lock = Mutex.create ();
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let is_empty_ self = self.size = 0
let timeout_expired_ ~now self : bool =
match self.timeout with
| Some t ->
let elapsed = Mtime.span now self.start in
Mtime.Span.compare elapsed t >= 0
| None -> false
let is_full_ self : bool =
match self.batch with
| None -> self.size > 0
| Some b -> self.size >= b
let is_ready ~now self : bool =
let@ () = with_mutex_ self.lock in
is_full_ self || timeout_expired_ ~now self
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
let@ () = with_mutex_ self.lock in
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
then (
let l = self.q in
self.q <- [];
self.size <- 0;
assert (l <> []);
Some l
) else
None
let push (self : _ t) x : bool =
let@ () = with_mutex_ self.lock in
if self.size >= self.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if self.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
self.start <- Mtime_clock.now ();
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
let ready = is_full_ self in
ready
)
let push' self x = ignore (push self x : bool)
end end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/… (** An emitter. This is used by {!Backend} below to forward traces/metrics/…
@ -152,57 +289,6 @@ module type EMITTER = sig
val cleanup : unit -> unit val cleanup : unit -> unit
end end
type 'a push = (module PUSH with type elt = 'a)
type on_full_cb = unit -> unit
(* make a "push" object, along with a setter for a callback to call when
it's ready to emit a batch *)
let mk_push (type a) ?batch () :
(module PUSH with type elt = a) * (on_full_cb -> unit) =
let on_full : on_full_cb ref = ref ignore in
let push =
match batch with
| None ->
let r = ref None in
let module M = struct
type elt = a
let is_empty () = !r == None
let is_big_enough () = !r != None
let push x =
r := Some x;
!on_full ()
let pop_iter_all f =
Option.iter f !r;
r := None
end in
(module M : PUSH with type elt = a)
| Some n ->
let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in
let module M = struct
type elt = a
let is_empty () = FQueue.size q = 0
let is_big_enough () = FQueue.size q >= n
let push x =
if (not (FQueue.push q x)) || FQueue.size q > n then (
!on_full ();
if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *)
)
let pop_iter_all f = FQueue.pop_iter_all q f
end in
(module M : PUSH with type elt = a)
in
push, ( := ) on_full
(* start a thread in the background, running [f()] *) (* start a thread in the background, running [f()] *)
let start_bg_thread (f : unit -> unit) : unit = let start_bg_thread (f : unit -> unit) : unit =
let run () = let run () =
@ -212,271 +298,288 @@ let start_bg_thread (f : unit -> unit) : unit =
in in
ignore (Thread.create run () : Thread.t) ignore (Thread.create run () : Thread.t)
let l_is_empty = function
| [] -> true
| _ :: _ -> false
let batch_is_empty = List.for_all l_is_empty
(* make an emitter. (* make an emitter.
exceptions inside should be caught, see exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *) https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(config : Config.t) () : (module EMITTER) = let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in let open Proto in
let continue = ref true in (* local helpers *)
let open struct
let timeout =
if config.batch_timeout_ms > 0 then
Some Mtime.Span.(config.batch_timeout_ms * ms)
else
None
let ((module E_trace) : Trace.resource_spans list push), on_trace_full = let batch_traces : Trace.resource_spans list Batch.t =
mk_push ?batch:config.batch_traces () Batch.make ?batch:config.batch_traces ?timeout ()
in
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full
=
mk_push ?batch:config.batch_metrics ()
in
let ((module E_logs) : Logs.resource_logs list push), on_logs_full =
mk_push ?batch:config.batch_logs ()
in
let encoder = Pbrt.Encoder.create () in let batch_metrics : Metrics.resource_metrics list Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()
let ((module C) as curl) = (module Curl () : CURL) in let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = ref (ref []) in let on_tick_cbs_ = Atomic.make (ref [])
let set_on_tick_callbacks = ( := ) on_tick_cbs_ in
let send_http_ ~path ~encode x : unit = let set_on_tick_callbacks = Atomic.set on_tick_cbs_
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
match C.send ~path ~decode:(fun _ -> ()) data with
| Ok () -> ()
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err
in
let send_metrics_http (l : Metrics.resource_metrics list list) = let send_http_ (httpc : Httpc.t) encoder ~path ~encode x : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in Pbrt.Encoder.reset encoder;
let x = encode x encoder;
Metrics_service.default_export_metrics_service_request ~resource_metrics:l let data = Pbrt.Encoder.to_string encoder in
() match Httpc.send httpc ~path ~decode:(`Ret ()) data with
in | Ok () -> ()
send_http_ ~path:"/v1/metrics" | Error `Sysbreak ->
~encode:Metrics_service.encode_export_metrics_service_request x Printf.eprintf "ctrl-c captured, stopping\n%!";
in Atomic.set stop true
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err;
(* avoid crazy error loop *)
Thread.delay 3.
let send_traces_http (l : Trace.resource_spans list list) = let send_metrics_http curl encoder (l : Metrics.resource_metrics list list)
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in =
let x = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Trace_service.default_export_trace_service_request ~resource_spans:l () let x =
in Metrics_service.default_export_metrics_service_request
send_http_ ~path:"/v1/traces" ~resource_metrics:l ()
~encode:Trace_service.encode_export_trace_service_request x in
in send_http_ curl encoder ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x
let send_logs_http (l : Logs.resource_logs list list) = let send_traces_http curl encoder (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x = let x =
Logs_service.default_export_logs_service_request ~resource_logs:l () Trace_service.default_export_trace_service_request ~resource_spans:l ()
in in
send_http_ ~path:"/v1/logs" send_http_ curl encoder ~path:"/v1/traces"
~encode:Logs_service.encode_export_logs_service_request x ~encode:Trace_service.encode_export_trace_service_request x
in
let last_wakeup = Atomic.make (Mtime_clock.now ()) in let send_logs_http curl encoder (l : Logs.resource_logs list list) =
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let batch_timeout () : bool = let x =
let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in Logs_service.default_export_logs_service_request ~resource_logs:l ()
Mtime.Span.compare elapsed timeout >= 0 in
in send_http_ curl encoder ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x
let emit_metrics ?(force = false) () : bool = (* emit metrics, if the batch is full or timeout lapsed *)
if force || ((not force) && E_metrics.is_big_enough ()) then ( let emit_metrics_maybe ~now ?force httpc encoder : bool =
let batch = ref [ AList.pop_all gc_metrics ] in match Batch.pop_if_ready ?force ~now batch_metrics with
E_metrics.pop_iter_all (fun l -> batch := l :: !batch); | None -> false
let do_something = not (l_is_empty !batch) in | Some l ->
if do_something then ( let batch = AList.pop_all gc_metrics :: l in
send_metrics_http !batch; send_metrics_http httpc encoder batch;
Atomic.set last_wakeup (Mtime_clock.now ()) true
);
do_something
) else
false
in
let emit_traces ?(force = false) () : bool =
if force || ((not force) && E_trace.is_big_enough ()) then (
let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_traces_http !batch;
Atomic.set last_wakeup (Mtime_clock.now ())
);
do_something
) else
false
in
let emit_logs ?(force = false) () : bool =
if force || ((not force) && E_logs.is_big_enough ()) then (
let batch = ref [] in
E_logs.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_logs_http !batch;
Atomic.set last_wakeup (Mtime_clock.now ())
);
do_something
) else
false
in
let[@inline] guard f = let emit_traces_maybe ~now ?force httpc encoder : bool =
try f () match Batch.pop_if_ready ?force ~now batch_traces with
with e -> | None -> false
Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" | Some l ->
(Printexc.to_string e) send_traces_http httpc encoder l;
in true
let emit_all_force () = let emit_logs_maybe ~now ?force httpc encoder : bool =
ignore (emit_traces ~force:true () : bool); match Batch.pop_if_ready ?force ~now batch_logs with
ignore (emit_logs ~force:true () : bool); | None -> false
ignore (emit_metrics ~force:true () : bool) | Some l ->
in send_logs_http httpc encoder l;
true
if config.thread then ( let[@inline] guard_exn_ where f =
(let m = Mutex.create () in try f ()
Lock.set_mutex with e ->
~lock:(fun () -> Mutex.lock m) let bt = Printexc.get_backtrace () in
~unlock:(fun () -> Mutex.unlock m)); Printf.eprintf
"opentelemetry-curl: uncaught exception in %s: %s\n%s\n%!" where
(Printexc.to_string e) bt
let ((module C) as curl) = (module Curl () : CURL) in let emit_all_force (httpc : Httpc.t) encoder =
let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_logs_maybe ~now ~force:true httpc encoder : bool);
ignore (emit_metrics_maybe ~now ~force:true httpc encoder : bool)
let m = Mutex.create () in let tick_common_ () =
let cond = Condition.create () in if !debug_ then Printf.eprintf "tick (from %d)\n%!" (tid ());
sample_gc_metrics_if_needed ();
(* loop for the thread that processes events and sends them to collector *)
let bg_thread () =
while !continue do
let@ () = guard in
let timeout = batch_timeout () in
let do_metrics = emit_metrics ~force:timeout () in
let do_traces = emit_traces ~force:timeout () in
let do_logs = emit_logs ~force:timeout () in
if (not do_metrics) && (not do_traces) && not do_logs then
(* wait *)
let@ () = with_mutex_ m in
Condition.wait cond m
done;
(* flush remaining events *)
let@ () = guard in
ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool);
ignore (emit_logs ~force:true () : bool);
C.cleanup ()
in
start_bg_thread bg_thread;
let wakeup () =
with_mutex_ m (fun () -> Condition.signal cond);
Thread.yield ()
in
(* wake up if a batch is full *)
on_metrics_full wakeup;
on_trace_full wakeup;
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
List.iter List.iter
(fun f -> (fun f ->
try f () try f ()
with e -> with e ->
Printf.eprintf "on tick callback raised: %s\n" Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e)) (Printexc.to_string e))
!(!on_tick_cbs_); !(Atomic.get on_tick_cbs_);
if batch_timeout () then wakeup () ()
in
if config.ticker_thread then ( let setup_ticker_thread ~tick ~finally () =
(* thread that calls [tick()] regularly, to help enforce timeouts *) (* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () = let tick_thread () =
while true do let@ () =
Fun.protect ~finally:(fun () ->
Atomic.set stop true;
finally ())
in
while not @@ Atomic.get stop do
Thread.delay 0.5; Thread.delay 0.5;
tick () tick ()
done done
in in
start_bg_thread tick_thread start_bg_thread tick_thread
); end in
(* setup a global lock *)
(let global_lock_ = Mutex.create () in
Lock.set_mutex
~lock:(fun () -> Mutex.lock global_lock_)
~unlock:(fun () -> Mutex.unlock global_lock_));
if config.bg_threads > 0 then (
(* lock+condition used for background threads to wait, and be woken up
when a batch is ready *)
let m = Mutex.create () in
let cond = Condition.create () in
(* loop for the thread that processes events and sends them to collector *)
let bg_thread () =
let httpc = Httpc.create () in
let encoder = Pbrt.Encoder.create () in
while not @@ Atomic.get stop do
let@ () = guard_exn_ (spf "bg thread[%d] (main loop)" @@ tid ()) in
let now = Mtime_clock.now () in
let do_metrics = emit_metrics_maybe ~now httpc encoder in
let do_traces = emit_traces_maybe ~now httpc encoder in
let do_logs = emit_logs_maybe ~now httpc encoder in
if (not do_metrics) && (not do_traces) && not do_logs then (
let@ () = guard_exn_ (spf "bg thread[%d] (waiting)" @@ tid ()) in
(* wait for something to happen *)
Mutex.lock m;
Condition.wait cond m;
Mutex.unlock m
)
done;
(* flush remaining events once we exit *)
let@ () = guard_exn_ "bg thread (cleanup)" in
emit_all_force httpc encoder;
Httpc.cleanup httpc
in
for _i = 1 to config.bg_threads do
start_bg_thread bg_thread
done;
(* if the bg thread waits, this will wake it up so it can send batches *)
let wakeup ~all () =
with_mutex_ m (fun () ->
if all then
Condition.broadcast cond
else
Condition.signal cond);
Thread.yield ()
in
let tick () =
tick_common_ ();
let now = Mtime_clock.now () in
if Atomic.get stop then
wakeup ~all:true ()
else if
Batch.is_ready ~now batch_metrics
|| Batch.is_ready ~now batch_traces
|| Batch.is_ready ~now batch_logs
then
wakeup ~all:false ()
in
if config.ticker_thread then
setup_ticker_thread ~tick ~finally:(fun () -> wakeup ~all:true ()) ();
let module M = struct let module M = struct
let push_trace e = let push_trace e = if Batch.push batch_traces e then wakeup ~all:false ()
E_trace.push e;
if batch_timeout () then wakeup ()
let push_metrics e = let push_metrics e =
E_metrics.push e; if Batch.push batch_metrics e then wakeup ~all:false ()
if batch_timeout () then wakeup ()
let push_logs e = let push_logs e = if Batch.push batch_logs e then wakeup ~all:false ()
E_logs.push e;
if batch_timeout () then wakeup ()
let set_on_tick_callbacks = set_on_tick_callbacks let set_on_tick_callbacks = set_on_tick_callbacks
let tick = tick let tick = tick
let cleanup () = let cleanup () =
continue := false; Atomic.set stop true;
with_mutex_ m (fun () -> Condition.broadcast cond) if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
wakeup ~all:true ()
end in end in
(module M) (module M)
) else ( ) else (
on_metrics_full (fun () -> let httpc = Httpc.create () in
if Atomic.get needs_gc_metrics then sample_gc_metrics (); let encoder = Pbrt.Encoder.create () in
ignore (emit_metrics () : bool));
on_trace_full (fun () -> ignore (emit_traces () : bool));
on_logs_full (fun () -> ignore (emit_logs () : bool));
let cleanup () =
emit_all_force ();
C.cleanup ()
in
let module M = struct let module M = struct
(* we make sure that this is thread-safe, even though we don't have a
background thread. There can still be a ticker thread, and there
can also be several user threads that produce spans and call
the emit functions. *)
let push_trace e = let push_trace e =
let@ () = guard in let@ () = guard_exn_ "push trace" in
E_trace.push e; Batch.push' batch_traces e;
if batch_timeout () then emit_all_force () let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_traces_maybe ~now httpc encoder : bool)
let push_metrics e = let push_metrics e =
let@ () = guard in let@ () = guard_exn_ "push metrics" in
E_metrics.push e; sample_gc_metrics_if_needed ();
if batch_timeout () then emit_all_force () Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_metrics_maybe ~now httpc encoder : bool)
let push_logs e = let push_logs e =
let@ () = guard in let@ () = guard_exn_ "push logs" in
E_logs.push e; Batch.push' batch_logs e;
if batch_timeout () then emit_all_force () let now = Mtime_clock.now () in
let@ () = Lock.with_lock in
ignore (emit_logs_maybe ~now httpc encoder : bool)
let set_on_tick_callbacks = set_on_tick_callbacks let set_on_tick_callbacks = set_on_tick_callbacks
let tick () = let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics (); sample_gc_metrics_if_needed ();
if batch_timeout () then emit_all_force () let@ () = Lock.with_lock in
let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now httpc encoder : bool);
ignore (emit_metrics_maybe ~now httpc encoder : bool);
ignore (emit_logs_maybe ~now httpc encoder : bool);
()
let cleanup = cleanup (* make sure we have a ticker thread, if required *)
let () =
if config.ticker_thread then
setup_ticker_thread ~tick ~finally:ignore ()
let cleanup () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
emit_all_force httpc encoder;
Httpc.cleanup httpc
end in end in
(module M) (module M)
) )
module Backend (Arg : sig module Backend (Arg : sig
val stop : bool Atomic.t
val config : Config.t val config : Config.t
end) end)
() : Opentelemetry.Collector.BACKEND = struct () : Opentelemetry.Collector.BACKEND = struct
include (val mk_emitter ~config:Arg.config ()) include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
open Opentelemetry.Proto open Opentelemetry.Proto
open Opentelemetry.Collector open Opentelemetry.Collector
@ -485,11 +588,11 @@ end)
{ {
send = send =
(fun l ~ret -> (fun l ~ret ->
let@ () = Lock.with_lock in (if !debug_ then
if !debug_ then let@ () = Lock.with_lock in
Format.eprintf "send spans %a@." Format.eprintf "send spans %a@."
(Format.pp_print_list Trace.pp_resource_spans) (Format.pp_print_list Trace.pp_resource_spans)
l; l);
push_trace l; push_trace l;
ret ()); ret ());
} }
@ -499,7 +602,10 @@ end)
let timeout_sent_metrics = Mtime.Span.(5 * s) let timeout_sent_metrics = Mtime.Span.(5 * s)
(* send metrics from time to time *) (* send metrics from time to time *)
let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true let signal_emit_gc_metrics () =
if !debug_ then
Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
Atomic.set needs_gc_metrics true
let additional_metrics () : Metrics.resource_metrics list = let additional_metrics () : Metrics.resource_metrics list =
(* add exporter metrics to the lot? *) (* add exporter metrics to the lot? *)
@ -510,19 +616,21 @@ end)
Mtime.Span.compare elapsed timeout_sent_metrics > 0 Mtime.Span.compare elapsed timeout_sent_metrics > 0
in in
(* there is a possible race condition here, as several threads might update
metrics at the same time. But that's harmless. *)
if add_own_metrics then ( if add_own_metrics then (
let open OT.Metrics in
Atomic.set last_sent_metrics now; Atomic.set last_sent_metrics now;
let open OT.Metrics in
[ [
make_resource_metrics make_resource_metrics
[ [
sum ~name:"otel-export.dropped" ~is_monotonic:true sum ~name:"otel.export.dropped" ~is_monotonic:true
[ [
int int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
]; ];
sum ~name:"otel-export.errors" ~is_monotonic:true sum ~name:"otel.export.errors" ~is_monotonic:true
[ [
int int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
@ -537,11 +645,11 @@ end)
{ {
send = send =
(fun m ~ret -> (fun m ~ret ->
let@ () = Lock.with_lock in (if !debug_ then
if !debug_ then let@ () = Lock.with_lock in
Format.eprintf "send metrics %a@." Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics) (Format.pp_print_list Metrics.pp_resource_metrics)
m; m);
let m = List.rev_append (additional_metrics ()) m in let m = List.rev_append (additional_metrics ()) m in
push_metrics m; push_metrics m;
@ -552,21 +660,24 @@ end)
{ {
send = send =
(fun m ~ret -> (fun m ~ret ->
let@ () = Lock.with_lock in (if !debug_ then
if !debug_ then let@ () = Lock.with_lock in
Format.eprintf "send logs %a@." Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs) (Format.pp_print_list Logs.pp_resource_logs)
m; m);
push_logs m; push_logs m;
ret ()); ret ());
} }
end end
let setup_ ~(config : Config.t) () = let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
debug_ := config.debug; debug_ := config.debug;
let module B = let module B =
Backend Backend
(struct (struct
let stop = stop
let config = config let config = config
end) end)
() ()
@ -574,15 +685,15 @@ let setup_ ~(config : Config.t) () =
Opentelemetry.Collector.set_backend (module B); Opentelemetry.Collector.set_backend (module B);
B.cleanup B.cleanup
let setup ?(config = Config.make ()) ?(enable = true) () = let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
if enable then ( if enable then (
let cleanup = setup_ ~config () in let cleanup = setup_ ?stop ~config () in
at_exit cleanup at_exit cleanup
) )
let with_setup ?(config = Config.make ()) ?(enable = true) () f = let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
if enable then ( if enable then (
let cleanup = setup_ ~config () in let cleanup = setup_ ?stop ~config () in
Fun.protect ~finally:cleanup f Fun.protect ~finally:cleanup f
) else ) else
f () f ()

View file

@ -18,15 +18,27 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a lock/unlock pair to protect the critical sections (** Set a lock/unlock pair to protect the critical sections
of {!Opentelemetry.Collector.BACKEND} *) of {!Opentelemetry.Collector.BACKEND} *)
module Atomic = Opentelemetry_atomic.Atomic
module Config = Config module Config = Config
val setup : ?config:Config.t -> ?enable:bool -> unit -> unit val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can @param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments be used to enable/disable the setup depending on CLI arguments
or environment. or environment.
@param config configuration to use *) @param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
val with_setup : ?config:Config.t -> ?enable:bool -> unit -> (unit -> 'a) -> 'a val with_setup :
?stop:bool Atomic.t ->
?config:Config.t ->
?enable:bool ->
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up (** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns. *) after [f()] returns
See {!setup} for more details. *)

View file

@ -600,7 +600,13 @@ module Trace = struct
default_resource_spans ~resource:(Some resource) default_resource_spans ~resource:(Some resource)
~instrumentation_library_spans:[ ils ] () ~instrumentation_library_spans:[ ils ] ()
(** Sync emitter *) (** Sync emitter.
This instructs the collector to forward
the spans to some backend at a later point.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let emit ?service_name ?attrs (spans : span list) : unit = let emit ?service_name ?attrs (spans : span list) : unit =
let rs = make_resource_spans ?service_name ?attrs spans in let rs = make_resource_spans ?service_name ?attrs spans in
Collector.send_trace [ rs ] ~ret:(fun () -> ()) Collector.send_trace [ rs ] ~ret:(fun () -> ())
@ -629,7 +635,10 @@ module Trace = struct
if Collector.has_backend () then if Collector.has_backend () then
scope.attrs <- List.rev_append (attrs ()) scope.attrs scope.attrs <- List.rev_append (attrs ()) scope.attrs
(** Sync span guard *) (** Sync span guard.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let with_ ?trace_state ?service_name let with_ ?trace_state ?service_name
?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
?links name (f : scope -> 'a) : 'a = ?links name (f : scope -> 'a) : 'a =
@ -771,7 +780,11 @@ module Metrics = struct
(** Emit some metrics to the collector (sync). This blocks until (** Emit some metrics to the collector (sync). This blocks until
the backend has pushed the metrics into some internal queue, or the backend has pushed the metrics into some internal queue, or
discarded them. *) discarded them.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks.
*)
let emit ?attrs (l : t list) : unit = let emit ?attrs (l : t list) : unit =
let rm = make_resource_metrics ?attrs l in let rm = make_resource_metrics ?attrs l in
Collector.send_metrics [ rm ] ~ret:ignore Collector.send_metrics [ rm ] ~ret:ignore
@ -851,6 +864,12 @@ module Logs = struct
?trace_id ?span_id bod) ?trace_id ?span_id bod)
fmt fmt
(** Emit logs.
This instructs the collector to send the logs to some backend at
a later date.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let emit ?service_name ?attrs (l : t list) : unit = let emit ?service_name ?attrs (l : t list) : unit =
let attributes = Globals.mk_attributes ?service_name ?attrs () in let attributes = Globals.mk_attributes ?service_name ?attrs () in
let resource = Proto.Resource.default_resource ~attributes () in let resource = Proto.Resource.default_resource ~attributes () in
@ -880,8 +899,10 @@ module Metrics_callbacks = struct
end end
(** [register f] adds the callback [f] to the list. (** [register f] adds the callback [f] to the list.
[f] will be called at unspecified times and is expected to return
a list of metrics. *) [f] will be called at unspecified times and is expected to return
a list of metrics. It might be called regularly by the backend,
in particular (but not only) when {!Collector.tick} is called. *)
let register f : unit = let register f : unit =
if !cbs_ = [] then if !cbs_ = [] then
(* make sure we call [f] (and others) at each tick *) (* make sure we call [f] (and others) at each tick *)

View file

@ -98,6 +98,7 @@ let () =
let debug = ref false in let debug = ref false in
let thread = ref true in let thread = ref true in
let n_bg_threads = ref 0 in
let batch_traces = ref 400 in let batch_traces = ref 400 in
let batch_metrics = ref 3 in let batch_metrics = ref 3 in
let opts = let opts =
@ -114,6 +115,7 @@ let () =
"--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop";
"--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop";
"-j", Arg.Set_int n_jobs, " number of parallel jobs"; "-j", Arg.Set_int n_jobs, " number of parallel jobs";
"--bg-threads", Arg.Set_int n_bg_threads, " number of background threads";
] ]
|> Arg.align |> Arg.align
in in
@ -130,7 +132,14 @@ let () =
Opentelemetry_client_ocurl.Config.make ~debug:!debug Opentelemetry_client_ocurl.Config.make ~debug:!debug
~batch_traces:(some_if_nzero batch_traces) ~batch_traces:(some_if_nzero batch_traces)
~batch_metrics:(some_if_nzero batch_metrics) ~batch_metrics:(some_if_nzero batch_metrics)
~thread:!thread () ~thread:!thread
?bg_threads:
(let n = !n_bg_threads in
if n = 0 then
None
else
Some n)
()
in in
Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@."
!sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config;
@ -142,4 +151,4 @@ let () =
Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!" Printf.printf "\ndone. %d spans in %.4fs (%.4f/s)\n%!"
(Atomic.get num_tr) elapsed n_per_sec) (Atomic.get num_tr) elapsed n_per_sec)
in in
Opentelemetry_client_ocurl.with_setup ~config () run Opentelemetry_client_ocurl.with_setup ~stop ~config () run