Merge pull request #49 from imandra-ai/wip-perf-2023-12-20

perf and features
This commit is contained in:
Simon Cruanes 2023-12-21 08:51:13 -05:00 committed by GitHub
commit b95eb21282
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 155 additions and 53 deletions

View file

@ -257,7 +257,7 @@ module type EMITTER = sig
val push_logs : Logs.resource_logs list -> unit val push_logs : Logs.resource_logs list -> unit
val set_on_tick_callbacks : (unit -> unit) list ref -> unit val set_on_tick_callbacks : (unit -> unit) AList.t -> unit
val tick : unit -> unit val tick : unit -> unit
@ -288,7 +288,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let batch_logs : Logs.resource_logs list Batch.t = let batch_logs : Logs.resource_logs list Batch.t =
Batch.make ?batch:config.batch_logs ?timeout () Batch.make ?batch:config.batch_logs ?timeout ()
let on_tick_cbs_ = Atomic.make (ref []) let on_tick_cbs_ = Atomic.make (AList.make ())
let set_on_tick_callbacks = Atomic.set on_tick_cbs_ let set_on_tick_callbacks = Atomic.set on_tick_cbs_
@ -384,7 +384,7 @@ let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
with e -> with e ->
Printf.eprintf "on tick callback raised: %s\n" Printf.eprintf "on tick callback raised: %s\n"
(Printexc.to_string e)) (Printexc.to_string e))
!(Atomic.get on_tick_cbs_); (AList.get @@ Atomic.get on_tick_cbs_);
() ()
(* thread that calls [tick()] regularly, to help enforce timeouts *) (* thread that calls [tick()] regularly, to help enforce timeouts *)

View file

@ -7,20 +7,42 @@ type t = {
batch_timeout_ms: int; batch_timeout_ms: int;
bg_threads: int; bg_threads: int;
ticker_thread: bool; ticker_thread: bool;
ticker_interval_ms: int;
self_trace: bool;
} }
let pp out self = let pp out self =
let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in let pp_header ppf (a, b) = Format.fprintf ppf "@[%s: @,%s@]@." a b in
let ppheaders = Format.pp_print_list pp_header in let ppheaders = Format.pp_print_list pp_header in
let { debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } = let {
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
} =
self self
in in
Format.fprintf out Format.fprintf out
"{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \ "{@[ debug=%B;@ url=%S;@ headers=%a;@ batch_timeout_ms=%d; bg_threads=%d;@ \
ticker_thread=%B @]}" ticker_thread=%B;@ ticker_interval_ms=%d;@ self_trace=%B @]}"
debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread debug url ppheaders headers batch_timeout_ms bg_threads ticker_thread
ticker_interval_ms self_trace
let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ()) let make ?(debug = !debug_) ?(url = get_url ()) ?(headers = get_headers ())
?(batch_timeout_ms = 500) ?(bg_threads = 4) ?(ticker_thread = true) () : t = ?(batch_timeout_ms = 2_000) ?(bg_threads = 4) ?(ticker_thread = true)
let bg_threads = max 2 (min bg_threads 32) in ?(ticker_interval_ms = 500) ?(self_trace = false) () : t =
{ debug; url; headers; batch_timeout_ms; bg_threads; ticker_thread } let bg_threads = max 1 (min bg_threads 32) in
{
debug;
url;
headers;
batch_timeout_ms;
bg_threads;
ticker_thread;
ticker_interval_ms;
self_trace;
}

View file

