feat(ocurl): simpler, cleaner backend implementation

We now have a single representation of a batch, which carries its own
internal state to handle timeouts. Each batch handles its own locking, and
there are no callbacks anymore.
This commit is contained in:
Simon Cruanes 2022-07-06 12:06:51 -04:00
parent 85b6e91615
commit d689dfc8fb
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
4 changed files with 229 additions and 256 deletions

2
dune
View file

@ -1,3 +1,3 @@
(env
(_
(flags :standard -warn-error -a+8)))
(flags :standard -warn-error -a+8 -strict-sequence)))

View file

@ -1,27 +0,0 @@
(** Fixed-capacity queue backed by a preallocated array.

    Elements are appended one at a time until the array is full, and are
    consumed all at once by {!pop_iter_all}. *)
type 'a t = {
  arr: 'a array;  (** storage; its length is the capacity *)
  mutable i: int;  (** number of elements currently stored *)
}

(** [create ~dummy n] makes an empty queue with room for [n] elements; every
    slot initially holds [dummy]. Fails an assertion if [n < 1]. *)
let create ~dummy n : _ t =
  assert (n >= 1);
  let arr = Array.make n dummy in
  { arr; i = 0 }

(** Number of elements currently stored. *)
let[@inline] size q = q.i

(** Is the queue at capacity? *)
let[@inline] is_full q = Array.length q.arr = q.i

(** [push q x] appends [x] and returns [true]; if the queue is already full
    it is left untouched and [false] is returned. *)
let push (q : _ t) x : bool =
  let can_write = not (is_full q) in
  if can_write then (
    q.arr.(q.i) <- x;
    q.i <- q.i + 1
  );
  can_write

(** [pop_iter_all q f] calls [f] on every stored element in insertion order,
    then marks the queue as empty. The number of elements visited is fixed
    before the first call to [f]. *)
let pop_iter_all (q : _ t) f =
  let n = q.i in
  let rec go idx =
    if idx < n then (
      f q.arr.(idx);
      go (idx + 1)
    )
  in
  go 0;
  q.i <- 0

View file

