diff --git a/src/client/config.ml b/src/client/config.ml index 2ecb2b8d..a3fb068e 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -6,6 +6,7 @@ type t = { url: string; batch_traces: int option; batch_metrics: int option; + batch_logs: int option; batch_timeout_ms: int; thread: bool; ticker_thread: bool; @@ -13,13 +14,13 @@ type t = { let pp out self = let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; + let {debug; url; batch_traces; batch_metrics; batch_logs; batch_timeout_ms; thread; ticker_thread} = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ \ + batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" - debug url ppiopt batch_traces ppiopt batch_metrics + debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms thread ticker_thread let make @@ -27,9 +28,11 @@ let make ?(url= get_url()) ?(batch_traces=Some 400) ?(batch_metrics=None) + ?(batch_logs=Some 400) ?(batch_timeout_ms=500) ?(thread=true) ?(ticker_thread=true) () : t = - { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + { debug; url; batch_traces; batch_metrics; batch_logs; + batch_timeout_ms; thread; ticker_thread; } diff --git a/src/client/config.mli b/src/client/config.mli index 892c91af..cfa501bd 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -22,6 +22,10 @@ type t = { Default [None]. *) + batch_logs : int option; + (** Batch logs? See {!batch_metrics} for details. + Default [Some 400] *) + batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even incomplete. @@ -42,6 +46,7 @@ val make : ?debug:bool -> ?url:string -> ?batch_traces:int option -> ?batch_metrics:int option -> + ?batch_logs:int option -> ?batch_timeout_ms:int -> ?thread:bool -> ?ticker_thread:bool -> diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 780ef6c2..1ae0272b 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -124,6 +124,7 @@ module type EMITTER = sig val push_trace : Trace.resource_spans list -> unit val push_metrics : Metrics.resource_metrics list -> unit + val push_logs : Logs.resource_logs list -> unit val tick : unit -> unit val cleanup : unit -> unit @@ -195,23 +196,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = mk_push ?batch:config.batch_traces () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = mk_push ?batch:config.batch_metrics () in + let ((module E_logs) : Logs.resource_logs list push), on_logs_full = + mk_push ?batch:config.batch_logs () in let encoder = Pbrt.Encoder.create() in let ((module C) as curl) = (module Curl() : CURL) in - let send_metrics_http (l:Metrics.resource_metrics list list) = + let send_http_ ~path ~encode x : unit = Pbrt.Encoder.reset encoder; - let resource_metrics = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Metrics_service.encode_export_metrics_service_request - (Metrics_service.default_export_metrics_service_request - ~resource_metrics ()) - encoder; + encode x encoder; let data = Pbrt.Encoder.to_string encoder in begin match - C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) - data + C.send ~path ~decode:(fun _ -> ()) data with | Ok () -> () | Error err -> @@ -221,23 +218,31 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = end; in + let send_metrics_http (l:Metrics.resource_metrics list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request + ~resource_metrics:l () in + send_http_ ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x + in + let send_traces_http (l:Trace.resource_spans list list) = - Pbrt.Encoder.reset encoder; - let resource_spans = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Trace_service.encode_export_trace_service_request - (Trace_service.default_export_trace_service_request ~resource_spans ()) - encoder; - begin match - C.send ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - end; + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request + ~resource_spans:l () in + send_http_ ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + in + + let send_logs_http (l:Logs.resource_logs list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request + ~resource_logs:l () in + send_http_ ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x in let last_wakeup = Atomic.make (Mtime_clock.now()) in @@ -247,29 +252,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = Mtime.Span.compare elapsed timeout >= 0 in - let emit_metrics ?(force=false) () : bool = - if force || (not force && E_metrics.is_big_enough ()) then ( - let batch = ref [AList.pop_all gc_metrics] in - E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + let emit_ (type a) (module P: PUSH with type elt = a list) + ?(init=fun() -> []) ?(force=false) ~send_http () : bool = + if force || (not force && P.is_big_enough ()) then ( + let batch = ref [init()] in + P.pop_iter_all (fun l -> batch := l :: !batch); let do_something = not (l_is_empty !batch) in if do_something then ( - send_metrics_http !batch; + send_http !batch; Atomic.set last_wakeup (Mtime_clock.now()); ); do_something ) else false in - let emit_traces ?(force=false) () : bool = - if force || (not force && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_traces_http !batch; - Atomic.set last_wakeup (Mtime_clock.now()); - ); - do_something - ) else false + + let emit_metrics ?force () : bool = + emit_ (module E_metrics) + ~init:(fun () -> AList.pop_all gc_metrics) + ~send_http:send_metrics_http () + and emit_traces ?force () : bool = + emit_ (module E_trace) ~send_http:send_traces_http () + and emit_logs ?force () : bool = + emit_ (module E_logs) ~send_http:send_logs_http () in let[@inline] guard f = @@ -280,9 +284,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let emit_all_force () = - let@ () = guard in ignore (emit_traces ~force:true () : bool); ignore (emit_metrics ~force:true () : bool); + ignore (emit_logs ~force:true () : bool); in @@ -305,7 +309,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let do_metrics = emit_metrics ~force:timeout () in let do_traces = emit_traces ~force:timeout () in - if not do_metrics && not do_traces then ( + let do_logs = emit_logs ~force:timeout () in + if not do_metrics && not do_traces && not do_logs then ( (* wait *) let@ () = with_mutex_ m in Condition.wait cond m; @@ -314,8 +319,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (* flush remaining events *) begin let@ () = guard in - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); + emit_all_force(); C.cleanup(); end in @@ -354,6 +358,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let push_metrics e = E_metrics.push e; if batch_timeout() then wakeup() + let push_logs e = + E_logs.push e; + if batch_timeout() then wakeup() let tick=tick let cleanup () = continue := false; @@ -367,6 +374,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = ignore (emit_metrics () : bool)); on_trace_full (fun () -> ignore (emit_traces () : bool)); + on_logs_full (fun () -> + ignore (emit_logs () : bool)); let cleanup () = emit_all_force(); @@ -384,6 +393,11 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = E_metrics.push e; if batch_timeout() then emit_all_force() + let push_logs e = + let@() = guard in + E_logs.push e; + if batch_timeout() then emit_all_force() + let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics(); if batch_timeout() then emit_all_force() @@ -449,6 +463,14 @@ module Backend(Arg : sig val config : Config.t end)() push_metrics m; ret() } + + let send_logs : Logs.resource_logs list sender = { + send=fun m ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m; + push_logs m; + ret() + } end let setup_ ~(config:Config.t) () = diff --git a/src/dune b/src/dune index 706442e2..933ec2aa 100644 --- a/src/dune +++ b/src/dune @@ -90,3 +90,15 @@ (action (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ -ml_out . -pp -binary))) + +(rule + (targets logs_service_types.ml logs_service_types.mli + logs_service_pp.ml logs_service_pp.mli + logs_service_pb.ml logs_service_pb.mli) + (deps + (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action (run ocaml-protoc %{file} + -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) + diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 5d09e8c9..612315f4 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -46,6 +46,18 @@ module Proto = struct include Status_pp include Status_pb end + + module Logs = struct + include Logs_types + include Logs_pb + include Logs_pp + end + + module Logs_service = struct + include Logs_service_types + include Logs_service_pb + include Logs_service_pp + end end (** {2 Timestamps} *) @@ -100,6 +112,8 @@ module Collector = struct val send_metrics : Metrics.resource_metrics list sender + val send_logs : Logs.resource_logs list sender + val rand_bytes_16 : unit -> bytes (** Generate 16 bytes of random data *) @@ -135,6 +149,11 @@ module Collector = struct | None -> ret() | Some (module B) -> B.send_metrics.send l ~ret + let send_logs (l:Logs.resource_logs list) ~ret = + match !backend with + | None -> ret() + | Some (module B) -> B.send_logs.send l ~ret + let rand_bytes_16 () = match !backend with | None -> Bytes.make 16 '?' @@ -658,8 +677,98 @@ module Metrics = struct Collector.send_metrics [rm] ~ret:ignore end -module Logs = struct +(** {2 Logs} *) +(** Logs. + + See {{: https://opentelemetry.io/docs/reference/specification/overview/#log-signal} the spec} *) +module Logs = struct + open Logs_types + + type t = log_record + + (** Severity level of a log event *) + type severity = Logs_types.severity_number = + | Severity_number_unspecified + | Severity_number_trace + | Severity_number_trace2 + | Severity_number_trace3 + | Severity_number_trace4 + | Severity_number_debug + | Severity_number_debug2 + | Severity_number_debug3 + | Severity_number_debug4 + | Severity_number_info + | Severity_number_info2 + | Severity_number_info3 + | Severity_number_info4 + | Severity_number_warn + | Severity_number_warn2 + | Severity_number_warn3 + | Severity_number_warn4 + | Severity_number_error + | Severity_number_error2 + | Severity_number_error3 + | Severity_number_error4 + | Severity_number_fatal + | Severity_number_fatal2 + | Severity_number_fatal3 + | Severity_number_fatal4 + + let pp_severity = Logs_pp.pp_severity_number + + type flags = Logs_types.log_record_flags = + | Log_record_flag_unspecified + | Log_record_flag_trace_flags_mask + + let pp_flags = Logs_pp.pp_log_record_flags + + (** Make a single log entry *) + let make + ?time ?(observed_time_unix_nano=Timestamp_ns.now_unix_ns()) + ?severity ?log_level ?flags ?trace_id ?span_id + (body:value) : t = + let time_unix_nano = match time with + | None -> observed_time_unix_nano + | Some t -> t + in + let body = _conv_value body in + default_log_record + ~time_unix_nano ~observed_time_unix_nano + ?severity_number:severity ?severity_text:log_level + ?flags ?trace_id ?span_id ~body () + + (** Make a log entry whose body is a string *) + let make_str + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + (body:string) : t = + make + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + (`String body) + + (** Make a log entry with format *) + let make_strf + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + fmt = + Format.kasprintf + (fun bod -> + make_str + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id bod) + fmt + + let emit ?service_name ?attrs (l:t list) : unit = + let attributes = Globals.mk_attributes ?service_name ?attrs () in + let resource = Proto.Resource.default_resource ~attributes () in + let ll = default_instrumentation_library_logs + ~instrumentation_library:(Some Globals.instrumentation_library) + ~log_records:l () in + let rl = default_resource_logs ~resource:(Some resource) + ~instrumentation_library_logs:[ll] () in + Collector.send_logs [rl] ~ret:ignore end (** {2 Utils} *) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index c7adf5f6..e57e8381 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -36,6 +36,8 @@ let run () = Unix.sleepf !sleep_inner; if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) + T.Logs.(emit [make @@ `String "log"]); + T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); with Failure _ -> ());