@ -12,12 +12,24 @@ type t = private {
(** Number of milliseconds after which we will emit a batch, even (** Number of milliseconds after which we will emit a batch, even
incomplete. incomplete.
Note that the batch might take longer than that, because this is Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *) only checked when a new event occurs or when a tick
is emitted. Default 2_000. *)
bg_threads: int; bg_threads: int;
(** Are there background threads, and how many? Default [4] *) (** Are there background threads, and how many? Default [4].
This will be adjusted to be at least [1] and at most [32]. *)
ticker_thread: bool; ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should (** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *) be sent to the collector. Default [true] *)
ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is
only useful if [ticker_thread] is [true].
This will be clamped between [2 ms] and some longer
interval (maximum [60s] currently).
Default 500.
@since NEXT_RELEASE *)
self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default [false].
@since NEXT_RELEASE *)
} }
(** Configuration. (** Configuration.
@ -31,6 +43,8 @@ val make :
?batch_timeout_ms:int -> ?batch_timeout_ms:int ->
?bg_threads:int -> ?bg_threads:int ->
?ticker_thread:bool -> ?ticker_thread:bool ->
?ticker_interval_ms:int ->
?self_trace:bool ->
unit -> unit ->
t t
(** Make a configuration. (** Make a configuration.

View file

@ -17,6 +17,33 @@ let timeout_gc_metrics = Mtime.Span.(20 * s)
(** side channel for GC, appended to metrics batch data *) (** side channel for GC, appended to metrics batch data *)
let gc_metrics = AList.make () let gc_metrics = AList.make ()
(** Mini tracing module (disabled if [config.self_trace=false]) *)
module Self_trace = struct
let enabled = Atomic.make true
let add_event (scope : Scope.t) ev = scope.events <- ev :: scope.events
let dummy_trace_id_ = Trace_id.create ()
let dummy_span_id = Span_id.create ()
let with_ ?kind ?attrs name f =
if Atomic.get enabled then
Opentelemetry.Trace.with_ ?kind ?attrs name f
else (
(* do nothing *)
let scope =
{
Scope.trace_id = dummy_trace_id_;
span_id = dummy_span_id;
attrs = [];
events = [];
}
in
f scope
)
end
(** capture current GC metrics if {!needs_gc_metrics} is true (** capture current GC metrics if {!needs_gc_metrics} is true
or it has been a long time since the last GC metrics collection, or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *) and push them into {!gc_metrics} for later collection *)
@ -120,9 +147,19 @@ end = struct
let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit let send_http_ ~stop ~config (client : Curl.t) encoder ~path ~encode x : unit
= =
Pbrt.Encoder.reset encoder; let@ _sc =
encode x encoder; Self_trace.with_ ~kind:Span.Span_kind_producer "otel-ocurl.send-http"
let data = Pbrt.Encoder.to_string encoder in in
let data =
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "encode-proto"
in
Pbrt.Encoder.reset encoder;
encode x encoder;
Pbrt.Encoder.to_string encoder
in
let url = let url =
let url = config.Config.url in let url = config.Config.url in
if url <> "" && String.get url (String.length url - 1) = '/' then if url <> "" && String.get url (String.length url - 1) = '/' then
@ -138,11 +175,18 @@ end = struct
("Content-Type", "application/x-protobuf") :: config.headers ("Content-Type", "application/x-protobuf") :: config.headers
in in
match match
let@ _sc =
Self_trace.with_ ~kind:Span.Span_kind_internal "curl.post"
~attrs:[ "sz", `Int (String.length data); "url", `String url ]
in
Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) () Ezcurl.post ~headers ~client ~params:[] ~url ~content:(`String data) ()
with with
| Ok { code; _ } when code >= 200 && code < 300 -> () | Ok { code; _ } when code >= 200 && code < 300 -> ()
| Ok { code; body; headers = _; info = _ } -> | Ok { code; body; headers = _; info = _ } ->
Atomic.incr n_errors; Atomic.incr n_errors;
Self_trace.add_event _sc
@@ Opentelemetry.Event.make "error" ~attrs:[ "code", `Int code ];
if !debug_ || config.debug then ( if !debug_ || config.debug then (
let dec = Pbrt.Decoder.of_string body in let dec = Pbrt.Decoder.of_string body in
let body = let body =
@ -171,6 +215,11 @@ end = struct
let send_logs_http ~stop ~config (client : Curl.t) encoder let send_logs_http ~stop ~config (client : Curl.t) encoder
(l : Logs.resource_logs list list) : unit = (l : Logs.resource_logs list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-logs"
~attrs:[ "n", `Int (List.length l) ]
in
let x = let x =
Logs_service.default_export_logs_service_request ~resource_logs:l () Logs_service.default_export_logs_service_request ~resource_logs:l ()
in in
@ -180,6 +229,11 @@ end = struct
let send_metrics_http ~stop ~config curl encoder let send_metrics_http ~stop ~config curl encoder
(l : Metrics.resource_metrics list list) : unit = (l : Metrics.resource_metrics list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-metrics"
~attrs:[ "n", `Int (List.length l) ]
in
let x = let x =
Metrics_service.default_export_metrics_service_request ~resource_metrics:l Metrics_service.default_export_metrics_service_request ~resource_metrics:l
() ()
@ -190,6 +244,11 @@ end = struct
let send_traces_http ~stop ~config curl encoder let send_traces_http ~stop ~config curl encoder
(l : Trace.resource_spans list list) : unit = (l : Trace.resource_spans list list) : unit =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let@ _sp =
Self_trace.with_ ~kind:Span_kind_producer "send-traces"
~attrs:[ "n", `Int (List.length l) ]
in
let x = let x =
Trace_service.default_export_trace_service_request ~resource_spans:l () Trace_service.default_export_trace_service_request ~resource_spans:l ()
in in
@ -248,8 +307,8 @@ end = struct
in in
let send_metrics () = let send_metrics () =
B_queue.push self.send_q let metrics = AList.pop_all gc_metrics :: Batch.pop_all batches.metrics in
(To_send.Send_metric (Batch.pop_all batches.metrics)) B_queue.push self.send_q (To_send.Send_metric metrics)
in in
let send_logs () = let send_logs () =
@ -266,39 +325,36 @@ end = struct
(* read multiple events at once *) (* read multiple events at once *)
B_queue.pop_all self.q local_q; B_queue.pop_all self.q local_q;
let now = Mtime_clock.now () in (* are we asked to flush all events? *)
let must_flush_all = ref false in
(* how to process a single event *) (* how to process a single event *)
let process_ev (ev : Event.t) : unit = let process_ev (ev : Event.t) : unit =
match ev with match ev with
| Event.E_metric m -> | Event.E_metric m -> Batch.push batches.metrics m
Batch.push batches.metrics m; | Event.E_trace tr -> Batch.push batches.traces tr
if should_send_batch_ ~config ~now batches.metrics then | Event.E_logs logs -> Batch.push batches.logs logs
send_metrics ()
| Event.E_trace tr ->
Batch.push batches.traces tr;
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_logs logs ->
Batch.push batches.logs logs;
if should_send_batch_ ~config ~now batches.logs then send_logs ()
| Event.E_tick -> | Event.E_tick ->
(* check for batches whose timeout expired *) (* the only impact of "tick" is that it wakes us up regularly *)
if should_send_batch_ ~config ~now batches.metrics then ()
send_metrics (); | Event.E_flush_all -> must_flush_all := true
if should_send_batch_ ~config ~now batches.logs then send_logs ();
if should_send_batch_ ~config ~now batches.traces then
send_traces ()
| Event.E_flush_all ->
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
in in
while not (Queue.is_empty local_q) do Queue.iter process_ev local_q;
let ev = Queue.pop local_q in Queue.clear local_q;
process_ev ev
done if !must_flush_all then (
if Batch.len batches.metrics > 0 then send_metrics ();
if Batch.len batches.logs > 0 then send_logs ();
if Batch.len batches.traces > 0 then send_traces ()
) else (
let now = Mtime_clock.now () in
if should_send_batch_ ~config ~now batches.metrics then
send_metrics ();
if should_send_batch_ ~config ~now batches.traces then send_traces ();
if should_send_batch_ ~config ~now batches.logs then send_logs ()
)
done done
with B_queue.Closed -> () with B_queue.Closed -> ()
@ -418,15 +474,14 @@ let create_backend ?(stop = Atomic.make false)
ret ()); ret ());
} }
let on_tick_cbs_ = Atomic.make (ref []) let on_tick_cbs_ = Atomic.make (AList.make ())
let set_on_tick_callbacks = Atomic.set on_tick_cbs_ let set_on_tick_callbacks = Atomic.set on_tick_cbs_
let tick () = let tick () =
sample_gc_metrics_if_needed (); sample_gc_metrics_if_needed ();
Backend_impl.send_event backend Event.E_tick; Backend_impl.send_event backend Event.E_tick;
let l = Atomic.get on_tick_cbs_ in List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)
List.iter (fun f -> f ()) !l
let cleanup () = Backend_impl.shutdown backend let cleanup () = Backend_impl.shutdown backend
end in end in
@ -451,9 +506,11 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
Opentelemetry.Collector.set_backend backend; Opentelemetry.Collector.set_backend backend;
if config.url <> get_url () then set_url config.url; if config.url <> get_url () then set_url config.url;
Atomic.set Self_trace.enabled config.self_trace;
if config.ticker_thread then ( if config.ticker_thread then (
let sleep_ms = min 5_000 (max 2 config.batch_timeout_ms) in (* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
); );

View file

@ -4,6 +4,8 @@ type 'a t = 'a list Atomic.t
let make () = Atomic.make [] let make () = Atomic.make []
let get = Atomic.get
let add self x = let add self x =
while while
let old = Atomic.get self in let old = Atomic.get self in

View file

@ -2,6 +2,9 @@
type 'a t type 'a t
val get : 'a t -> 'a list
(** Snapshot *)
val make : unit -> 'a t val make : unit -> 'a t
val add : 'a t -> 'a -> unit val add : 'a t -> 'a -> unit

View file

@ -6,6 +6,10 @@ module Lock = Lock
module Rand_bytes = Rand_bytes module Rand_bytes = Rand_bytes
(** Generation of random identifiers. *) (** Generation of random identifiers. *)
module AList = AList
(** Atomic list, for internal usage
@since NEXT_RELEASE *)
open struct open struct
let[@inline] result_bind x f = let[@inline] result_bind x f =
match x with match x with
@ -82,7 +86,7 @@ module Collector = struct
(** Should be called regularly for background processing, (** Should be called regularly for background processing,
timeout checks, etc. *) timeout checks, etc. *)
val set_on_tick_callbacks : (unit -> unit) list ref -> unit val set_on_tick_callbacks : (unit -> unit) AList.t -> unit
(** Give the collector the list of callbacks to be executed (** Give the collector the list of callbacks to be executed
when [tick()] is called. Each such callback should be short and when [tick()] is called. Each such callback should be short and
reentrant. Depending on the collector's implementation, it might be reentrant. Depending on the collector's implementation, it might be
@ -158,7 +162,7 @@ module Collector = struct
(* hidden *) (* hidden *)
open struct open struct
let on_tick_cbs_ = ref [] let on_tick_cbs_ = AList.make ()
let backend : backend option ref = ref None let backend : backend option ref = ref None
end end
@ -194,7 +198,7 @@ module Collector = struct
let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 () let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 ()
let on_tick f = on_tick_cbs_ := f :: !on_tick_cbs_ let[@inline] on_tick f = AList.add on_tick_cbs_ f
(** Do background work. Call this regularly if the collector doesn't (** Do background work. Call this regularly if the collector doesn't
already have a ticker thread or internal timer. *) already have a ticker thread or internal timer. *)
@ -1134,7 +1138,8 @@ end
These metrics are emitted after each GC collection. *) These metrics are emitted after each GC collection. *)
module GC_metrics : sig module GC_metrics : sig
val basic_setup : unit -> unit val basic_setup : unit -> unit
(** Setup a hook that will emit GC statistics regularly *) (** Setup a hook that will emit GC statistics on every tick (assuming
a ticker thread) *)
val get_runtime_attributes : unit -> Span.key_value list val get_runtime_attributes : unit -> Span.key_value list
(** Get OCaml name and version runtime attributes *) (** Get OCaml name and version runtime attributes *)
@ -1154,13 +1159,12 @@ end = struct
let get_runtime_attributes () = Lazy.force runtime_attributes let get_runtime_attributes () = Lazy.force runtime_attributes
let basic_setup () = let basic_setup () =
(* emit metrics when GC is called *) let on_tick () =
let on_gc () =
match Collector.get_backend () with match Collector.get_backend () with
| None -> () | None -> ()
| Some (module C) -> C.signal_emit_gc_metrics () | Some (module C) -> C.signal_emit_gc_metrics ()
in in
ignore (Gc.create_alarm on_gc : Gc.alarm) Collector.on_tick on_tick
let bytes_per_word = Sys.word_size / 8 let bytes_per_word = Sys.word_size / 8

View file

@ -124,7 +124,7 @@ let () =
Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; Arg.parse opts (fun _ -> ()) "emit1 [opt]*";
let config = let config =
Opentelemetry_client_ocurl.Config.make ~debug:!debug Opentelemetry_client_ocurl.Config.make ~debug:!debug ~self_trace:true
?bg_threads: ?bg_threads:
(let n = !n_bg_threads in (let n = !n_bg_threads in
if n = 0 then if n = 0 then