@ -1,11 +0,0 @@
(** queue of fixed size *)
type 'a t
(** A queue that can hold at most a fixed number of elements of type ['a]. *)
val create : dummy:'a -> int -> 'a t
(** [create ~dummy n] is an empty queue with capacity [n]. [dummy] is used to
    fill the unused slots of the underlying storage. [n] must be at least 1. *)
val size : _ t -> int
(** Number of elements currently in the queue. *)
val push : 'a t -> 'a -> bool (* true iff it could write element *)
(** [push q x] adds [x] at the end of the queue. Returns [false], without
    modifying the queue, if it is already full. *)
val pop_iter_all : 'a t -> ('a -> unit) -> unit
(** [pop_iter_all q f] calls [f] on each element in insertion order, then
    empties the queue. *)

View file

@ -33,6 +33,7 @@ let _init_curl =
(* Errors that can be produced when sending data to the collector. *)
type error =
[ `Status of int * Opentelemetry.Proto.Status.status
(* a numeric code together with a status message, either decoded from the
   response or built from a curl exception *)
| `Failure of string
(* any other exception, rendered with [Printexc.to_string] *)
| `Sysbreak
(* [Sys.Break] was raised while sending (ctrl-c) *)
]
let n_errors = Atomic.make 0
@ -40,6 +41,7 @@ let n_errors = Atomic.make 0
let n_dropped = Atomic.make 0
let report_err_ = function
| `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
| `Failure msg ->
Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
| `Status (code, status) ->
@ -112,6 +114,7 @@ module Curl () : CURL = struct
let status = Status.decode_status dec in
Error (`Status (code, status))
)
| exception Sys.Break -> Error `Sysbreak
| exception Curl.CurlException (_, code, msg) ->
let status =
Status.default_status ~code:(Int32.of_int code)
@ -119,19 +122,95 @@ module Curl () : CURL = struct
()
in
Error (`Status (code, status))
with e -> Error (`Failure (Printexc.to_string e))
with
| Sys.Break -> Error `Sysbreak
| e -> Error (`Failure (Printexc.to_string e))
end
module type PUSH = sig
type elt
module type BATCH = sig end
val push : elt -> unit
module Batch : sig
type 'a t
val is_empty : unit -> bool
val push : 'a t -> 'a -> bool
(** [push batch x] pushes [x] into the batch, and heuristically
returns [true] if the batch is ready to be emitted (to know if we should
wake up the sending thread, if any) *)
val is_big_enough : unit -> bool
val push' : 'a t -> 'a -> unit
val pop_iter_all : (elt -> unit) -> unit
val is_ready : now:Mtime.t -> _ t -> bool
(** is the batch ready to be sent? This is heuristic. *)
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled,
this is true as soon as {!is_empty} is false. If a timeout is provided
for this batch, then it will be ready if an element has been in it
for at least the timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
end = struct
(* A batch of pending items together with the state needed to decide when
   it should be emitted. *)
type 'a t = {
lock: Mutex.t;
(* held (via [with_mutex_]) by [is_ready], [pop_if_ready] and [push] *)
mutable size: int;
(* number of items currently in [q] *)
mutable q: 'a list;
(* pending items, most recently pushed first *)
batch: int option;
(* size threshold; with [None] the batch counts as full as soon as it is
   non-empty *)
timeout: Mtime.span option;
(* if set, the batch becomes ready once this much time has elapsed since
   [start] *)
mutable start: Mtime.t;
(* creation time, then reset whenever an item is pushed into an empty batch
   that has a timeout *)
}
(* [make ?batch ?timeout ()] is a fresh, empty batch with its own mutex.
   [batch] is the size threshold and [timeout] the maximum waiting time;
   the timeout clock initially starts at creation time. *)
let make ?batch ?timeout () : _ t =
  let lock = Mutex.create () in
  let start = Mtime_clock.now () in
  { lock; size = 0; q = []; batch; timeout; start }
(* Does the batch hold no item? Does not take the lock: callers are expected
   to hold it already. *)
let is_empty_ self = self.size = 0
(* Has at least [self.timeout] elapsed between [self.start] and [now]?
   Always [false] when the batch has no timeout. Does not take the lock. *)
let timeout_expired_ ~now self : bool =
  Option.fold self.timeout ~none:false ~some:(fun limit ->
      Mtime.Span.compare (Mtime.span now self.start) limit >= 0)
(* Has the batch reached its size threshold? Without an explicit threshold,
   a single item is enough. Does not take the lock. *)
let is_full_ self : bool =
  let threshold =
    match self.batch with
    | None -> 1
    | Some b -> b
  in
  self.size >= threshold
(* [is_ready ~now self] tells whether [self] should be emitted: it must hold
   at least one item, and either have reached its size threshold or have
   waited for longer than its timeout (measured against [now]).

   An empty batch is never ready: its [start] timestamp is stale (creation
   time, or the time of a push that has since been popped), so the timeout
   check alone would report it as ready forever and cause spurious wakeups.
   The check runs under [self.lock]. *)
let is_ready ~now self : bool =
  let@ () = with_mutex_ self.lock in
  (not (is_empty_ self)) && (is_full_ self || timeout_expired_ ~now self)
(* [pop_if_ready ?force ~now self] atomically takes all the items out of
   [self] if it is ready to be emitted, and returns them (most recently
   pushed first); otherwise returns [None] and leaves [self] untouched.

   The batch is ready if it is non-empty and one of the following holds:
   [force] is [true], the size threshold is reached, or the timeout has
   elapsed at time [now].

   An empty batch always yields [None]. Without that guard, a stale [start]
   timestamp (or a threshold [<= 0]) made this function return [Some []]
   on every call, so callers would keep emitting empty payloads instead of
   waiting for new items. *)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
  let@ () = with_mutex_ self.lock in
  if
    (not (is_empty_ self))
    && (force || is_full_ self || timeout_expired_ ~now self)
  then (
    let l = self.q in
    self.q <- [];
    self.size <- 0;
    Some l
  ) else
    None
(* [push self x] adds [x] to the batch under [self.lock] and returns [true]
   if the batch has now reached its size threshold. When [x] is the first
   item of a batch that has a timeout, the timeout clock is restarted. *)
let push (self : _ t) x : bool =
  let@ () = with_mutex_ self.lock in
  (match self.timeout with
  | Some _ when self.size = 0 -> self.start <- Mtime_clock.now ()
  | Some _ | None -> ());
  self.q <- x :: self.q;
  self.size <- self.size + 1;
  is_full_ self
(* Same as [push], but discards the "batch is full" hint. *)
let push' self x =
  let (_is_full : bool) = push self x in
  ()
end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
@ -152,57 +231,6 @@ module type EMITTER = sig
val cleanup : unit -> unit
end
(* a first-class [PUSH] module whose element type is ['a] *)
type 'a push = (module PUSH with type elt = 'a)
(* callback that a push object invokes when it wants its content emitted *)
type on_full_cb = unit -> unit
(* make a "push" object, along with a setter for a callback to call when
   it's ready to emit a batch.

   - Without [batch], the object holds at most one element, and the callback
     is invoked after every push.
   - With [batch = n], elements go into a fixed queue of capacity [3 * n].
     The callback is invoked when the queue holds more than [n] elements, or
     when it is full. In the latter case the push is retried once after the
     callback returns, and the element is dropped (counted in [n_dropped])
     if there is still no room.

   Each element is stored at most once: the retry only happens when the
   first attempt failed. *)
let mk_push (type a) ?batch () :
    (module PUSH with type elt = a) * (on_full_cb -> unit) =
  let on_full : on_full_cb ref = ref ignore in
  let push =
    match batch with
    | None ->
      let r = ref None in
      let module M = struct
        type elt = a

        let is_empty () = Option.is_none !r

        let is_big_enough () = Option.is_some !r

        let push x =
          r := Some x;
          !on_full ()

        let pop_iter_all f =
          Option.iter f !r;
          r := None
      end in
      (module M : PUSH with type elt = a)
    | Some n ->
      (* NOTE(review): [Obj.magic 0] is only used as a filler for empty
         slots of the queue; it is never handed to [pop_iter_all]'s callback
         since only pushed slots are visited. *)
      let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in
      let module M = struct
        type elt = a

        let is_empty () = FQueue.size q = 0

        let is_big_enough () = FQueue.size q >= n

        let push x =
          if FQueue.push q x then (
            (* stored; just signal that the batch is big enough *)
            if FQueue.size q > n then !on_full ()
          ) else (
            (* queue is full: ask for it to be emitted, then retry once *)
            !on_full ();
            if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *)
          )

        let pop_iter_all f = FQueue.pop_iter_all q f
      end in
      (module M : PUSH with type elt = a)
  in
  push, ( := ) on_full
(* start a thread in the background, running [f()] *)
let start_bg_thread (f : unit -> unit) : unit =
let run () =
@ -224,169 +252,145 @@ let batch_is_empty = List.for_all l_is_empty
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let continue = ref true in
(* local helpers *)
let open struct
let continue = Atomic.make true
let ((module E_trace) : Trace.resource_spans list push), on_trace_full =
mk_push ?batch:config.batch_traces ()
in
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full
=
mk_push ?batch:config.batch_metrics ()
in
let ((module E_logs) : Logs.resource_logs list push), on_logs_full =
mk_push ?batch:config.batch_logs ()
in
let timeout =
if config.batch_timeout_ms > 0 then
Some Mtime.Span.(config.batch_timeout_ms * ms)
else
None
let encoder = Pbrt.Encoder.create () in
let batch_traces : Trace.resource_spans list Batch.t =
Batch.make ?batch:config.batch_traces ?timeout ()
let ((module C) as curl) = (module Curl () : CURL) in
let batch_metrics : Metrics.resource_metrics list Batch.t =
Batch.make ?batch:config.batch_metrics ?timeout ()
let on_tick_cbs_ = ref (ref []) in
let set_on_tick_callbacks = ( := ) on_tick_cbs_ in
let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout ()
let send_http_ ~path ~encode x : unit =
Pbrt.Encoder.reset encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
match C.send ~path ~decode:(fun _ -> ()) data with
| Ok () -> ()
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err
in
let encoder = Pbrt.Encoder.create ()
let send_metrics_http (l : Metrics.resource_metrics list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Metrics_service.default_export_metrics_service_request ~resource_metrics:l
()
in
send_http_ ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x
in
let curl = (module Curl () : CURL)
let send_traces_http (l : Trace.resource_spans list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request ~resource_spans:l ()
in
send_http_ ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x
in
module C = (val curl)
let send_logs_http (l : Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request ~resource_logs:l ()
in
send_http_ ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x
in
let on_tick_cbs_ = ref (ref [])
let last_wakeup = Atomic.make (Mtime_clock.now ()) in
let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in
let batch_timeout () : bool =
let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in
Mtime.Span.compare elapsed timeout >= 0
in
let set_on_tick_callbacks = ( := ) on_tick_cbs_
let emit_metrics ?(force = false) () : bool =
if force || ((not force) && E_metrics.is_big_enough ()) then (
let batch = ref [ AList.pop_all gc_metrics ] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_metrics_http !batch;
Atomic.set last_wakeup (Mtime_clock.now ())
);
do_something
) else
false
in
let emit_traces ?(force = false) () : bool =
if force || ((not force) && E_trace.is_big_enough ()) then (
let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_traces_http !batch;
Atomic.set last_wakeup (Mtime_clock.now ())
);
do_something
) else
false
in
let emit_logs ?(force = false) () : bool =
if force || ((not force) && E_logs.is_big_enough ()) then (
let batch = ref [] in
E_logs.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_logs_http !batch;
Atomic.set last_wakeup (Mtime_clock.now ())
);
do_something
) else
false
in
(* Encode [x] with [encode] into the shared [encoder], then send the
   resulting bytes to [path] using [C.send]. Never returns an error:
   a ctrl-c stops the emitter by clearing [continue], and any other error
   is counted in [n_errors] and reported with [report_err_]. *)
let send_http_ ~path ~encode x : unit =
  Pbrt.Encoder.reset encoder;
  encode x encoder;
  let payload = Pbrt.Encoder.to_string encoder in
  let outcome = C.send ~path ~decode:(fun _ -> ()) payload in
  match outcome with
  | Ok () -> ()
  | Error `Sysbreak ->
    Printf.eprintf "ctrl-c captured, stopping\n%!";
    Atomic.set continue false
  | Error err ->
    (* TODO: log error _via_ otel? *)
    Atomic.incr n_errors;
    report_err_ err
let[@inline] guard f =
try f ()
with e ->
Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!"
(Printexc.to_string e)
in
(* Flatten the given groups of resource metrics into a single export request
   and send it to "/v1/metrics". *)
let send_metrics_http (l : Metrics.resource_metrics list list) =
  let resource_metrics =
    List.fold_left (fun acc group -> List.rev_append group acc) [] l
  in
  let request =
    Metrics_service.default_export_metrics_service_request ~resource_metrics
      ()
  in
  send_http_ ~path:"/v1/metrics"
    ~encode:Metrics_service.encode_export_metrics_service_request request
let emit_all_force () =
ignore (emit_traces ~force:true () : bool);
ignore (emit_logs ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool)
in
(* Flatten the given groups of resource spans into a single export request
   and send it to "/v1/traces". *)
let send_traces_http (l : Trace.resource_spans list list) =
  let resource_spans =
    List.fold_left (fun acc group -> List.rev_append group acc) [] l
  in
  let request =
    Trace_service.default_export_trace_service_request ~resource_spans ()
  in
  send_http_ ~path:"/v1/traces"
    ~encode:Trace_service.encode_export_trace_service_request request
(* Flatten the given groups of resource logs into a single export request
   and send it to "/v1/logs". *)
let send_logs_http (l : Logs.resource_logs list list) =
  let resource_logs =
    List.fold_left (fun acc group -> List.rev_append group acc) [] l
  in
  let request =
    Logs_service.default_export_logs_service_request ~resource_logs ()
  in
  send_http_ ~path:"/v1/logs"
    ~encode:Logs_service.encode_export_logs_service_request request
(* Emit metrics if [Batch.pop_if_ready] hands over the content of
   [batch_metrics] (batch full, timeout elapsed, or [force]). The pending GC
   metrics are sent along in the same request. Returns [true] iff something
   was sent. *)
let emit_metrics_maybe ~now ?force () : bool =
  let popped = Batch.pop_if_ready ?force ~now batch_metrics in
  Option.iter
    (fun l -> send_metrics_http (AList.pop_all gc_metrics :: l))
    popped;
  Option.is_some popped
(* Emit traces if [Batch.pop_if_ready] hands over the content of
   [batch_traces]. Returns [true] iff something was sent. *)
let emit_traces_maybe ~now ?force () : bool =
  let popped = Batch.pop_if_ready ?force ~now batch_traces in
  Option.iter send_traces_http popped;
  Option.is_some popped
(* Emit logs if [Batch.pop_if_ready] hands over the content of
   [batch_logs]. Returns [true] iff something was sent. *)
let emit_logs_maybe ~now ?force () : bool =
  let popped = Batch.pop_if_ready ?force ~now batch_logs in
  Option.iter send_logs_http popped;
  Option.is_some popped
(* Run [f ()]; if it raises, print the exception on stderr instead of
   propagating it. *)
let[@inline] guard_exn_ f =
  match f () with
  | () -> ()
  | exception exn ->
    Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!"
      (Printexc.to_string exn)
(* Force emission of traces, then logs, then metrics, all with the same
   timestamp. Whether each of them actually sent something is ignored. *)
let emit_all_force () =
  let now = Mtime_clock.now () in
  let (_ : bool) = emit_traces_maybe ~now ~force:true () in
  let (_ : bool) = emit_logs_maybe ~now ~force:true () in
  let (_ : bool) = emit_metrics_maybe ~now ~force:true () in
  ()
end in
if config.thread then (
(let m = Mutex.create () in
Lock.set_mutex
~lock:(fun () -> Mutex.lock m)
~unlock:(fun () -> Mutex.unlock m));
let ((module C) as curl) = (module Curl () : CURL) in
let m = Mutex.create () in
let cond = Condition.create () in
(* loop for the thread that processes events and sends them to collector *)
let bg_thread () =
while !continue do
let@ () = guard in
let timeout = batch_timeout () in
while Atomic.get continue do
let@ () = guard_exn_ in
let do_metrics = emit_metrics ~force:timeout () in
let do_traces = emit_traces ~force:timeout () in
let do_logs = emit_logs ~force:timeout () in
let now = Mtime_clock.now () in
let do_metrics = emit_metrics_maybe ~now () in
let do_traces = emit_traces_maybe ~now () in
let do_logs = emit_logs_maybe ~now () in
if (not do_metrics) && (not do_traces) && not do_logs then
(* wait *)
(* wait for something to happen *)
let@ () = with_mutex_ m in
Condition.wait cond m
done;
(* flush remaining events *)
let@ () = guard in
ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool);
ignore (emit_logs ~force:true () : bool);
(* flush remaining events once we exit *)
let@ () = guard_exn_ in
emit_all_force ();
C.cleanup ()
in
start_bg_thread bg_thread;
(* if the bg thread waits, this will wake it up so it can send batches *)
let wakeup () =
with_mutex_ m (fun () -> Condition.signal cond);
with_mutex_ m (fun () -> Condition.broadcast cond);
Thread.yield ()
in
(* wake up if a batch is full *)
on_metrics_full wakeup;
on_trace_full wakeup;
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
List.iter
@ -396,50 +400,47 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e))
!(!on_tick_cbs_);
if batch_timeout () then wakeup ()
let now = Mtime_clock.now () in
if
(not (Atomic.get continue))
|| Batch.is_ready ~now batch_metrics
|| Batch.is_ready ~now batch_traces
|| Batch.is_ready ~now batch_logs
then
wakeup ()
in
if config.ticker_thread then (
(* thread that calls [tick()] regularly, to help enforce timeouts *)
let tick_thread () =
while true do
while Atomic.get continue do
Thread.delay 0.5;
tick ()
done
done;
wakeup ()
in
start_bg_thread tick_thread
);
let module M = struct
let push_trace e =
E_trace.push e;
if batch_timeout () then wakeup ()
let push_trace e = if Batch.push batch_traces e then wakeup ()
let push_metrics e =
E_metrics.push e;
if batch_timeout () then wakeup ()
let push_metrics e = if Batch.push batch_metrics e then wakeup ()
let push_logs e =
E_logs.push e;
if batch_timeout () then wakeup ()
let push_logs e = if Batch.push batch_logs e then wakeup ()
let set_on_tick_callbacks = set_on_tick_callbacks
let tick = tick
let cleanup () =
continue := false;
Atomic.set continue false;
with_mutex_ m (fun () -> Condition.broadcast cond)
end in
(module M)
) else (
on_metrics_full (fun () ->
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
ignore (emit_metrics () : bool));
on_trace_full (fun () -> ignore (emit_traces () : bool));
on_logs_full (fun () -> ignore (emit_logs () : bool));
let cleanup () =
emit_all_force ();
C.cleanup ()
@ -447,25 +448,33 @@ let mk_emitter ~(config : Config.t) () : (module EMITTER) =
let module M = struct
let push_trace e =
let@ () = guard in
E_trace.push e;
if batch_timeout () then emit_all_force ()
let@ () = guard_exn_ in
Batch.push' batch_traces e;
let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now () : bool)
let push_metrics e =
let@ () = guard in
E_metrics.push e;
if batch_timeout () then emit_all_force ()
let@ () = guard_exn_ in
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
Batch.push' batch_metrics e;
let now = Mtime_clock.now () in
ignore (emit_metrics_maybe ~now () : bool)
let push_logs e =
let@ () = guard in
E_logs.push e;
if batch_timeout () then emit_all_force ()
let@ () = guard_exn_ in
Batch.push' batch_logs e;
let now = Mtime_clock.now () in
ignore (emit_logs_maybe ~now () : bool)
let set_on_tick_callbacks = set_on_tick_callbacks
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics ();
if batch_timeout () then emit_all_force ()
let now = Mtime_clock.now () in
ignore (emit_traces_maybe ~now () : bool);
ignore (emit_metrics_maybe ~now () : bool);
ignore (emit_logs_maybe ~now () : bool);
()
let cleanup = cleanup
end in
@ -516,13 +525,13 @@ end)
[
make_resource_metrics
[
sum ~name:"otel-export.dropped" ~is_monotonic:true
sum ~name:"otel.export.dropped" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
];
sum ~name:"otel-export.errors" ~is_monotonic:true
sum ~name:"otel.export.errors" ~is_monotonic:true
[
int
~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
@ -537,12 +546,13 @@ end)
{
send =
(fun m ~ret ->
let@ () = Lock.with_lock in
if !debug_ then
Format.eprintf "send metrics %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
m;
let@ () = Lock.with_lock in
let m = List.rev_append (additional_metrics ()) m in
push_metrics m;
ret ());
@ -552,11 +562,12 @@ end)
{
send =
(fun m ~ret ->
let@ () = Lock.with_lock in
if !debug_ then
Format.eprintf "send logs %a@."
(Format.pp_print_list Logs.pp_resource_logs)
m;
let@ () = Lock.with_lock in
push_logs m;
ret ());
}