feat: implement basic support for logs

also send them to the collector.
This commit is contained in:
Simon Cruanes 2022-04-15 21:35:42 -04:00
parent 64df05d010
commit 1bedb57123
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 203 additions and 50 deletions

View file

@ -6,6 +6,7 @@ type t = {
url: string;
batch_traces: int option;
batch_metrics: int option;
batch_logs: int option;
batch_timeout_ms: int;
thread: bool;
ticker_thread: bool;
@ -13,13 +14,13 @@ type t = {
let pp out self =
let ppiopt = Format.pp_print_option Format.pp_print_int in
let {debug; url; batch_traces; batch_metrics;
let {debug; url; batch_traces; batch_metrics; batch_logs;
batch_timeout_ms; thread; ticker_thread} = self in
Format.fprintf out
"{@[ debug=%B;@ url=%S;@ \
batch_traces=%a;@ batch_metrics=%a;@ \
batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \
batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}"
debug url ppiopt batch_traces ppiopt batch_metrics
debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs
batch_timeout_ms thread ticker_thread
let make
@ -27,9 +28,11 @@ let make
?(url= get_url())
?(batch_traces=Some 400)
?(batch_metrics=None)
?(batch_logs=Some 400)
?(batch_timeout_ms=500)
?(thread=true)
?(ticker_thread=true)
() : t =
{ debug; url; batch_traces; batch_metrics; batch_timeout_ms;
{ debug; url; batch_traces; batch_metrics; batch_logs;
batch_timeout_ms;
thread; ticker_thread; }

View file

@ -22,6 +22,10 @@ type t = {
Default [None].
*)
batch_logs : int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
@ -42,6 +46,7 @@ val make :
?debug:bool -> ?url:string ->
?batch_traces:int option ->
?batch_metrics:int option ->
?batch_logs:int option ->
?batch_timeout_ms:int ->
?thread:bool ->
?ticker_thread:bool ->

View file

@ -124,6 +124,7 @@ module type EMITTER = sig
val push_trace : Trace.resource_spans list -> unit
val push_metrics : Metrics.resource_metrics list -> unit
val push_logs : Logs.resource_logs list -> unit
val tick : unit -> unit
val cleanup : unit -> unit
@ -195,23 +196,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
mk_push ?batch:config.batch_traces () in
let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full =
mk_push ?batch:config.batch_metrics () in
let ((module E_logs) : Logs.resource_logs list push), on_logs_full =
mk_push ?batch:config.batch_logs () in
let encoder = Pbrt.Encoder.create() in
let ((module C) as curl) = (module Curl() : CURL) in
let send_metrics_http (l:Metrics.resource_metrics list list) =
let send_http_ ~path ~encode x : unit =
Pbrt.Encoder.reset encoder;
let resource_metrics =
List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Metrics_service.encode_export_metrics_service_request
(Metrics_service.default_export_metrics_service_request
~resource_metrics ())
encoder;
encode x encoder;
let data = Pbrt.Encoder.to_string encoder in
begin match
C.send ~path:"/v1/metrics" ~decode:(fun _ -> ())
data
C.send ~path ~decode:(fun _ -> ()) data
with
| Ok () -> ()
| Error err ->
@ -221,23 +218,31 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
end;
in
let send_metrics_http (l:Metrics.resource_metrics list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Metrics_service.default_export_metrics_service_request
~resource_metrics:l () in
send_http_ ~path:"/v1/metrics"
~encode:Metrics_service.encode_export_metrics_service_request x
in
let send_traces_http (l:Trace.resource_spans list list) =
Pbrt.Encoder.reset encoder;
let resource_spans =
List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Trace_service.encode_export_trace_service_request
(Trace_service.default_export_trace_service_request ~resource_spans ())
encoder;
begin match
C.send ~path:"/v1/traces" ~decode:(fun _ -> ())
(Pbrt.Encoder.to_string encoder)
with
| Ok () -> ()
| Error err ->
(* TODO: log error _via_ otel? *)
Atomic.incr n_errors;
report_err_ err
end;
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Trace_service.default_export_trace_service_request
~resource_spans:l () in
send_http_ ~path:"/v1/traces"
~encode:Trace_service.encode_export_trace_service_request x
in
let send_logs_http (l:Logs.resource_logs list list) =
let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
let x =
Logs_service.default_export_logs_service_request
~resource_logs:l () in
send_http_ ~path:"/v1/logs"
~encode:Logs_service.encode_export_logs_service_request x
in
let last_wakeup = Atomic.make (Mtime_clock.now()) in
@ -247,29 +252,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
Mtime.Span.compare elapsed timeout >= 0
in
let emit_metrics ?(force=false) () : bool =
if force || (not force && E_metrics.is_big_enough ()) then (
let batch = ref [AList.pop_all gc_metrics] in
E_metrics.pop_iter_all (fun l -> batch := l :: !batch);
let emit_ (type a) (module P: PUSH with type elt = a list)
?(init=fun() -> []) ?(force=false) ~send_http () : bool =
if force || (not force && P.is_big_enough ()) then (
let batch = ref [init()] in
P.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_metrics_http !batch;
send_http !batch;
Atomic.set last_wakeup (Mtime_clock.now());
);
do_something
) else false
in
let emit_traces ?(force=false) () : bool =
if force || (not force && E_trace.is_big_enough ()) then (
let batch = ref [] in
E_trace.pop_iter_all (fun l -> batch := l :: !batch);
let do_something = not (l_is_empty !batch) in
if do_something then (
send_traces_http !batch;
Atomic.set last_wakeup (Mtime_clock.now());
);
do_something
) else false
let emit_metrics ?force () : bool =
emit_ (module E_metrics)
~init:(fun () -> AList.pop_all gc_metrics)
~send_http:send_metrics_http ()
and emit_traces ?force () : bool =
emit_ (module E_trace) ~send_http:send_traces_http ()
and emit_logs ?force () : bool =
emit_ (module E_logs) ~send_http:send_logs_http ()
in
let[@inline] guard f =
@ -280,9 +284,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
in
let emit_all_force () =
let@ () = guard in
ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool);
ignore (emit_logs ~force:true () : bool);
in
@ -305,7 +309,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let do_metrics = emit_metrics ~force:timeout () in
let do_traces = emit_traces ~force:timeout () in
if not do_metrics && not do_traces then (
let do_logs = emit_logs ~force:timeout () in
if not do_metrics && not do_traces && not do_logs then (
(* wait *)
let@ () = with_mutex_ m in
Condition.wait cond m;
@ -314,8 +319,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
(* flush remaining events *)
begin
let@ () = guard in
ignore (emit_traces ~force:true () : bool);
ignore (emit_metrics ~force:true () : bool);
emit_all_force();
C.cleanup();
end
in
@ -354,6 +358,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
let push_metrics e =
E_metrics.push e;
if batch_timeout() then wakeup()
let push_logs e =
E_logs.push e;
if batch_timeout() then wakeup()
let tick=tick
let cleanup () =
continue := false;
@ -367,6 +374,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
ignore (emit_metrics () : bool));
on_trace_full (fun () ->
ignore (emit_traces () : bool));
on_logs_full (fun () ->
ignore (emit_logs () : bool));
let cleanup () =
emit_all_force();
@ -384,6 +393,11 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) =
E_metrics.push e;
if batch_timeout() then emit_all_force()
let push_logs e =
let@() = guard in
E_logs.push e;
if batch_timeout() then emit_all_force()
let tick () =
if Atomic.get needs_gc_metrics then sample_gc_metrics();
if batch_timeout() then emit_all_force()
@ -449,6 +463,14 @@ module Backend(Arg : sig val config : Config.t end)()
push_metrics m;
ret()
}
let send_logs : Logs.resource_logs list sender = {
send=fun m ~ret ->
let@() = with_lock_ in
if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m;
push_logs m;
ret()
}
end
let setup_ ~(config:Config.t) () =

View file

@ -90,3 +90,15 @@
(action (run ocaml-protoc %{file}
-I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary)))
(rule
(targets logs_service_types.ml logs_service_types.mli
logs_service_pp.ml logs_service_pp.mli
logs_service_pb.ml logs_service_pb.mli)
(deps
(:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto)
(source_tree %{project_root}/vendor/opentelemetry-proto/))
(action (run ocaml-protoc %{file}
-I %{project_root}/vendor/opentelemetry-proto/
-ml_out . -pp -binary)))

View file

@ -46,6 +46,18 @@ module Proto = struct
include Status_pp
include Status_pb
end
module Logs = struct
include Logs_types
include Logs_pb
include Logs_pp
end
module Logs_service = struct
include Logs_service_types
include Logs_service_pb
include Logs_service_pp
end
end
(** {2 Timestamps} *)
@ -100,6 +112,8 @@ module Collector = struct
val send_metrics : Metrics.resource_metrics list sender
val send_logs : Logs.resource_logs list sender
val rand_bytes_16 : unit -> bytes
(** Generate 16 bytes of random data *)
@ -135,6 +149,11 @@ module Collector = struct
| None -> ret()
| Some (module B) -> B.send_metrics.send l ~ret
let send_logs (l:Logs.resource_logs list) ~ret =
match !backend with
| None -> ret()
| Some (module B) -> B.send_logs.send l ~ret
let rand_bytes_16 () =
match !backend with
| None -> Bytes.make 16 '?'
@ -658,8 +677,98 @@ module Metrics = struct
Collector.send_metrics [rm] ~ret:ignore
end
module Logs = struct
(** {2 Logs} *)
(** Logs.
See {{: https://opentelemetry.io/docs/reference/specification/overview/#log-signal} the spec} *)
module Logs = struct
open Logs_types
type t = log_record
(** Severity level of a log event *)
type severity = Logs_types.severity_number =
| Severity_number_unspecified
| Severity_number_trace
| Severity_number_trace2
| Severity_number_trace3
| Severity_number_trace4
| Severity_number_debug
| Severity_number_debug2
| Severity_number_debug3
| Severity_number_debug4
| Severity_number_info
| Severity_number_info2
| Severity_number_info3
| Severity_number_info4
| Severity_number_warn
| Severity_number_warn2
| Severity_number_warn3
| Severity_number_warn4
| Severity_number_error
| Severity_number_error2
| Severity_number_error3
| Severity_number_error4
| Severity_number_fatal
| Severity_number_fatal2
| Severity_number_fatal3
| Severity_number_fatal4
let pp_severity = Logs_pp.pp_severity_number
type flags = Logs_types.log_record_flags =
| Log_record_flag_unspecified
| Log_record_flag_trace_flags_mask
let pp_flags = Logs_pp.pp_log_record_flags
(** Make a single log entry *)
let make
?time ?(observed_time_unix_nano=Timestamp_ns.now_unix_ns())
?severity ?log_level ?flags ?trace_id ?span_id
(body:value) : t =
let time_unix_nano = match time with
| None -> observed_time_unix_nano
| Some t -> t
in
let body = _conv_value body in
default_log_record
~time_unix_nano ~observed_time_unix_nano
?severity_number:severity ?severity_text:log_level
?flags ?trace_id ?span_id ~body ()
(** Make a log entry whose body is a string *)
let make_str
?time ?observed_time_unix_nano
?severity ?log_level ?flags ?trace_id ?span_id
(body:string) : t =
make
?time ?observed_time_unix_nano
?severity ?log_level ?flags ?trace_id ?span_id
(`String body)
(** Make a log entry with format *)
let make_strf
?time ?observed_time_unix_nano
?severity ?log_level ?flags ?trace_id ?span_id
fmt =
Format.kasprintf
(fun bod ->
make_str
?time ?observed_time_unix_nano
?severity ?log_level ?flags ?trace_id ?span_id bod)
fmt
let emit ?service_name ?attrs (l:t list) : unit =
let attributes = Globals.mk_attributes ?service_name ?attrs () in
let resource = Proto.Resource.default_resource ~attributes () in
let ll = default_instrumentation_library_logs
~instrumentation_library:(Some Globals.instrumentation_library)
~log_records:l () in
let rl = default_resource_logs ~resource:(Some resource)
~instrumentation_library_logs:[ll] () in
Collector.send_logs [rl] ~ret:ignore
end
(** {2 Utils} *)

View file

@ -36,6 +36,8 @@ let run () =
Unix.sleepf !sleep_inner;
if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *)
T.Logs.(emit [make @@ `String "log"]);
T.Trace.add_event scope (fun()->T.Event.make "done with alloc");
with Failure _ ->
());