From 64df05d01023d314681d93f6e54acddc1d0f1a3e Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 15 Apr 2022 17:52:34 -0400 Subject: [PATCH 1/6] wip: support logs in the API --- src/dune | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/dune b/src/dune index edf2cdf6..706442e2 100644 --- a/src/dune +++ b/src/dune @@ -59,6 +59,17 @@ -I %{project_root}/vendor/opentelemetry-proto/ -ml_out . -pp -binary))) +(rule + (targets logs_types.ml logs_types.mli + logs_pb.ml logs_pb.mli + logs_pp.ml logs_pp.mli) + (deps + (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action (run ocaml-protoc %{file} + -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) + (rule (targets metrics_service_types.ml metrics_service_types.mli metrics_service_pp.ml metrics_service_pp.mli From 1bedb57123f15db0faad91d92ae4f07b7c9a1b64 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 15 Apr 2022 21:35:42 -0400 Subject: [PATCH 2/6] feat: implement basic support for logs also send them to the collector. --- src/client/config.ml | 11 ++- src/client/config.mli | 5 + src/client/opentelemetry_client_ocurl.ml | 112 ++++++++++++++--------- src/dune | 12 +++ src/opentelemetry.ml | 111 +++++++++++++++++++++- tests/bin/emit1.ml | 2 + 6 files changed, 203 insertions(+), 50 deletions(-) diff --git a/src/client/config.ml b/src/client/config.ml index 2ecb2b8d..a3fb068e 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -6,6 +6,7 @@ type t = { url: string; batch_traces: int option; batch_metrics: int option; + batch_logs: int option; batch_timeout_ms: int; thread: bool; ticker_thread: bool; @@ -13,13 +14,13 @@ type t = { let pp out self = let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; + let {debug; url; batch_traces; batch_metrics; batch_logs; batch_timeout_ms; thread; ticker_thread} = self in Format.fprintf out "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ \ + batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" - debug url ppiopt batch_traces ppiopt batch_metrics + debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms thread ticker_thread let make @@ -27,9 +28,11 @@ let make ?(url= get_url()) ?(batch_traces=Some 400) ?(batch_metrics=None) + ?(batch_logs=Some 400) ?(batch_timeout_ms=500) ?(thread=true) ?(ticker_thread=true) () : t = - { debug; url; batch_traces; batch_metrics; batch_timeout_ms; + { debug; url; batch_traces; batch_metrics; batch_logs; + batch_timeout_ms; thread; ticker_thread; } diff --git a/src/client/config.mli b/src/client/config.mli index 892c91af..cfa501bd 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -22,6 +22,10 @@ type t = { Default [None]. *) + batch_logs : int option; + (** Batch logs? See {!batch_metrics} for details. + Default [Some 400] *) + batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even incomplete. @@ -42,6 +46,7 @@ val make : ?debug:bool -> ?url:string -> ?batch_traces:int option -> ?batch_metrics:int option -> + ?batch_logs:int option -> ?batch_timeout_ms:int -> ?thread:bool -> ?ticker_thread:bool -> diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 780ef6c2..1ae0272b 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -124,6 +124,7 @@ module type EMITTER = sig val push_trace : Trace.resource_spans list -> unit val push_metrics : Metrics.resource_metrics list -> unit + val push_logs : Logs.resource_logs list -> unit val tick : unit -> unit val cleanup : unit -> unit @@ -195,23 +196,19 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = mk_push ?batch:config.batch_traces () in let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = mk_push ?batch:config.batch_metrics () in + let ((module E_logs) : Logs.resource_logs list push), on_logs_full = + mk_push ?batch:config.batch_logs () in let encoder = Pbrt.Encoder.create() in let ((module C) as curl) = (module Curl() : CURL) in - let send_metrics_http (l:Metrics.resource_metrics list list) = + let send_http_ ~path ~encode x : unit = Pbrt.Encoder.reset encoder; - let resource_metrics = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Metrics_service.encode_export_metrics_service_request - (Metrics_service.default_export_metrics_service_request - ~resource_metrics ()) - encoder; + encode x encoder; let data = Pbrt.Encoder.to_string encoder in begin match - C.send ~path:"/v1/metrics" ~decode:(fun _ -> ()) - data + C.send ~path ~decode:(fun _ -> ()) data with | Ok () -> () | Error err -> @@ -221,23 +218,31 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = end; in + let send_metrics_http (l:Metrics.resource_metrics list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Metrics_service.default_export_metrics_service_request + ~resource_metrics:l () in + send_http_ ~path:"/v1/metrics" + ~encode:Metrics_service.encode_export_metrics_service_request x + in + let send_traces_http (l:Trace.resource_spans list list) = - Pbrt.Encoder.reset encoder; - let resource_spans = - List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Trace_service.encode_export_trace_service_request - (Trace_service.default_export_trace_service_request ~resource_spans ()) - encoder; - begin match - C.send ~path:"/v1/traces" ~decode:(fun _ -> ()) - (Pbrt.Encoder.to_string encoder) - with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - end; + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Trace_service.default_export_trace_service_request + ~resource_spans:l () in + send_http_ ~path:"/v1/traces" + ~encode:Trace_service.encode_export_trace_service_request x + in + + let send_logs_http (l:Logs.resource_logs list list) = + let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in + let x = + Logs_service.default_export_logs_service_request + ~resource_logs:l () in + send_http_ ~path:"/v1/logs" + ~encode:Logs_service.encode_export_logs_service_request x in let last_wakeup = Atomic.make (Mtime_clock.now()) in @@ -247,29 +252,28 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = Mtime.Span.compare elapsed timeout >= 0 in - let emit_metrics ?(force=false) () : bool = - if force || (not force && E_metrics.is_big_enough ()) then ( - let batch = ref [AList.pop_all gc_metrics] in - E_metrics.pop_iter_all (fun l -> batch := l :: !batch); + let emit_ (type a) (module P: PUSH with type elt = a list) + ?(init=fun() -> []) ?(force=false) ~send_http () : bool = + if force || (not force && P.is_big_enough ()) then ( + let batch = ref [init()] in + P.pop_iter_all (fun l -> batch := l :: !batch); let do_something = not (l_is_empty !batch) in if do_something then ( - send_metrics_http !batch; + send_http !batch; Atomic.set last_wakeup (Mtime_clock.now()); ); do_something ) else false in - let emit_traces ?(force=false) () : bool = - if force || (not force && E_trace.is_big_enough ()) then ( - let batch = ref [] in - E_trace.pop_iter_all (fun l -> batch := l :: !batch); - let do_something = not (l_is_empty !batch) in - if do_something then ( - send_traces_http !batch; - Atomic.set last_wakeup (Mtime_clock.now()); - ); - do_something - ) else false + + let emit_metrics ?force () : bool = + emit_ (module E_metrics) + ~init:(fun () -> AList.pop_all gc_metrics) + ~send_http:send_metrics_http () + and emit_traces ?force () : bool = + emit_ (module E_trace) ~send_http:send_traces_http () + and emit_logs ?force () : bool = + emit_ (module E_logs) ~send_http:send_logs_http () in let[@inline] guard f = @@ -280,9 +284,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let emit_all_force () = - let@ () = guard in ignore (emit_traces ~force:true () : bool); ignore (emit_metrics ~force:true () : bool); + ignore (emit_logs ~force:true () : bool); in @@ -305,7 +309,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let do_metrics = emit_metrics ~force:timeout () in let do_traces = emit_traces ~force:timeout () in - if not do_metrics && not do_traces then ( + let do_logs = emit_logs ~force:timeout () in + if not do_metrics && not do_traces && not do_logs then ( (* wait *) let@ () = with_mutex_ m in Condition.wait cond m; @@ -314,8 +319,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = (* flush remaining events *) begin let@ () = guard in - ignore (emit_traces ~force:true () : bool); - ignore (emit_metrics ~force:true () : bool); + emit_all_force(); C.cleanup(); end in @@ -354,6 +358,9 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let push_metrics e = E_metrics.push e; if batch_timeout() then wakeup() + let push_logs e = + E_logs.push e; + if batch_timeout() then wakeup() let tick=tick let cleanup () = continue := false; @@ -367,6 +374,8 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = ignore (emit_metrics () : bool)); on_trace_full (fun () -> ignore (emit_traces () : bool)); + on_logs_full (fun () -> + ignore (emit_logs () : bool)); let cleanup () = emit_all_force(); @@ -384,6 +393,11 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = E_metrics.push e; if batch_timeout() then emit_all_force() + let push_logs e = + let@() = guard in + E_logs.push e; + if batch_timeout() then emit_all_force() + let tick () = if Atomic.get needs_gc_metrics then sample_gc_metrics(); if batch_timeout() then emit_all_force() @@ -449,6 +463,14 @@ module Backend(Arg : sig val config : Config.t end)() push_metrics m; ret() } + + let send_logs : Logs.resource_logs list sender = { + send=fun m ~ret -> + let@() = with_lock_ in + if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m; + push_logs m; + ret() + } end let setup_ ~(config:Config.t) () = diff --git a/src/dune b/src/dune index 706442e2..933ec2aa 100644 --- a/src/dune +++ b/src/dune @@ -90,3 +90,15 @@ (action (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ -ml_out . -pp -binary))) + +(rule + (targets logs_service_types.ml logs_service_types.mli + logs_service_pp.ml logs_service_pp.mli + logs_service_pb.ml logs_service_pb.mli) + (deps + (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action (run ocaml-protoc %{file} + -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) + diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 5d09e8c9..612315f4 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -46,6 +46,18 @@ module Proto = struct include Status_pp include Status_pb end + + module Logs = struct + include Logs_types + include Logs_pb + include Logs_pp + end + + module Logs_service = struct + include Logs_service_types + include Logs_service_pb + include Logs_service_pp + end end (** {2 Timestamps} *) @@ -100,6 +112,8 @@ module Collector = struct val send_metrics : Metrics.resource_metrics list sender + val send_logs : Logs.resource_logs list sender + val rand_bytes_16 : unit -> bytes (** Generate 16 bytes of random data *) @@ -135,6 +149,11 @@ module Collector = struct | None -> ret() | Some (module B) -> B.send_metrics.send l ~ret + let send_logs (l:Logs.resource_logs list) ~ret = + match !backend with + | None -> ret() + | Some (module B) -> B.send_logs.send l ~ret + let rand_bytes_16 () = match !backend with | None -> Bytes.make 16 '?' @@ -658,8 +677,98 @@ module Metrics = struct Collector.send_metrics [rm] ~ret:ignore end -module Logs = struct +(** {2 Logs} *) +(** Logs. + + See {{: https://opentelemetry.io/docs/reference/specification/overview/#log-signal} the spec} *) +module Logs = struct + open Logs_types + + type t = log_record + + (** Severity level of a log event *) + type severity = Logs_types.severity_number = + | Severity_number_unspecified + | Severity_number_trace + | Severity_number_trace2 + | Severity_number_trace3 + | Severity_number_trace4 + | Severity_number_debug + | Severity_number_debug2 + | Severity_number_debug3 + | Severity_number_debug4 + | Severity_number_info + | Severity_number_info2 + | Severity_number_info3 + | Severity_number_info4 + | Severity_number_warn + | Severity_number_warn2 + | Severity_number_warn3 + | Severity_number_warn4 + | Severity_number_error + | Severity_number_error2 + | Severity_number_error3 + | Severity_number_error4 + | Severity_number_fatal + | Severity_number_fatal2 + | Severity_number_fatal3 + | Severity_number_fatal4 + + let pp_severity = Logs_pp.pp_severity_number + + type flags = Logs_types.log_record_flags = + | Log_record_flag_unspecified + | Log_record_flag_trace_flags_mask + + let pp_flags = Logs_pp.pp_log_record_flags + + (** Make a single log entry *) + let make + ?time ?(observed_time_unix_nano=Timestamp_ns.now_unix_ns()) + ?severity ?log_level ?flags ?trace_id ?span_id + (body:value) : t = + let time_unix_nano = match time with + | None -> observed_time_unix_nano + | Some t -> t + in + let body = _conv_value body in + default_log_record + ~time_unix_nano ~observed_time_unix_nano + ?severity_number:severity ?severity_text:log_level + ?flags ?trace_id ?span_id ~body () + + (** Make a log entry whose body is a string *) + let make_str + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + (body:string) : t = + make + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + (`String body) + + (** Make a log entry with format *) + let make_strf + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id + fmt = + Format.kasprintf + (fun bod -> + make_str + ?time ?observed_time_unix_nano + ?severity ?log_level ?flags ?trace_id ?span_id bod) + fmt + + let emit ?service_name ?attrs (l:t list) : unit = + let attributes = Globals.mk_attributes ?service_name ?attrs () in + let resource = Proto.Resource.default_resource ~attributes () in + let ll = default_instrumentation_library_logs + ~instrumentation_library:(Some Globals.instrumentation_library) + ~log_records:l () in + let rl = default_resource_logs ~resource:(Some resource) + ~instrumentation_library_logs:[ll] () in + Collector.send_logs [rl] ~ret:ignore end (** {2 Utils} *) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index c7adf5f6..e57e8381 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -36,6 +36,8 @@ let run () = Unix.sleepf !sleep_inner; if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) + T.Logs.(emit [make @@ `String "log"]); + T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); with Failure _ -> ()); From 619b389322a7b49f11bfac22832566541e8df224 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 15 Apr 2022 21:53:43 -0400 Subject: [PATCH 3/6] type error fixed --- src/opentelemetry.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 612315f4..6c413a3c 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -732,6 +732,8 @@ module Logs = struct | None -> observed_time_unix_nano | Some t -> t in + let trace_id = Option.map Trace_id.to_bytes trace_id in + let span_id = Option.map Span_id.to_bytes span_id in let body = _conv_value body in default_log_record ~time_unix_nano ~observed_time_unix_nano From 9e19e323db3c72a912afdf1dd891615aeb480395 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 15 Apr 2022 21:53:52 -0400 Subject: [PATCH 4/6] update example --- tests/bin/emit1.ml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index e57e8381..c0c42996 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -24,6 +24,12 @@ let run () = "loop.inner" in Unix.sleepf !sleep_outer; + T.Logs.(emit [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info + "inner at %d" j + ]); + incr i; (try @@ -36,8 +42,6 @@ let run () = Unix.sleepf !sleep_inner; if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) - T.Logs.(emit [make @@ `String "log"]); - T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); with Failure _ -> ()); From 8c363341e56d7bdb401c80a8038f8166c9fd7682 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 28 Apr 2022 10:03:08 -0400 Subject: [PATCH 5/6] chore: require ocaml-protoc 2.2 required because the Logs proto file contains a hex literal. --- dune-project | 4 ++-- opentelemetry-client-ocurl.opam | 2 +- opentelemetry.opam | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dune-project b/dune-project index 93d60bb3..0ead41b2 100644 --- a/dune-project +++ b/dune-project @@ -18,7 +18,7 @@ (ocaml (>= "4.08")) ptime (odoc :with-doc) - (ocaml-protoc (>= 2.1))) + (ocaml-protoc (>= 2.2))) (tags (instrumentation tracing opentelemetry datadog jaeger))) @@ -40,7 +40,7 @@ (mtime (>= "1.4")) ; for spans ; atomic ; vendored (opentelemetry (= :version)) - (ocaml-protoc (>= 2.1)) + (ocaml-protoc (>= 2.2)) (odoc :with-doc) ocurl) (synopsis "Collector client for opentelemetry, using http + ocurl")) diff --git a/opentelemetry-client-ocurl.opam b/opentelemetry-client-ocurl.opam index f65d64ee..760071c6 100644 --- a/opentelemetry-client-ocurl.opam +++ b/opentelemetry-client-ocurl.opam @@ -13,7 +13,7 @@ depends: [ "ocaml" {>= "4.08"} "mtime" {>= "1.4"} "opentelemetry" {= version} - "ocaml-protoc" {>= "2.1"} + "ocaml-protoc" {>= "2.2"} "odoc" {with-doc} "ocurl" ] diff --git a/opentelemetry.opam b/opentelemetry.opam index f02508d8..b4d44fc2 100644 --- a/opentelemetry.opam +++ b/opentelemetry.opam @@ -14,7 +14,7 @@ depends: [ "ocaml" {>= "4.08"} "ptime" "odoc" {with-doc} - "ocaml-protoc" {>= "2.1"} + "ocaml-protoc" {>= "2.2"} ] build: [ ["dune" "subst"] {dev} From 3f9bd94837251767bee93241b8dcad462e68b2c8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 12 May 2022 11:54:06 -0400 Subject: [PATCH 6/6] format --- dune | 4 +- src/atomic/atomic.post412.mli | 19 +- src/atomic/atomic.pre412.mli | 19 +- src/atomic/dune | 20 +- src/atomic/gen.ml | 27 +- src/client/AList.ml | 12 +- src/client/AList.mli | 3 + src/client/FQueue.ml | 16 +- src/client/FQueue.mli | 12 +- src/client/common_.ml | 17 +- src/client/config.ml | 44 +- src/client/config.mli | 31 +- src/client/dune | 11 +- src/client/gen_ids.ml | 36 +- src/client/opentelemetry_client_ocurl.ml | 419 +++++++++-------- src/client/opentelemetry_client_ocurl.mli | 1 - src/dune | 158 +++---- .../cohttp/opentelemetry_cohttp_lwt.ml | 238 +++++----- src/lwt/dune | 8 +- src/lwt/opentelemetry_lwt.ml | 42 +- src/opentelemetry.ml | 424 ++++++++++-------- tests/bin/cohttp_client.ml | 60 ++- tests/bin/dune | 3 +- tests/bin/emit1.ml | 89 ++-- tests/test_trace_context.ml | 30 +- 25 files changed, 956 insertions(+), 787 deletions(-) diff --git a/dune b/dune index f66632f8..f2eef302 100644 --- a/dune +++ b/dune @@ -1,3 +1,3 @@ - (env - (_ (flags :standard -warn-error -a+8))) + (_ + (flags :standard -warn-error -a+8))) diff --git a/src/atomic/atomic.post412.mli b/src/atomic/atomic.post412.mli index 36ac6ce3..806271b2 100644 --- a/src/atomic/atomic.post412.mli +++ b/src/atomic/atomic.post412.mli @@ -1,4 +1,3 @@ - (**************************************************************************) (* *) (* OCaml *) @@ -19,34 +18,34 @@ (** Atomic references. *) -(** An atomic (mutable) reference to a value of type ['a]. *) type 'a t = 'a Stdlib.Atomic.t +(** An atomic (mutable) reference to a value of type ['a]. *) -(** Create an atomic reference. *) val make : 'a -> 'a t +(** Create an atomic reference. *) -(** Get the current value of the atomic reference. *) val get : 'a t -> 'a +(** Get the current value of the atomic reference. *) -(** Set a new value for the atomic reference. *) val set : 'a t -> 'a -> unit +(** Set a new value for the atomic reference. *) -(** Set a new value for the atomic reference, and return the current value. *) val exchange : 'a t -> 'a -> 'a +(** Set a new value for the atomic reference, and return the current value. *) +val compare_and_set : 'a t -> 'a -> 'a -> bool (** [compare_and_set r seen v] sets the new value of [r] to [v] only if its current value is physically equal to [seen] -- the comparison and the set occur atomically. Returns [true] if the comparison succeeded (so the set happened) and [false] otherwise. *) -val compare_and_set : 'a t -> 'a -> 'a -> bool +val fetch_and_add : int t -> int -> int (** [fetch_and_add r n] atomically increments the value of [r] by [n], and returns the current value (before the increment). *) -val fetch_and_add : int t -> int -> int -(** [incr r] atomically increments the value of [r] by [1]. *) val incr : int t -> unit +(** [incr r] atomically increments the value of [r] by [1]. *) -(** [decr r] atomically decrements the value of [r] by [1]. *) val decr : int t -> unit +(** [decr r] atomically decrements the value of [r] by [1]. *) diff --git a/src/atomic/atomic.pre412.mli b/src/atomic/atomic.pre412.mli index 89905280..f19bef29 100644 --- a/src/atomic/atomic.pre412.mli +++ b/src/atomic/atomic.pre412.mli @@ -1,4 +1,3 @@ - (**************************************************************************) (* *) (* OCaml *) @@ -19,34 +18,34 @@ (** Atomic references. *) -(** An atomic (mutable) reference to a value of type ['a]. *) type 'a t +(** An atomic (mutable) reference to a value of type ['a]. *) -(** Create an atomic reference. *) val make : 'a -> 'a t +(** Create an atomic reference. *) -(** Get the current value of the atomic reference. *) val get : 'a t -> 'a +(** Get the current value of the atomic reference. *) -(** Set a new value for the atomic reference. *) val set : 'a t -> 'a -> unit +(** Set a new value for the atomic reference. *) -(** Set a new value for the atomic reference, and return the current value. *) val exchange : 'a t -> 'a -> 'a +(** Set a new value for the atomic reference, and return the current value. *) +val compare_and_set : 'a t -> 'a -> 'a -> bool (** [compare_and_set r seen v] sets the new value of [r] to [v] only if its current value is physically equal to [seen] -- the comparison and the set occur atomically. Returns [true] if the comparison succeeded (so the set happened) and [false] otherwise. *) -val compare_and_set : 'a t -> 'a -> 'a -> bool +val fetch_and_add : int t -> int -> int (** [fetch_and_add r n] atomically increments the value of [r] by [n], and returns the current value (before the increment). *) -val fetch_and_add : int t -> int -> int -(** [incr r] atomically increments the value of [r] by [1]. *) val incr : int t -> unit +(** [incr r] atomically increments the value of [r] by [1]. *) -(** [decr r] atomically decrements the value of [r] by [1]. *) val decr : int t -> unit +(** [decr r] atomically decrements the value of [r] by [1]. *) diff --git a/src/atomic/dune b/src/atomic/dune index ac05b2e1..b357237c 100644 --- a/src/atomic/dune +++ b/src/atomic/dune @@ -1,15 +1,15 @@ - (library - (name opentelemetry_atomic) - (synopsis "Compatibility package for the Atomic module for opentelemetry") - (public_name opentelemetry.atomic) - (modules atomic)) + (name opentelemetry_atomic) + (synopsis "Compatibility package for the Atomic module for opentelemetry") + (public_name opentelemetry.atomic) + (modules atomic)) (executable - (modules gen) - (name gen)) + (modules gen) + (name gen)) (rule - (targets atomic.ml atomic.mli atomic.ml) - (deps atomic.pre412.mli atomic.post412.mli) - (action (run ./gen.exe))) + (targets atomic.ml atomic.mli atomic.ml) + (deps atomic.pre412.mli atomic.post412.mli) + (action + (run ./gen.exe))) diff --git a/src/atomic/gen.ml b/src/atomic/gen.ml index 29d0ae63..7eafc3c3 100644 --- a/src/atomic/gen.ml +++ b/src/atomic/gen.ml @@ -1,6 +1,5 @@ - - -let atomic_before_412 = {| +let atomic_before_412 = + {| type 'a t = {mutable x: 'a} let[@inline] make x = {x} let[@inline] get {x} = x @@ -32,7 +31,9 @@ let atomic_before_412 = {| let atomic_after_412 = {|include Stdlib.Atomic|} let write_file file s = - let oc = open_out file in output_string oc s; close_out oc + let oc = open_out file in + output_string oc s; + close_out oc let copy_file file1 file2 = let oc = open_out file2 in @@ -41,14 +42,22 @@ let copy_file file1 file2 = try while true do let n = input ic buf 0 (Bytes.length buf) in - if n=0 then raise End_of_file; + if n = 0 then raise End_of_file; output oc buf 0 n done with End_of_file -> () let () = - let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x,y) in - write_file "atomic.ml" (if version >= (4,12) then atomic_after_412 else atomic_before_412); - copy_file (if version >= (4,12) then "atomic.post412.mli" else "atomic.pre412.mli") "atomic.mli" ; + let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x, y) in + write_file "atomic.ml" + (if version >= (4, 12) then + atomic_after_412 + else + atomic_before_412); + copy_file + (if version >= (4, 12) then + "atomic.post412.mli" + else + "atomic.pre412.mli") + "atomic.mli"; () - diff --git a/src/client/AList.ml b/src/client/AList.ml index 0a8c54b0..9b7d47df 100644 --- a/src/client/AList.ml +++ b/src/client/AList.ml @@ -1,13 +1,21 @@ module Atomic = Opentelemetry_atomic.Atomic type 'a t = 'a list Atomic.t + let make () = Atomic.make [] + let add self x = while let old = Atomic.get self in let l' = x :: old in not (Atomic.compare_and_set self old l') - do () done + do + () + done + let rec pop_all self = let l = Atomic.get self in - if Atomic.compare_and_set self l [] then l else pop_all self + if Atomic.compare_and_set self l [] then + l + else + pop_all self diff --git a/src/client/AList.mli b/src/client/AList.mli index f4db8292..b4c718dd 100644 --- a/src/client/AList.mli +++ b/src/client/AList.mli @@ -1,6 +1,9 @@ (** Atomic list *) type 'a t + val make : unit -> 'a t + val add : 'a t -> 'a -> unit + val pop_all : 'a t -> 'a list diff --git a/src/client/FQueue.ml b/src/client/FQueue.ml index fc48def3..ce04dad3 100644 --- a/src/client/FQueue.ml +++ b/src/client/FQueue.ml @@ -1,5 +1,3 @@ - - type 'a t = { arr: 'a array; mutable i: int; @@ -7,23 +5,23 @@ type 'a t = { let create ~dummy n : _ t = assert (n >= 1); - { arr=Array.make n dummy; - i=0; - } + { arr = Array.make n dummy; i = 0 } let[@inline] size self = self.i + let[@inline] is_full self = self.i = Array.length self.arr -let push (self:_ t) x : bool = - if is_full self then false +let push (self : _ t) x : bool = + if is_full self then + false else ( self.arr.(self.i) <- x; self.i <- 1 + self.i; true ) -let pop_iter_all (self: _ t) f = - for j=0 to self.i-1 do +let pop_iter_all (self : _ t) f = + for j = 0 to self.i - 1 do f self.arr.(j) done; self.i <- 0 diff --git a/src/client/FQueue.mli b/src/client/FQueue.mli index b80544d2..0a03b00d 100644 --- a/src/client/FQueue.mli +++ b/src/client/FQueue.mli @@ -1,9 +1,11 @@ - (** queue of fixed size *) type 'a t -val create : dummy:'a -> int -> 'a t -val size : _ t -> int -val push : 'a t -> 'a -> bool (* true iff it could write element *) -val pop_iter_all : 'a t -> ('a -> unit) -> unit +val create : dummy:'a -> int -> 'a t + +val size : _ t -> int + +val push : 'a t -> 'a -> bool (* true iff it could write element *) + +val pop_iter_all : 'a t -> ('a -> unit) -> unit diff --git a/src/client/common_.ml b/src/client/common_.ml index d8bf312a..32e25566 100644 --- a/src/client/common_.ml +++ b/src/client/common_.ml @@ -1,10 +1,15 @@ module Atomic = Opentelemetry_atomic.Atomic -let[@inline] (let@) f x = f x +let[@inline] ( let@ ) f x = f x -let debug_ = ref (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with Some ("1"|"true") -> true | _ -> false) +let debug_ = + ref + (match Sys.getenv_opt "OTEL_OCAML_DEBUG" with + | Some ("1" | "true") -> true + | _ -> false) let lock_ : (unit -> unit) ref = ref ignore + let unlock_ : (unit -> unit) ref = ref ignore let set_mutex ~lock ~unlock : unit = @@ -13,7 +18,7 @@ let set_mutex ~lock ~unlock : unit = (* critical section for [f()] *) let[@inline] with_lock_ f = - !lock_(); + !lock_ (); Fun.protect ~finally:!unlock_ f let[@inline] with_mutex_ m f = @@ -21,6 +26,10 @@ let[@inline] with_mutex_ m f = Fun.protect ~finally:(fun () -> Mutex.unlock m) f let default_url = "http://localhost:4318" -let url = ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) + +let url = + ref (try Sys.getenv "OTEL_EXPORTER_OTLP_ENDPOINT" with _ -> default_url) + let get_url () = !url + let set_url s = url := s diff --git a/src/client/config.ml b/src/client/config.ml index a3fb068e..083a5b2f 100644 --- a/src/client/config.ml +++ b/src/client/config.ml @@ -1,4 +1,3 @@ - open Common_ type t = { @@ -14,25 +13,34 @@ type t = { let pp out self = let ppiopt = Format.pp_print_option Format.pp_print_int in - let {debug; url; batch_traces; batch_metrics; batch_logs; - batch_timeout_ms; thread; ticker_thread} = self in + let { + debug; + url; + batch_traces; + batch_metrics; + batch_logs; + batch_timeout_ms; + thread; + ticker_thread; + } = + self + in Format.fprintf out - "{@[ debug=%B;@ url=%S;@ \ - batch_traces=%a;@ batch_metrics=%a;@ batch_logs=%a;@ \ - batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" + "{@[ debug=%B;@ url=%S;@ batch_traces=%a;@ batch_metrics=%a;@ \ + batch_logs=%a;@ batch_timeout_ms=%d; thread=%B;@ ticker_thread=%B @]}" debug url ppiopt batch_traces ppiopt batch_metrics ppiopt batch_logs batch_timeout_ms thread ticker_thread -let make - ?(debug= !debug_) - ?(url= get_url()) - ?(batch_traces=Some 400) - ?(batch_metrics=None) - ?(batch_logs=Some 400) - ?(batch_timeout_ms=500) - ?(thread=true) - ?(ticker_thread=true) - () : t = - { debug; url; batch_traces; batch_metrics; batch_logs; +let make ?(debug = !debug_) ?(url = get_url ()) ?(batch_traces = Some 400) + ?(batch_metrics = None) ?(batch_logs = Some 400) ?(batch_timeout_ms = 500) + ?(thread = true) ?(ticker_thread = true) () : t = + { + debug; + url; + batch_traces; + batch_metrics; + batch_logs; batch_timeout_ms; - thread; ticker_thread; } + thread; + ticker_thread; + } diff --git a/src/client/config.mli b/src/client/config.mli index cfa501bd..76fd1cc5 100644 --- a/src/client/config.mli +++ b/src/client/config.mli @@ -1,56 +1,49 @@ - type t = { debug: bool; - url: string; - (** Url of the endpoint. Default is "http://localhost:4318", + (** Url of the endpoint. Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. *) - batch_traces: int option; - (** Batch traces? If [Some i], then this produces batches of (at most) + (** Batch traces? If [Some i], then this produces batches of (at most) [i] items. If [None], there is no batching. Note that traces and metrics are batched separately. Default [Some 400]. *) - batch_metrics: int option; - (** Batch metrics? If [Some i], then this produces batches of (at most) + (** Batch metrics? If [Some i], then this produces batches of (at most) [i] items. If [None], there is no batching. Note that traces and metrics are batched separately. Default [None]. *) - - batch_logs : int option; - (** Batch logs? See {!batch_metrics} for details. + batch_logs: int option; + (** Batch logs? See {!batch_metrics} for details. Default [Some 400] *) - batch_timeout_ms: int; - (** Number of milliseconds after which we will emit a batch, even + (** Number of milliseconds after which we will emit a batch, even incomplete. Note that the batch might take longer than that, because this is only checked when a new event occurs. Default 500. *) - - thread: bool; - (** Is there a background thread? Default [true] *) - + thread: bool; (** Is there a background thread? Default [true] *) ticker_thread: bool; - (** Is there a ticker thread? Default [true]. + (** Is there a ticker thread? Default [true]. This thread will regularly call [tick()] on the backend, to make sure it makes progress, and regularly send events to the collector. This option is ignored if [thread=false]. *) } val make : - ?debug:bool -> ?url:string -> + ?debug:bool -> + ?url:string -> ?batch_traces:int option -> ?batch_metrics:int option -> ?batch_logs:int option -> ?batch_timeout_ms:int -> ?thread:bool -> ?ticker_thread:bool -> - unit -> t + unit -> + t (** Make a configuration *) val pp : Format.formatter -> t -> unit diff --git a/src/client/dune b/src/client/dune index 56d7fbf1..20d85230 100644 --- a/src/client/dune +++ b/src/client/dune @@ -1,8 +1,5 @@ - (library - (name opentelemetry_client_ocurl) - (public_name opentelemetry-client-ocurl) - (libraries opentelemetry opentelemetry.atomic - curl ocaml-protoc threads - mtime mtime.clock.os)) - + (name opentelemetry_client_ocurl) + (public_name opentelemetry-client-ocurl) + (libraries opentelemetry opentelemetry.atomic curl ocaml-protoc threads + mtime mtime.clock.os)) diff --git a/src/client/gen_ids.ml b/src/client/gen_ids.ml index 049f6efc..04652ffb 100644 --- a/src/client/gen_ids.ml +++ b/src/client/gen_ids.ml @@ -1,34 +1,36 @@ - open Common_ (* generate random IDs *) -module Make() = struct - let rand_ = Random.State.make_self_init() +module Make () = struct + let rand_ = Random.State.make_self_init () let rand_bytes_8 () : bytes = - let@() = with_lock_ in + let@ () = with_lock_ in let b = Bytes.create 8 in - for i=0 to 1 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); + for i = 0 to 1 do + let r = Random.State.bits rand_ in + (* 30 bits, of which we use 24 *) + Bytes.set b (i * 3) (Char.chr (r land 0xff)); + Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); + Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) done; let r = Random.State.bits rand_ in Bytes.set b 6 (Char.chr (r land 0xff)); - Bytes.set b 7 (Char.chr (r lsr 8 land 0xff)); + Bytes.set b 7 (Char.chr ((r lsr 8) land 0xff)); b let rand_bytes_16 () : bytes = - let@() = with_lock_ in + let@ () = with_lock_ in let b = Bytes.create 16 in - for i=0 to 4 do - let r = Random.State.bits rand_ in (* 30 bits, of which we use 24 *) - Bytes.set b (i*3) (Char.chr (r land 0xff)); - Bytes.set b (i*3+1) (Char.chr (r lsr 8 land 0xff)); - Bytes.set b (i*3+2) (Char.chr (r lsr 16 land 0xff)); + for i = 0 to 4 do + let r = Random.State.bits rand_ in + (* 30 bits, of which we use 24 *) + Bytes.set b (i * 3) (Char.chr (r land 0xff)); + Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); + Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) done; let r = Random.State.bits rand_ in - Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) + Bytes.set b 15 (Char.chr (r land 0xff)); + (* last byte *) b end diff --git a/src/client/opentelemetry_client_ocurl.ml b/src/client/opentelemetry_client_ocurl.ml index 1ae0272b..16ba0033 100644 --- a/src/client/opentelemetry_client_ocurl.ml +++ b/src/client/opentelemetry_client_ocurl.ml @@ -1,4 +1,3 @@ - (* https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md @@ -10,48 +9,56 @@ include Common_ let needs_gc_metrics = Atomic.make false -let gc_metrics = AList.make() (* side channel for GC, appended to {!E_metrics}'s data *) +let gc_metrics = AList.make () +(* side channel for GC, appended to {!E_metrics}'s data *) (* capture current GC metrics and push them into {!gc_metrics} for later collection *) let sample_gc_metrics () = Atomic.set needs_gc_metrics false; - let l = OT.Metrics.make_resource_metrics - ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) - @@ Opentelemetry.GC_metrics.get_metrics() in + let l = + OT.Metrics.make_resource_metrics + ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ()) + @@ Opentelemetry.GC_metrics.get_metrics () + in AList.add gc_metrics l module Config = Config -let _init_curl = lazy ( - Curl.global_init Curl.CURLINIT_GLOBALALL; - at_exit Curl.global_cleanup; -) +let _init_curl = + lazy + (Curl.global_init Curl.CURLINIT_GLOBALALL; + at_exit Curl.global_cleanup) -type error = [ - | `Status of int * Opentelemetry.Proto.Status.status +type error = + [ `Status of int * Opentelemetry.Proto.Status.status | `Failure of string -] + ] let n_errors = Atomic.make 0 + let n_dropped = Atomic.make 0 let report_err_ = function | `Failure msg -> Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg | `Status (code, status) -> - Format.eprintf "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." - code Proto.Status.pp_status status + Format.eprintf + "@[<2>opentelemetry: export failed with@ http code=%d@ status %a@]@." code + Proto.Status.pp_status status module type CURL = sig - val send : path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val send : + path:string -> decode:(Pbrt.Decoder.t -> 'a) -> string -> ('a, error) result + val cleanup : unit -> unit end (* create a curl client *) -module Curl() : CURL = struct +module Curl () : CURL = struct open Opentelemetry.Proto - let() = Lazy.force _init_curl + + let () = Lazy.force _init_curl let buf_res = Buffer.create 256 @@ -65,35 +72,35 @@ module Curl() : CURL = struct (* TODO: use Curl multi *) (* send the content to the remote endpoint/path *) - let send ~path ~decode (bod:string) : ('a, error) result = + let send ~path ~decode (bod : string) : ('a, error) result = Curl.reset curl; if !debug_ then Curl.set_verbose curl true; Curl.set_url curl (!url ^ path); Curl.set_httppost curl []; - Curl.set_httpheader curl ["Content-Type: application/x-protobuf"]; + Curl.set_httpheader curl [ "Content-Type: application/x-protobuf" ]; (* write body *) Curl.set_post curl true; Curl.set_postfieldsize curl (String.length bod); Curl.set_readfunction curl - begin - let i = ref 0 in - (fun n -> - if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; - let len = min n (String.length bod - !i) in - let s = String.sub bod !i len in - if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; - i := !i + len; - s) - end; + (let i = ref 0 in + fun n -> + if !debug_ then Printf.eprintf "curl asks for %d bytes\n%!" n; + let len = min n (String.length bod - !i) in + let s = String.sub bod !i len in + if !debug_ then Printf.eprintf "gave curl %d bytes\n%!" len; + i := !i + len; + s); (* read result's body *) Buffer.clear buf_res; - Curl.set_writefunction curl - (fun s -> Buffer.add_string buf_res s; String.length s); + Curl.set_writefunction curl (fun s -> + Buffer.add_string buf_res s; + String.length s); try match Curl.perform curl with | () -> let code = Curl.get_responsecode curl in - if !debug_ then Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); + if !debug_ then + Printf.eprintf "result body: %S\n%!" (Buffer.contents buf_res); let dec = Pbrt.Decoder.of_string (Buffer.contents buf_res) in if code >= 200 && code < 300 then ( let res = decode dec in @@ -103,17 +110,24 @@ module Curl() : CURL = struct Error (`Status (code, status)) ) | exception Curl.CurlException (_, code, msg) -> - let status = Status.default_status - ~code:(Int32.of_int code) ~message:(Bytes.unsafe_of_string msg) () in - Error(`Status (code, status)) + let status = + Status.default_status ~code:(Int32.of_int code) + ~message:(Bytes.unsafe_of_string msg) + () + in + Error (`Status (code, status)) with e -> Error (`Failure (Printexc.to_string e)) end module type PUSH = sig type elt + val push : elt -> unit + val is_empty : unit -> bool + val is_big_enough : unit -> bool + val pop_iter_all : (elt -> unit) -> unit end @@ -123,151 +137,170 @@ module type EMITTER = sig open Opentelemetry.Proto val push_trace : Trace.resource_spans list -> unit + val push_metrics : Metrics.resource_metrics list -> unit + val push_logs : Logs.resource_logs list -> unit val tick : unit -> unit + val cleanup : unit -> unit end type 'a push = (module PUSH with type elt = 'a) -type on_full_cb = (unit -> unit) + +type on_full_cb = unit -> unit (* make a "push" object, along with a setter for a callback to call when it's ready to emit a batch *) -let mk_push (type a) ?batch () : (module PUSH with type elt = a) * (on_full_cb -> unit) = - let on_full: on_full_cb ref = ref ignore in +let mk_push (type a) ?batch () : + (module PUSH with type elt = a) * (on_full_cb -> unit) = + let on_full : on_full_cb ref = ref ignore in let push = match batch with | None -> let r = ref None in let module M = struct type elt = a + let is_empty () = !r == None + let is_big_enough () = !r != None + let push x = - r := Some x; !on_full() - let pop_iter_all f = Option.iter f !r; r := None + r := Some x; + !on_full () + + let pop_iter_all f = + Option.iter f !r; + r := None end in (module M : PUSH with type elt = a) - | Some n -> let q = FQueue.create ~dummy:(Obj.magic 0) (3 * n) in let module M = struct type elt = a + let is_empty () = FQueue.size q = 0 + let is_big_enough () = FQueue.size q >= n + let push x = - if not (FQueue.push q x) || FQueue.size q > n then ( - !on_full(); - if not (FQueue.push q x) then ( - Atomic.incr n_dropped; (* drop item *) - ) + if (not (FQueue.push q x)) || FQueue.size q > n then ( + !on_full (); + if not (FQueue.push q x) then Atomic.incr n_dropped (* drop item *) ) + let pop_iter_all f = FQueue.pop_iter_all q f end in (module M : PUSH with type elt = a) - in - push, ((:=) on_full) + + push, ( := ) on_full (* start a thread in the background, running [f()] *) -let start_bg_thread (f: unit -> unit) : unit = - let run() = +let start_bg_thread (f : unit -> unit) : unit = + let run () = (* block some signals: USR1 USR2 TERM PIPE ALARM STOP, see [$ kill -L] *) - ignore (Thread.sigmask Unix.SIG_BLOCK [10; 12; 13; 14; 15; 19] : _ list); - f() + ignore (Thread.sigmask Unix.SIG_BLOCK [ 10; 12; 13; 14; 15; 19 ] : _ list); + f () in ignore (Thread.create run () : Thread.t) -let l_is_empty = function [] -> true | _::_ -> false +let l_is_empty = function + | [] -> true + | _ :: _ -> false + let batch_is_empty = List.for_all l_is_empty (* make an emitter. exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~(config:Config.t) () : (module EMITTER) = +let mk_emitter ~(config : Config.t) () : (module EMITTER) = let open Proto in - let continue = ref true in let ((module E_trace) : Trace.resource_spans list push), on_trace_full = - mk_push ?batch:config.batch_traces () in - let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full = - mk_push ?batch:config.batch_metrics () in + mk_push ?batch:config.batch_traces () + in + let ((module E_metrics) : Metrics.resource_metrics list push), on_metrics_full + = + mk_push ?batch:config.batch_metrics () + in let ((module E_logs) : Logs.resource_logs list push), on_logs_full = - mk_push ?batch:config.batch_logs () in + mk_push ?batch:config.batch_logs () + in - let encoder = Pbrt.Encoder.create() in + let encoder = Pbrt.Encoder.create () in - let ((module C) as curl) = (module Curl() : CURL) in + let ((module C) as curl) = (module Curl () : CURL) in let send_http_ ~path ~encode x : unit = Pbrt.Encoder.reset encoder; encode x encoder; let data = Pbrt.Encoder.to_string encoder in - begin match - C.send ~path ~decode:(fun _ -> ()) data - with - | Ok () -> () - | Error err -> - (* TODO: log error _via_ otel? *) - Atomic.incr n_errors; - report_err_ err - end; + match C.send ~path ~decode:(fun _ -> ()) data with + | Ok () -> () + | Error err -> + (* TODO: log error _via_ otel? *) + Atomic.incr n_errors; + report_err_ err in - let send_metrics_http (l:Metrics.resource_metrics list list) = + let send_metrics_http (l : Metrics.resource_metrics list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = - Metrics_service.default_export_metrics_service_request - ~resource_metrics:l () in + Metrics_service.default_export_metrics_service_request ~resource_metrics:l + () + in send_http_ ~path:"/v1/metrics" ~encode:Metrics_service.encode_export_metrics_service_request x in - let send_traces_http (l:Trace.resource_spans list list) = + let send_traces_http (l : Trace.resource_spans list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = - Trace_service.default_export_trace_service_request - ~resource_spans:l () in + Trace_service.default_export_trace_service_request ~resource_spans:l () + in send_http_ ~path:"/v1/traces" ~encode:Trace_service.encode_export_trace_service_request x in - let send_logs_http (l:Logs.resource_logs list list) = + let send_logs_http (l : Logs.resource_logs list list) = let l = List.fold_left (fun acc l -> List.rev_append l acc) [] l in let x = - Logs_service.default_export_logs_service_request - ~resource_logs:l () in + Logs_service.default_export_logs_service_request ~resource_logs:l () + in send_http_ ~path:"/v1/logs" ~encode:Logs_service.encode_export_logs_service_request x in - let last_wakeup = Atomic.make (Mtime_clock.now()) in + let last_wakeup = Atomic.make (Mtime_clock.now ()) in let timeout = Mtime.Span.(config.batch_timeout_ms * ms) in - let batch_timeout() : bool = - let elapsed = Mtime.span (Mtime_clock.now()) (Atomic.get last_wakeup) in + let batch_timeout () : bool = + let elapsed = Mtime.span (Mtime_clock.now ()) (Atomic.get last_wakeup) in Mtime.Span.compare elapsed timeout >= 0 in - let emit_ (type a) (module P: PUSH with type elt = a list) - ?(init=fun() -> []) ?(force=false) ~send_http () : bool = - if force || (not force && P.is_big_enough ()) then ( - let batch = ref [init()] in + let emit_ (type a) (module P : PUSH with type elt = a list) + ?(init = fun () -> []) ?(force = false) ~send_http () : bool = + if force || ((not force) && P.is_big_enough ()) then ( + let batch = ref [ init () ] in P.pop_iter_all (fun l -> batch := l :: !batch); let do_something = not (l_is_empty !batch) in if do_something then ( send_http !batch; - Atomic.set last_wakeup (Mtime_clock.now()); + Atomic.set last_wakeup (Mtime_clock.now ()) ); do_something - ) else false + ) else + false in let emit_metrics ?force () : bool = - emit_ (module E_metrics) + emit_ + (module E_metrics) ~init:(fun () -> AList.pop_all gc_metrics) ~send_http:send_metrics_http () and emit_traces ?force () : bool = @@ -277,7 +310,7 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = in let[@inline] guard f = - try f() + try f () with e -> Printf.eprintf "opentelemetry-curl: uncaught exception: %s\n%!" (Printexc.to_string e) @@ -286,57 +319,51 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let emit_all_force () = ignore (emit_traces ~force:true () : bool); ignore (emit_metrics ~force:true () : bool); - ignore (emit_logs ~force:true () : bool); + ignore (emit_logs ~force:true () : bool) in - if config.thread then ( - begin - let m = Mutex.create() in - set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m); - end; + (let m = Mutex.create () in + set_mutex ~lock:(fun () -> Mutex.lock m) ~unlock:(fun () -> Mutex.unlock m)); - let ((module C) as curl) = (module Curl() : CURL) in + let ((module C) as curl) = (module Curl () : CURL) in - let m = Mutex.create() in - let cond = Condition.create() in + let m = Mutex.create () in + let cond = Condition.create () in (* loop for the thread that processes events and sends them to collector *) let bg_thread () = while !continue do let@ () = guard in - let timeout = batch_timeout() in + let timeout = batch_timeout () in let do_metrics = emit_metrics ~force:timeout () in let do_traces = emit_traces ~force:timeout () in let do_logs = emit_logs ~force:timeout () in - if not do_metrics && not do_traces && not do_logs then ( + if (not do_metrics) && (not do_traces) && not do_logs then (* wait *) let@ () = with_mutex_ m in - Condition.wait cond m; - ) + Condition.wait cond m done; (* flush remaining events *) - begin - let@ () = guard in - emit_all_force(); - C.cleanup(); - end + let@ () = guard in + emit_all_force (); + C.cleanup () in start_bg_thread bg_thread; let wakeup () = with_mutex_ m (fun () -> Condition.signal cond); - Thread.yield() + Thread.yield () in (* wake up if a batch is full *) on_metrics_full wakeup; on_trace_full wakeup; - let tick() = - if Atomic.get needs_gc_metrics then sample_gc_metrics(); - if batch_timeout() then wakeup() + let tick () = + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + if batch_timeout () then wakeup () in if config.ticker_thread then ( @@ -344,96 +371,105 @@ let mk_emitter ~(config:Config.t) () : (module EMITTER) = let tick_thread () = while true do Thread.delay 0.5; - tick(); + tick () done in - start_bg_thread tick_thread; + start_bg_thread tick_thread ); let module M = struct let push_trace e = E_trace.push e; - if batch_timeout() then wakeup() + if batch_timeout () then wakeup () + let push_metrics e = E_metrics.push e; - if batch_timeout() then wakeup() + if batch_timeout () then wakeup () + let push_logs e = E_logs.push e; - if batch_timeout() then wakeup() - let tick=tick + if batch_timeout () then wakeup () + + let tick = tick + let cleanup () = continue := false; with_mutex_ m (fun () -> Condition.broadcast cond) end in (module M) ) else ( - on_metrics_full (fun () -> - if Atomic.get needs_gc_metrics then sample_gc_metrics(); + if Atomic.get needs_gc_metrics then sample_gc_metrics (); ignore (emit_metrics () : bool)); - on_trace_full (fun () -> - ignore (emit_traces () : bool)); - on_logs_full (fun () -> - ignore (emit_logs () : bool)); + on_trace_full (fun () -> ignore (emit_traces () : bool)); + on_logs_full (fun () -> ignore (emit_logs () : bool)); let cleanup () = - emit_all_force(); - C.cleanup(); + emit_all_force (); + C.cleanup () in let module M = struct let push_trace e = - let@() = guard in + let@ () = guard in E_trace.push e; - if batch_timeout() then emit_all_force() + if batch_timeout () then emit_all_force () let push_metrics e = - let@() = guard in + let@ () = guard in E_metrics.push e; - if batch_timeout() then emit_all_force() + if batch_timeout () then emit_all_force () let push_logs e = - let@() = guard in + let@ () = guard in E_logs.push e; - if batch_timeout() then emit_all_force() + if batch_timeout () then emit_all_force () let tick () = - if Atomic.get needs_gc_metrics then sample_gc_metrics(); - if batch_timeout() then emit_all_force() + if Atomic.get needs_gc_metrics then sample_gc_metrics (); + if batch_timeout () then emit_all_force () let cleanup = cleanup end in (module M) ) -module Backend(Arg : sig val config : Config.t end)() - : Opentelemetry.Collector.BACKEND -= struct - include Gen_ids.Make() +module Backend (Arg : sig + val config : Config.t +end) +() : Opentelemetry.Collector.BACKEND = struct + include Gen_ids.Make () include (val mk_emitter ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector - let send_trace : Trace.resource_spans list sender = { - send=fun l ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send spans %a@." (Format.pp_print_list Trace.pp_resource_spans) l; - push_trace l; - ret() - } + let send_trace : Trace.resource_spans list sender = + { + send = + (fun l ~ret -> + let@ () = with_lock_ in + if !debug_ then + Format.eprintf "send spans %a@." + (Format.pp_print_list Trace.pp_resource_spans) + l; + push_trace l; + ret ()); + } - let last_sent_metrics = Atomic.make (Mtime_clock.now()) - let timeout_sent_metrics = Mtime.Span.(5 * s) (* send metrics from time to time *) + let last_sent_metrics = Atomic.make (Mtime_clock.now ()) + + let timeout_sent_metrics = Mtime.Span.(5 * s) + (* send metrics from time to time *) let signal_emit_gc_metrics () = Atomic.set needs_gc_metrics true let additional_metrics () : Metrics.resource_metrics list = (* add exporter metrics to the lot? *) let last_emit = Atomic.get last_sent_metrics in - let now = Mtime_clock.now() in + let now = Mtime_clock.now () in let add_own_metrics = let elapsed = Mtime.span last_emit now in Mtime.Span.compare elapsed timeout_sent_metrics > 0 @@ -442,51 +478,76 @@ module Backend(Arg : sig val config : Config.t end)() if add_own_metrics then ( let open OT.Metrics in Atomic.set last_sent_metrics now; - [make_resource_metrics [ - sum ~name:"otel-export.dropped" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); - ]; - sum ~name:"otel-export.errors" ~is_monotonic:true [ - int ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) - ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); - ]; - ]] - ) else [] + [ + make_resource_metrics + [ + sum ~name:"otel-export.dropped" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped); + ]; + sum ~name:"otel-export.errors" ~is_monotonic:true + [ + int + ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit) + ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors); + ]; + ]; + ] + ) else + [] - let send_metrics : Metrics.resource_metrics list sender = { - send=fun m ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send metrics %a@." (Format.pp_print_list Metrics.pp_resource_metrics) m; + let send_metrics : Metrics.resource_metrics list sender = + { + send = + (fun m ~ret -> + let@ () = with_lock_ in + if !debug_ then + Format.eprintf "send metrics %a@." + (Format.pp_print_list Metrics.pp_resource_metrics) + m; - let m = List.rev_append (additional_metrics()) m in - push_metrics m; - ret() - } + let m = List.rev_append (additional_metrics ()) m in + push_metrics m; + ret ()); + } - let send_logs : Logs.resource_logs list sender = { - send=fun m ~ret -> - let@() = with_lock_ in - if !debug_ then Format.eprintf "send logs %a@." (Format.pp_print_list Logs.pp_resource_logs) m; - push_logs m; - ret() - } + let send_logs : Logs.resource_logs list sender = + { + send = + (fun m ~ret -> + let@ () = with_lock_ in + if !debug_ then + Format.eprintf "send logs %a@." + (Format.pp_print_list Logs.pp_resource_logs) + m; + push_logs m; + ret ()); + } end -let setup_ ~(config:Config.t) () = +let setup_ ~(config : Config.t) () = debug_ := config.debug; - let module B = Backend(struct let config=config end)() in + let module B = + Backend + (struct + let config = config + end) + () + in Opentelemetry.Collector.backend := Some (module B); B.cleanup -let setup ?(config=Config.make()) ?(enable=true) () = +let setup ?(config = Config.make ()) ?(enable = true) () = if enable then ( let cleanup = setup_ ~config () in at_exit cleanup ) -let with_setup ?(config=Config.make()) ?(enable=true) () f = +let with_setup ?(config = Config.make ()) ?(enable = true) () f = if enable then ( let cleanup = setup_ ~config () in Fun.protect ~finally:cleanup f - ) else f() + ) else + f () diff --git a/src/client/opentelemetry_client_ocurl.mli b/src/client/opentelemetry_client_ocurl.mli index 6147d5b1..02404de9 100644 --- a/src/client/opentelemetry_client_ocurl.mli +++ b/src/client/opentelemetry_client_ocurl.mli @@ -1,4 +1,3 @@ - (* TODO: more options from https://opentelemetry.io/docs/reference/specification/protocol/exporter/ diff --git a/src/dune b/src/dune index 933ec2aa..71c22545 100644 --- a/src/dune +++ b/src/dune @@ -8,97 +8,99 @@ ; ### protobuf rules ### (rule - (targets status_types.ml status_types.mli - status_pb.ml status_pb.mli - status_pp.ml status_pp.mli) - (deps (:file status.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} -ml_out . -pp -binary))) + (targets status_types.ml status_types.mli status_pb.ml status_pb.mli + status_pp.ml status_pp.mli) + (deps + (:file status.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -ml_out . -pp -binary))) (rule - (targets common_types.ml common_types.mli - common_pb.ml common_pb.mli - common_pp.ml common_pp.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets common_types.ml common_types.mli common_pb.ml common_pb.mli + common_pp.ml common_pp.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets resource_types.ml resource_types.mli - resource_pb.ml resource_pb.mli - resource_pp.ml resource_pp.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets resource_types.ml resource_types.mli resource_pb.ml resource_pb.mli + resource_pp.ml resource_pp.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets trace_types.ml trace_types.mli - trace_pb.ml trace_pb.mli - trace_pp.ml trace_pp.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets trace_types.ml trace_types.mli trace_pb.ml trace_pb.mli trace_pp.ml + trace_pp.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets metrics_types.ml metrics_types.mli - metrics_pb.ml metrics_pb.mli - metrics_pp.ml metrics_pp.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets metrics_types.ml metrics_types.mli metrics_pb.ml metrics_pb.mli + metrics_pp.ml metrics_pp.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets logs_types.ml logs_types.mli - logs_pb.ml logs_pb.mli - logs_pp.ml logs_pp.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets logs_types.ml logs_types.mli logs_pb.ml logs_pb.mli logs_pp.ml + logs_pp.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/logs/v1/logs.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets metrics_service_types.ml metrics_service_types.mli - metrics_service_pp.ml metrics_service_pp.mli - metrics_service_pb.ml metrics_service_pb.mli) - (deps (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets metrics_service_types.ml metrics_service_types.mli + metrics_service_pp.ml metrics_service_pp.mli metrics_service_pb.ml + metrics_service_pb.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets trace_service_types.ml trace_service_types.mli - trace_service_pp.ml trace_service_pp.mli - trace_service_pb.ml trace_service_pb.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) + (targets trace_service_types.ml trace_service_types.mli trace_service_pp.ml + trace_service_pp.mli trace_service_pb.ml trace_service_pb.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) (rule - (targets logs_service_types.ml logs_service_types.mli - logs_service_pp.ml logs_service_pp.mli - logs_service_pb.ml logs_service_pb.mli) - (deps - (:file %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto) - (source_tree %{project_root}/vendor/opentelemetry-proto/)) - (action (run ocaml-protoc %{file} - -I %{project_root}/vendor/opentelemetry-proto/ - -ml_out . -pp -binary))) - + (targets logs_service_types.ml logs_service_types.mli logs_service_pp.ml + logs_service_pp.mli logs_service_pb.ml logs_service_pb.mli) + (deps + (:file + %{project_root}/vendor/opentelemetry-proto/opentelemetry/proto/collector/logs/v1/logs_service.proto) + (source_tree %{project_root}/vendor/opentelemetry-proto/)) + (action + (run ocaml-protoc %{file} -I %{project_root}/vendor/opentelemetry-proto/ + -ml_out . -pp -binary))) diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index f2293c67..4af27555 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -4,6 +4,14 @@ open Cohttp open Cohttp_lwt module Server : sig + val trace : + ?service_name:string -> + ?attrs:Otel.Span.key_value list -> + ('conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t) -> + 'conn -> + Request.t -> + 'body -> + (Response.t * 'body) Lwt.t (** Trace requests to a Cohttp server. Use it like this: @@ -18,19 +26,8 @@ module Server : sig ~mode:(`TCP (`Port 8080)) (Server.make () ~callback:callback_traced) *) - val trace : - ?service_name:string -> - ?attrs:Otel.Span.key_value list -> - ('conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t) -> - 'conn -> Request.t -> 'body -> (Response.t * 'body) Lwt.t - (** Trace a new internal span. - - Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace - scope in the [x-ocaml-otel-traceparent] header in the request for - convenience. - *) - val with_: + val with_ : ?trace_state:string -> ?service_name:string -> ?attrs:Otel.Span.key_value list -> @@ -40,21 +37,28 @@ module Server : sig Request.t -> (Request.t -> 'a Lwt.t) -> 'a Lwt.t + (** Trace a new internal span. + Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace + scope in the [x-ocaml-otel-traceparent] header in the request for + convenience. + *) + + val get_trace_context : + ?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header added by [trace] and [with_]. *) - val get_trace_context : ?from:[`Internal | `External] -> Request.t -> Otel.Trace.scope option + val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used by [trace] and [with_]. *) - val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t + val remove_trace_context : Request.t -> Request.t (** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and [with_]. *) - val remove_trace_context : Request.t -> Request.t end = struct let attrs_of_request (req : Request.t) = let meth = req |> Request.meth |> Code.string_of_method in @@ -63,25 +67,24 @@ end = struct let ua = Header.get (Request.headers req) "user-agent" in let uri = Request.uri req in List.concat - [ [ ("http.method", `String meth) ] - ; (match host with None -> [] | Some h -> [ ("http.host", `String h) ]) - ; [ ("http.url", `String (Uri.to_string uri)) ] - ; ( match ua with - | None -> - [] - | Some ua -> - [ ("http.user_agent", `String ua) ] ) - ; ( match referer with - | None -> - [] - | Some r -> - [ ("http.request.header.referer", `String r) ] ) + [ + [ "http.method", `String meth ]; + (match host with + | None -> [] + | Some h -> [ "http.host", `String h ]); + [ "http.url", `String (Uri.to_string uri) ]; + (match ua with + | None -> [] + | Some ua -> [ "http.user_agent", `String ua ]); + (match referer with + | None -> [] + | Some r -> [ "http.request.header.referer", `String r ]); ] let attrs_of_response (res : Response.t) = let code = Response.status res in let code = Code.code_of_status code in - [ ("http.status_code", `Int code) ] + [ "http.status_code", `Int code ] let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent" @@ -89,11 +92,12 @@ end = struct let module Traceparent = Otel.Trace_context.Traceparent in let headers = Header.add (Request.headers req) header_x_ocaml_otel_traceparent - (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id ()) + (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id + ()) in { req with headers } - let get_trace_context ?(from=`Internal) req = + let get_trace_context ?(from = `Internal) req = let module Traceparent = Otel.Trace_context.Traceparent in let name = match from with @@ -103,125 +107,125 @@ end = struct match Header.get (Request.headers req) name with | None -> None | Some v -> - (match Traceparent.of_value v with - | Ok (trace_id, parent_id) -> - (Some Otel.Trace.{ trace_id; span_id = parent_id; events = []; attrs = []}) - | Error _ -> None) + (match Traceparent.of_value v with + | Ok (trace_id, parent_id) -> + Some + Otel.Trace.{ trace_id; span_id = parent_id; events = []; attrs = [] } + | Error _ -> None) let remove_trace_context req = - let headers = Header.remove (Request.headers req) header_x_ocaml_otel_traceparent in + let headers = + Header.remove (Request.headers req) header_x_ocaml_otel_traceparent + in { req with headers } - let trace ?service_name ?(attrs=[]) callback = - fun conn req body -> + let trace ?service_name ?(attrs = []) callback conn req body = let scope = get_trace_context ~from:`External req in - Otel_lwt.Trace.with_ - ?service_name - "request" - ~kind:Span_kind_server + Otel_lwt.Trace.with_ ?service_name "request" ~kind:Span_kind_server ?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope) ?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope) ~attrs:(attrs @ attrs_of_request req) (fun scope -> let open Lwt.Syntax in let req = set_trace_context scope req in - let* (res, body) = callback conn req body in - Otel.Trace.add_attrs scope (fun () -> attrs_of_response res) ; - Lwt.return (res, body) ) + let* res, body = callback conn req body in + Otel.Trace.add_attrs scope (fun () -> attrs_of_response res); + Lwt.return (res, body)) - let with_ ?trace_state ?service_name ?attrs ?(kind=Otel.Span.Span_kind_internal) ?links name req (f : Request.t -> 'a Lwt.t) = + let with_ ?trace_state ?service_name ?attrs + ?(kind = Otel.Span.Span_kind_internal) ?links name req + (f : Request.t -> 'a Lwt.t) = let scope = get_trace_context ~from:`Internal req in - Otel_lwt.Trace.with_ - ?trace_state - ?service_name - ?attrs - ~kind + Otel_lwt.Trace.with_ ?trace_state ?service_name ?attrs ~kind ?trace_id:(Option.map (fun scope -> scope.Otel.Trace.trace_id) scope) ?parent:(Option.map (fun scope -> scope.Otel.Trace.span_id) scope) - ?links - name + ?links name (fun scope -> let open Lwt.Syntax in let req = set_trace_context scope req in f req) end -let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = +let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = let module Traced = struct - open Lwt.Syntax + open Lwt.Syntax - let attrs_for ~uri ~meth () = - [ ("http.method", `String (Code.string_of_method `GET)) - ; ("http.url", `String (Uri.to_string uri)) - ] + let attrs_for ~uri ~meth () = + [ + "http.method", `String (Code.string_of_method `GET); + "http.url", `String (Uri.to_string uri); + ] - let context_for ~uri ~meth = - let trace_id = match scope with | Some scope -> Some scope.trace_id | None -> None in - let parent = match scope with | Some scope -> Some scope.span_id | None -> None in - let attrs = attrs_for ~uri ~meth () in - (trace_id, parent, attrs) + let context_for ~uri ~meth = + let trace_id = + match scope with + | Some scope -> Some scope.trace_id + | None -> None + in + let parent = + match scope with + | Some scope -> Some scope.span_id + | None -> None + in + let attrs = attrs_for ~uri ~meth () in + trace_id, parent, attrs - let add_traceparent (scope : Otel.Trace.scope) headers = - let module Traceparent = Otel.Trace_context.Traceparent in - let headers = match headers with | None -> Header.init () | Some headers -> headers in - Header.add headers Traceparent.name - (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id ()) + let add_traceparent (scope : Otel.Trace.scope) headers = + let module Traceparent = Otel.Trace_context.Traceparent in + let headers = + match headers with + | None -> Header.init () + | Some headers -> headers + in + Header.add headers Traceparent.name + (Traceparent.to_value ~trace_id:scope.trace_id ~parent_id:scope.span_id + ()) - type ctx = C.ctx + type ctx = C.ctx - let call ?ctx ?headers ?body ?chunked meth (uri : Uri.t) : (Response.t * Cohttp_lwt.Body.t) Lwt.t = - let (trace_id, parent, attrs) = context_for ~uri ~meth in - Otel_lwt.Trace.with_ "request" - ~kind:Span_kind_client - ?trace_id - ?parent - ~attrs - (fun scope -> - let headers = add_traceparent scope headers in - let* (res, body) = C.call ?ctx ~headers ?body ?chunked meth uri in - Otel.Trace.add_attrs scope (fun () -> - let code = Response.status res in - let code = Code.code_of_status code in - [ ("http.status_code", `Int code) ]) ; - Lwt.return (res, body)) + let call ?ctx ?headers ?body ?chunked meth (uri : Uri.t) : + (Response.t * Cohttp_lwt.Body.t) Lwt.t = + let trace_id, parent, attrs = context_for ~uri ~meth in + Otel_lwt.Trace.with_ "request" ~kind:Span_kind_client ?trace_id ?parent + ~attrs (fun scope -> + let headers = add_traceparent scope headers in + let* res, body = C.call ?ctx ~headers ?body ?chunked meth uri in + Otel.Trace.add_attrs scope (fun () -> + let code = Response.status res in + let code = Code.code_of_status code in + [ "http.status_code", `Int code ]); + Lwt.return (res, body)) - let head ?ctx ?headers uri = - let open Lwt.Infix in - call ?ctx ?headers `HEAD uri >|= fst + let head ?ctx ?headers uri = + let open Lwt.Infix in + call ?ctx ?headers `HEAD uri >|= fst - let get ?ctx ?headers uri = call ?ctx ?headers `GET uri + let get ?ctx ?headers uri = call ?ctx ?headers `GET uri - let delete ?ctx ?body ?chunked ?headers uri = - call ?ctx ?headers ?body ?chunked `DELETE uri + let delete ?ctx ?body ?chunked ?headers uri = + call ?ctx ?headers ?body ?chunked `DELETE uri - let post ?ctx ?body ?chunked ?headers uri = - call ?ctx ?headers ?body ?chunked `POST uri + let post ?ctx ?body ?chunked ?headers uri = + call ?ctx ?headers ?body ?chunked `POST uri - let put ?ctx ?body ?chunked ?headers uri = - call ?ctx ?headers ?body ?chunked `PUT uri + let put ?ctx ?body ?chunked ?headers uri = + call ?ctx ?headers ?body ?chunked `PUT uri - let patch ?ctx ?body ?chunked ?headers uri = - call ?ctx ?headers ?body ?chunked `PATCH uri + let patch ?ctx ?body ?chunked ?headers uri = + call ?ctx ?headers ?body ?chunked `PATCH uri - let post_form ?ctx ?headers ~params uri = - let (trace_id, parent, attrs) = context_for ~uri ~meth:`POST in - Otel_lwt.Trace.with_ "request" - ~kind:Span_kind_client - ?trace_id - ?parent - ~attrs - (fun scope -> - let headers = add_traceparent scope headers in - let* (res, body) = - C.post_form ?ctx ~headers ~params uri - in - Otel.Trace.add_attrs scope (fun () -> - let code = Response.status res in - let code = Code.code_of_status code in - [ ("http.status_code", `Int code) ]) ; - Lwt.return (res, body)) + let post_form ?ctx ?headers ~params uri = + let trace_id, parent, attrs = context_for ~uri ~meth:`POST in + Otel_lwt.Trace.with_ "request" ~kind:Span_kind_client ?trace_id ?parent + ~attrs (fun scope -> + let headers = add_traceparent scope headers in + let* res, body = C.post_form ?ctx ~headers ~params uri in + Otel.Trace.add_attrs scope (fun () -> + let code = Response.status res in + let code = Code.code_of_status code in + [ "http.status_code", `Int code ]); + Lwt.return (res, body)) - let callv = C.callv (* TODO *) - end - in + let callv = C.callv (* TODO *) + end in (module Traced : Cohttp_lwt.S.Client) diff --git a/src/lwt/dune b/src/lwt/dune index 775ec01d..e547e931 100644 --- a/src/lwt/dune +++ b/src/lwt/dune @@ -1,5 +1,5 @@ (library - (name opentelemetry_lwt) - (public_name opentelemetry-lwt) - (synopsis "Lwt frontend for opentelemetry") - (libraries lwt opentelemetry)) + (name opentelemetry_lwt) + (public_name opentelemetry-lwt) + (synopsis "Lwt frontend for opentelemetry") + (libraries lwt opentelemetry)) diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index 611ec37b..37fadef6 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -1,6 +1,5 @@ open Opentelemetry open Lwt.Syntax - module Span_id = Span_id module Trace_id = Trace_id module Event = Event @@ -15,10 +14,8 @@ module Trace = struct include Trace (** Sync span guard *) - let with_ - ?trace_state ?service_name ?(attrs=[]) - ?kind ?trace_id ?parent ?scope ?links - name (f:Trace.scope -> 'a Lwt.t) : 'a Lwt.t = + let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent + ?scope ?links name (f : Trace.scope -> 'a Lwt.t) : 'a Lwt.t = let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id @@ -31,30 +28,31 @@ module Trace = struct | None, Some scope -> Some scope.span_id | None, None -> None in - let start_time = Timestamp_ns.now_unix_ns() in - let span_id = Span_id.create() in - let scope = {trace_id;span_id;events=[];attrs} in + let start_time = Timestamp_ns.now_unix_ns () in + let span_id = Span_id.create () in + let scope = { trace_id; span_id; events = []; attrs } in let finally ok = - let status = match ok with + let status = + match ok with | Ok () -> default_status ~code:Status_code_ok () - | Error e -> default_status ~code:Status_code_error ~message:e () in + | Error e -> default_status ~code:Status_code_error ~message:e () + in let span, _ = - Span.create - ?kind ~trace_id ?parent ?links ~id:span_id - ?trace_state ~attrs:scope.attrs ~events:scope.events - ~start_time ~end_time:(Timestamp_ns.now_unix_ns()) - ~status - name in - emit ?service_name [span] + Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state + ~attrs:scope.attrs ~events:scope.events ~start_time + ~end_time:(Timestamp_ns.now_unix_ns ()) + ~status name + in + emit ?service_name [ span ] in Lwt.catch (fun () -> - let* x = f scope in - let () = finally (Ok ()) in - Lwt.return x) + let* x = f scope in + let () = finally (Ok ()) in + Lwt.return x) (fun e -> - let () = finally (Error (Printexc.to_string e)) in - Lwt.fail e) + let () = finally (Error (Printexc.to_string e)) in + Lwt.fail e) end module Metrics = struct diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 6c413a3c..e583179d 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,4 +1,3 @@ - (** Opentelemetry types and instrumentation *) (** {2 Wire format} *) @@ -68,11 +67,12 @@ end in nanoseconds. *) module Timestamp_ns = struct type t = int64 + let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600))) (** Current unix timestamp in nanoseconds *) let[@inline] now_unix_ns () : t = - let span = Ptime_clock.now() |> Ptime.to_span in + let span = Ptime_clock.now () |> Ptime.to_span in let d, ps = Ptime.Span.to_d_ps span in let d = Int64.(mul (of_int d) ns_in_a_day) in let ns = Int64.(div ps 1_000L) in @@ -90,6 +90,7 @@ end module Collector = struct open Proto + type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a } (** Sender interface for a message of type [msg]. Inspired from Logs' reporter (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) @@ -102,9 +103,6 @@ module Collector = struct It doesn't mean the event has been collected yet, it could sit in a batch queue for a little while. *) - type 'msg sender = { - send: 'a. 'msg -> ret:(unit -> 'a) -> 'a; - } (** Collector client interface. *) module type BACKEND = sig @@ -139,65 +137,68 @@ module Collector = struct (** Is there a configured backend? *) let[@inline] has_backend () : bool = !backend != None - let send_trace (l:Trace.resource_spans list) ~ret = + let send_trace (l : Trace.resource_spans list) ~ret = match !backend with - | None -> ret() + | None -> ret () | Some (module B) -> B.send_trace.send l ~ret - let send_metrics (l:Metrics.resource_metrics list) ~ret = + let send_metrics (l : Metrics.resource_metrics list) ~ret = match !backend with - | None -> ret() + | None -> ret () | Some (module B) -> B.send_metrics.send l ~ret - let send_logs (l:Logs.resource_logs list) ~ret = + let send_logs (l : Logs.resource_logs list) ~ret = match !backend with - | None -> ret() + | None -> ret () | Some (module B) -> B.send_logs.send l ~ret let rand_bytes_16 () = match !backend with | None -> Bytes.make 16 '?' - | Some (module B) -> B.rand_bytes_16() + | Some (module B) -> B.rand_bytes_16 () let rand_bytes_8 () = match !backend with | None -> Bytes.make 8 '?' - | Some (module B) -> B.rand_bytes_8() + | Some (module B) -> B.rand_bytes_8 () (** Do background work. Call this regularly if the collector doesn't already have a ticker thread or internal timer. *) let tick () = match !backend with | None -> () - | Some (module B) -> B.tick() + | Some (module B) -> B.tick () end module Util_ = struct - let bytes_to_hex (b:bytes) : string = - let i_to_hex (i:int) = - if i < 10 then Char.chr (i + Char.code '0') - else Char.chr (i - 10 + Char.code 'a') + let bytes_to_hex (b : bytes) : string = + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') in let res = Bytes.create (2 * Bytes.length b) in - for i = 0 to Bytes.length b-1 do + for i = 0 to Bytes.length b - 1 do let n = Char.code (Bytes.get b i) in Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); - Bytes.set res (2 * i + 1) (i_to_hex (n land 0x0f)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) done; Bytes.unsafe_to_string res - let bytes_of_hex (s:string) : bytes = + let bytes_of_hex (s : string) : bytes = let n_of_c = function | '0' .. '9' as c -> Char.code c - Char.code '0' | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' | _ -> raise (Invalid_argument "invalid hex char") in - if (String.length s mod 2 <> 0) then raise (Invalid_argument "hex sequence must be of even length"); + if String.length s mod 2 <> 0 then + raise (Invalid_argument "hex sequence must be of even length"); let res = Bytes.make (String.length s / 2) '\x00' in - for i=0 to String.length s/2-1 do - let n1 = n_of_c (String.get s (2*i)) in - let n2 = n_of_c (String.get s (2*i+1)) in + for i = 0 to (String.length s / 2) - 1 do + let n1 = n_of_c (String.get s (2 * i)) in + let n2 = n_of_c (String.get s ((2 * i) + 1)) in let n = (n1 lsl 4) lor n2 in Bytes.set res i (Char.chr n) done; @@ -211,36 +212,66 @@ end This 16 bytes identifier is shared by all spans in one trace. *) module Trace_id : sig type t + val create : unit -> t + val to_bytes : t -> bytes + val of_bytes : bytes -> t + val to_hex : t -> string + val of_hex : string -> t end = struct open Proto.Trace + type t = bytes + let to_bytes self = self - let create () : t = Collector.rand_bytes_16() - let of_bytes b = if Bytes.length b=16 then b else raise (Invalid_argument "trace IDs must be 16 bytes in length") + + let create () : t = Collector.rand_bytes_16 () + + let of_bytes b = + if Bytes.length b = 16 then + b + else + raise (Invalid_argument "trace IDs must be 16 bytes in length") + let to_hex self = Util_.bytes_to_hex self + let of_hex s = of_bytes (Util_.bytes_of_hex s) end (** Unique ID of a span. *) module Span_id : sig type t + val create : unit -> t + val to_bytes : t -> bytes + val of_bytes : bytes -> t + val to_hex : t -> string + val of_hex : string -> t end = struct open Proto.Trace + type t = bytes + let to_bytes self = self - let create () : t = Collector.rand_bytes_8() - let of_bytes b = if Bytes.length b=8 then b else raise (Invalid_argument "span IDs must be 8 bytes in length") + + let create () : t = Collector.rand_bytes_8 () + + let of_bytes b = + if Bytes.length b = 8 then + b + else + raise (Invalid_argument "span IDs must be 8 bytes in length") + let to_hex self = Util_.bytes_to_hex self + let of_hex s = of_bytes (Util_.bytes_of_hex s) end @@ -251,12 +282,16 @@ module Conventions = struct module Process = struct module Runtime = struct let name = "process.runtime.name" + let version = "process.runtime.version" + let description = "process.runtime.description" end end + module Service = struct let name = "service.name" + let namespace = "service.namespace" end end @@ -267,9 +302,13 @@ module Conventions = struct module Ocaml = struct module GC = struct let compactions = "process.runtime.ocaml.gc.compactions" + let major_collections = "process.runtime.ocaml.gc.major_collections" + let major_heap = "process.runtime.ocaml.gc.major_heap" + let minor_allocated = "process.runtime.ocaml.gc.minor_allocated" + let minor_collections = "process.runtime.ocaml.gc.minor_collections" end end @@ -278,11 +317,17 @@ module Conventions = struct end end -type value = [`Int of int | `String of string | `Bool of bool | `None] +type value = + [ `Int of int + | `String of string + | `Bool of bool + | `None + ] type key_value = string * value (**/**) + let _conv_value = let open Proto.Common in function @@ -294,7 +339,8 @@ let _conv_value = (**/**) (**/**) -let _conv_key_value (k,v) = + +let _conv_key_value (k, v) = let open Proto.Common in let value = _conv_value v in default_key_value ~key:k ~value () @@ -307,29 +353,30 @@ let _conv_key_value (k,v) = module Globals = struct open Proto.Common - let service_name = ref "unknown_service" (** Main service name metadata *) + let service_name = ref "unknown_service" - let service_namespace = ref None (** Namespace for the service *) + let service_namespace = ref None let instrumentation_library = - default_instrumentation_library - ~version:"%%VERSION%%" + default_instrumentation_library ~version:"%%VERSION%%" ~name:"ocaml-opentelemetry" () (** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and modifiable by the user code. They will be attached to each outgoing metrics/traces. *) let global_attributes : key_value list ref = - let parse_pair s = match String.split_on_char '=' s with - | [a;b] -> default_key_value ~key:a ~value:(Some (String_value b)) () + let parse_pair s = + match String.split_on_char '=' s with + | [ a; b ] -> default_key_value ~key:a ~value:(Some (String_value b)) () | _ -> failwith (Printf.sprintf "invalid attribute: %S" s) in - ref @@ + ref + @@ try - Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" |> String.split_on_char ',' - |> List.map parse_pair + Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" + |> String.split_on_char ',' |> List.map parse_pair with _ -> [] (* add global attributes to this list *) @@ -337,17 +384,20 @@ module Globals = struct let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in List.rev_append (List.filter not_redundant !global_attributes) into - let mk_attributes ?(service_name = !service_name) ?(attrs=[]) () : _ list = + let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list = let l = List.map _conv_key_value attrs in let l = default_key_value ~key:Conventions.Attributes.Service.name - ~value:(Some (String_value service_name)) () :: l + ~value:(Some (String_value service_name)) () + :: l in - let l = match !service_namespace with + let l = + match !service_namespace with | None -> l | Some v -> default_key_value ~key:Conventions.Attributes.Service.namespace - ~value:(Some (String_value v)) () :: l + ~value:(Some (String_value v)) () + :: l in l |> merge_global_attributes_ end @@ -360,22 +410,18 @@ end belong in a span. *) module Event : sig open Proto.Trace + type t = span_event val make : - ?time_unix_nano:Timestamp_ns.t -> - ?attrs:key_value list -> - string -> - t - + ?time_unix_nano:Timestamp_ns.t -> ?attrs:key_value list -> string -> t end = struct open Proto.Trace + type t = span_event - let make - ?(time_unix_nano=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (name:string) : t = + let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = []) + (name : string) : t = let attrs = List.map _conv_key_value attrs in default_span_event ~time_unix_nano ~name ~attributes:attrs () end @@ -390,6 +436,7 @@ module Span : sig open Proto.Trace type t = span + type id = Span_id.t type nonrec kind = span_span_kind = @@ -412,7 +459,8 @@ module Span : sig val id : t -> Span_id.t - type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] + type key_value = + string * [ `Int of int | `String of string | `Bool of bool | `None ] val create : ?kind:kind -> @@ -426,8 +474,9 @@ module Span : sig ?links:(Trace_id.t * Span_id.t * string) list -> start_time:Timestamp_ns.t -> end_time:Timestamp_ns.t -> - string -> t * id - (** [create ~trace_id name] creates a new span with its unique ID. + string -> + t * id + (** [create ~trace_id name] creates a new span with its unique ID. @param trace_id the trace this belongs to @param parent parent span, if any @param links list of links to other spans, each with their trace state @@ -436,6 +485,7 @@ end = struct open Proto.Trace type t = span + type id = Span_id.t type nonrec kind = span_span_kind = @@ -446,7 +496,8 @@ end = struct | Span_kind_producer | Span_kind_consumer - type key_value = string * [`Int of int | `String of string | `Bool of bool | `None] + type key_value = + string * [ `Int of int | `String of string | `Bool of bool | `None ] type nonrec status_code = status_status_code = | Status_code_unset @@ -458,40 +509,26 @@ end = struct code: status_code; } - let id self = Span_id.of_bytes self.span_id - let create - ?(kind=Span_kind_unspecified) - ?(id=Span_id.create()) - ?trace_state - ?(attrs=[]) - ?(events=[]) - ?status - ~trace_id ?parent ?(links=[]) - ~start_time ~end_time - name : t * id = + let create ?(kind = Span_kind_unspecified) ?(id = Span_id.create ()) + ?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent + ?(links = []) ~start_time ~end_time name : t * id = let trace_id = Trace_id.to_bytes trace_id in let parent_span_id = Option.map Span_id.to_bytes parent in let attributes = List.map _conv_key_value attrs in let links = List.map - (fun (trace_id,span_id,trace_state) -> - let trace_id = Trace_id.to_bytes trace_id in - let span_id = Span_id.to_bytes span_id in - default_span_link ~trace_id ~span_id ~trace_state()) + (fun (trace_id, span_id, trace_state) -> + let trace_id = Trace_id.to_bytes trace_id in + let span_id = Span_id.to_bytes span_id in + default_span_link ~trace_id ~span_id ~trace_state ()) links in let span = - default_span - ~trace_id ?parent_span_id - ~span_id:(Span_id.to_bytes id) - ~attributes ~events - ?trace_state ~status - ~kind ~name ~links - ~start_time_unix_nano:start_time - ~end_time_unix_nano:end_time - () + default_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id) + ~attributes ~events ?trace_state ~status ~kind ~name ~links + ~start_time_unix_nano:start_time ~end_time_unix_nano:end_time () in span, id end @@ -507,49 +544,47 @@ module Trace = struct let make_resource_spans ?service_name ?attrs spans = let ils = default_instrumentation_library_spans - ~instrumentation_library:(Some Globals.instrumentation_library) - ~spans () in + ~instrumentation_library:(Some Globals.instrumentation_library) ~spans + () + in let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in - default_resource_spans - ~resource:(Some resource) ~instrumentation_library_spans:[ils] () + default_resource_spans ~resource:(Some resource) + ~instrumentation_library_spans:[ ils ] () (** Sync emitter *) - let emit ?service_name ?attrs (spans:span list) : unit = + let emit ?service_name ?attrs (spans : span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in - Collector.send_trace [rs] ~ret:(fun () -> ()) + Collector.send_trace [ rs ] ~ret:(fun () -> ()) - (** Scope to be used with {!with_}. *) type scope = { trace_id: Trace_id.t; span_id: Span_id.t; mutable events: Event.t list; - mutable attrs: Span.key_value list + mutable attrs: Span.key_value list; } + (** Scope to be used with {!with_}. *) (** Add an event to the scope. It will be aggregated into the span. Note that this takes a function that produces an event, and will only call it if there is an instrumentation backend. *) - let[@inline] add_event (scope:scope) (ev:unit -> Event.t) : unit = - if Collector.has_backend() then ( - scope.events <- ev() :: scope.events - ) + let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = + if Collector.has_backend () then scope.events <- ev () :: scope.events (** Add an attr to the scope. It will be aggregated into the span. Note that this takes a function that produces attributes, and will only call it if there is an instrumentation backend. *) - let[@inline] add_attrs (scope:scope) (attrs:unit -> Span.key_value list) : unit = - if Collector.has_backend() then ( + let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) : + unit = + if Collector.has_backend () then scope.attrs <- List.rev_append (attrs ()) scope.attrs - ) (** Sync span guard *) - let with_ - ?trace_state ?service_name ?(attrs: (string*[ 'a) : 'a = + let with_ ?trace_state ?service_name + ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope + ?links name (f : scope -> 'a) : 'a = let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id @@ -562,25 +597,26 @@ module Trace = struct | None, Some scope -> Some scope.span_id | None, None -> None in - let start_time = Timestamp_ns.now_unix_ns() in - let span_id = Span_id.create() in - let scope = {trace_id;span_id;events=[]; attrs} in + let start_time = Timestamp_ns.now_unix_ns () in + let span_id = Span_id.create () in + let scope = { trace_id; span_id; events = []; attrs } in (* called once we're done, to emit a span *) let finally res = - let status = match res with + let status = + match res with | Ok () -> default_status ~code:Status_code_ok () - | Error e -> default_status ~code:Status_code_error ~message:e () in + | Error e -> default_status ~code:Status_code_error ~message:e () + in let span, _ = (* TODO: should the attrs passed to with_ go on the Span (in Span.create) or on the ResourceSpan (in emit)? (question also applies to Opentelemetry_lwt.Trace.with) *) - Span.create - ?kind ~trace_id ?parent ?links ~id:span_id - ?trace_state ~attrs:scope.attrs ~events:scope.events - ~start_time ~end_time:(Timestamp_ns.now_unix_ns()) - ~status - name in - emit ?service_name [span]; + Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state + ~attrs:scope.attrs ~events:scope.events ~start_time + ~end_time:(Timestamp_ns.now_unix_ns ()) + ~status name + in + emit ?service_name [ span ] in try let x = f scope in @@ -601,31 +637,28 @@ module Metrics = struct type t = Metrics_types.metric - let _program_start = Timestamp_ns.now_unix_ns() + let _program_start = Timestamp_ns.now_unix_ns () (** Number data point, as a float *) - let float ?(start_time_unix_nano=_program_start) - ?(now=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (d:float) : number_data_point = + let float ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) : + number_data_point = let attributes = attrs |> List.map _conv_key_value in - default_number_data_point - ~start_time_unix_nano ~time_unix_nano:now - ~attributes - ~value:(As_double d) () + default_number_data_point ~start_time_unix_nano ~time_unix_nano:now + ~attributes ~value:(As_double d) () (** Number data point, as an int *) - let int ?(start_time_unix_nano=_program_start) - ?(now=Timestamp_ns.now_unix_ns()) - ?(attrs=[]) - (i:int) : number_data_point = + let int ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) : + number_data_point = let attributes = attrs |> List.map _conv_key_value in default_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes - ~value:(As_int (Int64.of_int i)) () + ~value:(As_int (Int64.of_int i)) + () (** Aggregation of a scalar metric, always with the current value *) - let gauge ~name ?description ?unit_ (l:number_data_point list) : t = + let gauge ~name ?description ?unit_ (l : number_data_point list) : t = let data = Gauge (default_gauge ~data_points:l ()) in default_metric ~name ?description ?unit_ ~data () @@ -636,45 +669,46 @@ module Metrics = struct (** Sum of all reported measurements over a time interval *) let sum ~name ?description ?unit_ - ?(aggregation_temporality=Aggregation_temporality_cumulative) - ?is_monotonic - (l:number_data_point list) : t = + ?(aggregation_temporality = Aggregation_temporality_cumulative) + ?is_monotonic (l : number_data_point list) : t = let data = - Sum (default_sum ~data_points:l ?is_monotonic - ~aggregation_temporality ()) in + Sum (default_sum ~data_points:l ?is_monotonic ~aggregation_temporality ()) + in default_metric ~name ?description ?unit_ ~data () (* TODO - let histogram ~name ?description ?unit_ - ?aggregation_temporality - (l:number_data_point list) : t = - let data h= - Histogram (default_histogram ~data_points:l - ?aggregation_temporality ()) in - default_metric ~name ?description ?unit_ ~data () - *) + let histogram ~name ?description ?unit_ + ?aggregation_temporality + (l:number_data_point list) : t = + let data h= + Histogram (default_histogram ~data_points:l + ?aggregation_temporality ()) in + default_metric ~name ?description ?unit_ ~data () + *) (* TODO: exponential history *) (* TODO: summary *) (* TODO: exemplar *) (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) - let make_resource_metrics ?service_name ?attrs (l:t list) : resource_metrics = + let make_resource_metrics ?service_name ?attrs (l : t list) : resource_metrics + = let lm = default_instrumentation_library_metrics ~instrumentation_library:(Some Globals.instrumentation_library) - ~metrics:l () in + ~metrics:l () + in let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in - default_resource_metrics - ~instrumentation_library_metrics:[lm] ~resource:(Some resource) () + default_resource_metrics ~instrumentation_library_metrics:[ lm ] + ~resource:(Some resource) () (** Emit some metrics to the collector (sync). This blocks until the backend has pushed the metrics into some internal queue, or discarded them. *) - let emit ?attrs (l:t list) : unit = + let emit ?attrs (l : t list) : unit = let rm = make_resource_metrics ?attrs l in - Collector.send_metrics [rm] ~ret:ignore + Collector.send_metrics [ rm ] ~ret:ignore end (** {2 Logs} *) @@ -724,53 +758,48 @@ module Logs = struct let pp_flags = Logs_pp.pp_log_record_flags (** Make a single log entry *) - let make - ?time ?(observed_time_unix_nano=Timestamp_ns.now_unix_ns()) - ?severity ?log_level ?flags ?trace_id ?span_id - (body:value) : t = - let time_unix_nano = match time with + let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) + ?severity ?log_level ?flags ?trace_id ?span_id (body : value) : t = + let time_unix_nano = + match time with | None -> observed_time_unix_nano | Some t -> t in let trace_id = Option.map Trace_id.to_bytes trace_id in let span_id = Option.map Span_id.to_bytes span_id in let body = _conv_value body in - default_log_record - ~time_unix_nano ~observed_time_unix_nano - ?severity_number:severity ?severity_text:log_level - ?flags ?trace_id ?span_id ~body () + default_log_record ~time_unix_nano ~observed_time_unix_nano + ?severity_number:severity ?severity_text:log_level ?flags ?trace_id + ?span_id ~body () (** Make a log entry whose body is a string *) - let make_str - ?time ?observed_time_unix_nano - ?severity ?log_level ?flags ?trace_id ?span_id - (body:string) : t = - make - ?time ?observed_time_unix_nano - ?severity ?log_level ?flags ?trace_id ?span_id - (`String body) + let make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id (body : string) : t = + make ?time ?observed_time_unix_nano ?severity ?log_level ?flags ?trace_id + ?span_id (`String body) (** Make a log entry with format *) - let make_strf - ?time ?observed_time_unix_nano - ?severity ?log_level ?flags ?trace_id ?span_id - fmt = + let make_strf ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id fmt = Format.kasprintf (fun bod -> - make_str - ?time ?observed_time_unix_nano - ?severity ?log_level ?flags ?trace_id ?span_id bod) + make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id bod) fmt - let emit ?service_name ?attrs (l:t list) : unit = + let emit ?service_name ?attrs (l : t list) : unit = let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in - let ll = default_instrumentation_library_logs + let ll = + default_instrumentation_library_logs ~instrumentation_library:(Some Globals.instrumentation_library) - ~log_records:l () in - let rl = default_resource_logs ~resource:(Some resource) - ~instrumentation_library_logs:[ll] () in - Collector.send_logs [rl] ~ret:ignore + ~log_records:l () + in + let rl = + default_resource_logs ~resource:(Some resource) + ~instrumentation_library_logs:[ ll ] () + in + Collector.send_logs [ rl ] ~ret:ignore end (** {2 Utils} *) @@ -780,12 +809,10 @@ end https://www.w3.org/TR/trace-context/ *) module Trace_context = struct - (** The traceparent header https://www.w3.org/TR/trace-context/#traceparent-header *) module Traceparent = struct - let name = "traceparent" (** Parse the value of the traceparent header. @@ -816,33 +843,40 @@ module Trace_context = struct let consume expected ~offset ~or_ = let len = String.length expected in let* str, offset = blit ~offset ~len ~or_ in - if str = expected then Ok offset else Error or_ + if str = expected then + Ok offset + else + Error or_ in let offset = 0 in let* offset = consume "00" ~offset ~or_:"Expected version 00" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* trace_id, offset = blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" in + let* trace_id, offset = + blit ~offset ~len:32 ~or_:"Expected 32-digit trace-id" + in let* trace_id = match Trace_id.of_hex trace_id with | trace_id -> Ok trace_id | exception Invalid_argument _ -> Error "Expected hex-encoded trace-id" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* parent_id, offset = blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" in + let* parent_id, offset = + blit ~offset ~len:16 ~or_:"Expected 16-digit parent-id" + in let* parent_id = match Span_id.of_hex parent_id with | parent_id -> Ok parent_id | exception Invalid_argument _ -> Error "Expected hex-encoded parent-id" in let* offset = consume "-" ~offset ~or_:"Expected delimiter" in - let* _flags, _offset = blit ~offset ~len:2 ~or_:"Expected 2-digit flags" in + let* _flags, _offset = + blit ~offset ~len:2 ~or_:"Expected 2-digit flags" + in Ok (trace_id, parent_id) let to_value ~(trace_id : Trace_id.t) ~(parent_id : Span_id.t) () : string = - Printf.sprintf "00-%s-%s-00" - (Trace_id.to_hex trace_id) + Printf.sprintf "00-%s-%s-00" (Trace_id.to_hex trace_id) (Span_id.to_hex parent_id) - end end @@ -862,28 +896,31 @@ end = struct (** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *) let runtime_attributes = lazy - Conventions.Attributes.[ - (Process.Runtime.name, `String "ocaml"); - (Process.Runtime.version, `String Sys.ocaml_version); - ] + Conventions.Attributes. + [ + Process.Runtime.name, `String "ocaml"; + Process.Runtime.version, `String Sys.ocaml_version; + ] let get_runtime_attributes () = Lazy.force runtime_attributes let basic_setup () = - let trigger() = + let trigger () = match !Collector.backend with | None -> () - | Some (module C) -> C.signal_emit_gc_metrics() + | Some (module C) -> C.signal_emit_gc_metrics () in ignore (Gc.create_alarm trigger : Gc.alarm) let bytes_per_word = Sys.word_size / 8 + let word_to_bytes n = n * bytes_per_word + let word_to_bytes_f n = n *. float bytes_per_word let get_metrics () : Metrics.t list = let gc = Gc.quick_stat () in - let now = Timestamp_ns.now_unix_ns() in + let now = Timestamp_ns.now_unix_ns () in let open Metrics in let open Conventions.Metrics in [ @@ -891,8 +928,7 @@ end = struct [ int ~now (word_to_bytes gc.Gc.heap_words) ]; sum ~name:Process.Runtime.Ocaml.GC.minor_allocated ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - ~unit_:"B" + ~is_monotonic:true ~unit_:"B" [ float ~now (word_to_bytes_f gc.Gc.minor_words) ]; sum ~name:Process.Runtime.Ocaml.GC.minor_collections ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative diff --git a/tests/bin/cohttp_client.ml b/tests/bin/cohttp_client.ml index 9070b30a..9279c977 100644 --- a/tests/bin/cohttp_client.ml +++ b/tests/bin/cohttp_client.ml @@ -1,25 +1,29 @@ module T = Opentelemetry module Otel_lwt = Opentelemetry_lwt + let spf = Printf.sprintf -let (let@) f x = f x + +let ( let@ ) f x = f x let sleep_inner = ref 0.1 + let sleep_outer = ref 2.0 -let mk_client ~scope = Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client) +let mk_client ~scope = + Opentelemetry_cohttp_lwt.client ~scope (module Cohttp_lwt_unix.Client) let run () = - Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url()); + Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url ()); let open Lwt.Syntax in let rec go () = let@ scope = - Otel_lwt.Trace.with_ - ~kind:T.Span.Span_kind_producer - "loop.outer" + Otel_lwt.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" in let* () = Lwt_unix.sleep !sleep_outer in let module C = (val mk_client ~scope) in - let* (res, body) = C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") in + let* res, body = + C.get (Uri.of_string "https://enec1hql02hz.x.pipedream.net") + in let* () = Cohttp_lwt.Body.drain_body body in go () in @@ -34,26 +38,40 @@ let () = let thread = ref true in let batch_traces = ref 400 in let batch_metrics = ref 3 in - let opts = [ - "--debug", Arg.Bool ((:=) debug), " enable debug output"; - "--thread", Arg.Bool ((:=) thread), " use a background thread"; - "--batch-traces", Arg.Int ((:=) batch_traces), " size of traces batch"; - "--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; + let opts = + [ + "--debug", Arg.Bool (( := ) debug), " enable debug output"; + "--thread", Arg.Bool (( := ) thread), " use a background thread"; + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; - ] |> Arg.align in + ] + |> Arg.align + in Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; - let some_if_nzero r = if !r > 0 then Some !r else None in - let config = Opentelemetry_client_ocurl.Config.make - ~debug:!debug - ~batch_traces:(some_if_nzero batch_traces) - ~batch_metrics:(some_if_nzero batch_metrics) - ~thread:!thread () in + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in + let config = + Opentelemetry_client_ocurl.Config.make ~debug:!debug + ~batch_traces:(some_if_nzero batch_traces) + ~batch_metrics:(some_if_nzero batch_metrics) + ~thread:!thread () + in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; - Format.printf "Check HTTP requests at https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@."; + Format.printf + "Check HTTP requests at \ + https://requestbin.com/r/enec1hql02hz/26qShWryt5vJc1JfrOwalhr5vQt@."; - Opentelemetry_client_ocurl.with_setup ~config () (fun () -> Lwt_main.run (run ())) + Opentelemetry_client_ocurl.with_setup ~config () (fun () -> + Lwt_main.run (run ())) diff --git a/tests/bin/dune b/tests/bin/dune index 639b3ec4..15644506 100644 --- a/tests/bin/dune +++ b/tests/bin/dune @@ -6,4 +6,5 @@ (executable (name cohttp_client) (modules cohttp_client) - (libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl opentelemetry-cohttp-lwt)) + (libraries cohttp-lwt-unix opentelemetry opentelemetry-client-ocurl + opentelemetry-cohttp-lwt)) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index c0c42996..b00dd5e4 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -1,51 +1,53 @@ - module T = Opentelemetry + let spf = Printf.sprintf -let (let@) f x = f x + +let ( let@ ) f x = f x let sleep_inner = ref 0.1 + let sleep_outer = ref 2.0 let run () = - Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url()); - T.GC_metrics.basic_setup(); + Printf.printf "collector is on %S\n%!" (Opentelemetry_client_ocurl.get_url ()); + T.GC_metrics.basic_setup (); let i = ref 0 in while true do let@ scope = - T.Trace.with_ - ~kind:T.Span.Span_kind_producer - "loop.outer" ~attrs:["i", `Int !i] in + T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" + ~attrs:[ "i", `Int !i ] + in - for j=0 to 4 do - - let@ scope = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope - ~attrs:["j", `Int j] - "loop.inner" in + for j = 0 to 4 do + let@ scope = + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope + ~attrs:[ "j", `Int j ] + "loop.inner" + in Unix.sleepf !sleep_outer; - T.Logs.(emit [ - make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id - ~severity:Severity_number_info - "inner at %d" j - ]); + T.Logs.( + emit + [ + make_strf ~trace_id:scope.trace_id ~span_id:scope.span_id + ~severity:Severity_number_info "inner at %d" j; + ]); incr i; - (try - let@ _ = - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope - "alloc" in + try + let@ _ = T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope "alloc" in (* allocate some stuff *) let _arr = Sys.opaque_identity @@ Array.make (25 * 25551) 42.0 in ignore _arr; Unix.sleepf !sleep_inner; - if j=4 && !i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) + if j = 4 && !i mod 13 = 0 then failwith "oh no"; - T.Trace.add_event scope (fun()->T.Event.make "done with alloc"); - with Failure _ -> - ()); - done; + (* simulate a failure *) + T.Trace.add_event scope (fun () -> T.Event.make "done with alloc") + with Failure _ -> () + done done let () = @@ -57,23 +59,34 @@ let () = let thread = ref true in let batch_traces = ref 400 in let batch_metrics = ref 3 in - let opts = [ - "--debug", Arg.Bool ((:=) debug), " enable debug output"; - "--thread", Arg.Bool ((:=) thread), " use a background thread"; - "--batch-traces", Arg.Int ((:=) batch_traces), " size of traces batch"; - "--batch-metrics", Arg.Int ((:=) batch_metrics), " size of metrics batch"; - "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; - "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; - ] |> Arg.align in + let opts = + [ + "--debug", Arg.Bool (( := ) debug), " enable debug output"; + "--thread", Arg.Bool (( := ) thread), " use a background thread"; + "--batch-traces", Arg.Int (( := ) batch_traces), " size of traces batch"; + ( "--batch-metrics", + Arg.Int (( := ) batch_metrics), + " size of metrics batch" ); + "--sleep-inner", Arg.Set_float sleep_inner, " sleep (in s) in inner loop"; + "--sleep-outer", Arg.Set_float sleep_outer, " sleep (in s) in outer loop"; + ] + |> Arg.align + in Arg.parse opts (fun _ -> ()) "emit1 [opt]*"; - let some_if_nzero r = if !r > 0 then Some !r else None in - let config = Opentelemetry_client_ocurl.Config.make - ~debug:!debug + let some_if_nzero r = + if !r > 0 then + Some !r + else + None + in + let config = + Opentelemetry_client_ocurl.Config.make ~debug:!debug ~batch_traces:(some_if_nzero batch_traces) ~batch_metrics:(some_if_nzero batch_metrics) - ~thread:!thread () in + ~thread:!thread () + in Format.printf "@[<2>sleep outer: %.3fs,@ sleep inner: %.3fs,@ config: %a@]@." !sleep_outer !sleep_inner Opentelemetry_client_ocurl.Config.pp config; diff --git a/tests/test_trace_context.ml b/tests/test_trace_context.ml index 83cf5bd3..de486835 100644 --- a/tests/test_trace_context.ml +++ b/tests/test_trace_context.ml @@ -2,39 +2,49 @@ open Opentelemetry let pp_traceparent fmt (trace_id, parent_id) = let open Format in - fprintf fmt "trace_id:%S parent_id:%S" - (Trace_id.to_hex trace_id) + fprintf fmt "trace_id:%S parent_id:%S" (Trace_id.to_hex trace_id) (Span_id.to_hex parent_id) - let test_of_value str = let open Format in - printf "@[Trace_context.Traceparent.of_value %S:@ %a@]@." - str - (pp_print_result ~ok:(fun fmt (trace_id, parent_id) -> + printf "@[Trace_context.Traceparent.of_value %S:@ %a@]@." str + (pp_print_result + ~ok:(fun fmt (trace_id, parent_id) -> fprintf fmt "Ok %a" pp_traceparent (trace_id, parent_id)) ~error:(fun fmt msg -> fprintf fmt "Error %S" msg)) (Trace_context.Traceparent.of_value str) let () = test_of_value "xx" + let () = test_of_value "00" + let () = test_of_value "00-xxxx" + let () = test_of_value "00-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + let () = test_of_value "00-0123456789abcdef0123456789abcdef" + let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxx" + let () = test_of_value "00-0123456789abcdef0123456789abcdef-xxxxxxxxxxxxxxxx" + let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef" + let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-" + let () = test_of_value "00-0123456789abcdef0123456789abcdef-0123456789abcdef-00" + let () = test_of_value "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" let () = print_endline "" let test_to_value trace_id parent_id = let open Format in - printf "@[Trace_context.Traceparent.to_value %a:@ %S@]@." - pp_traceparent (trace_id, parent_id) + printf "@[Trace_context.Traceparent.to_value %a:@ %S@]@." pp_traceparent + (trace_id, parent_id) (Trace_context.Traceparent.to_value ~trace_id ~parent_id ()) - -let () = test_to_value (Trace_id.of_hex "4bf92f3577b34da6a3ce929d0e0e4736") (Span_id.of_hex "00f067aa0ba902b7") +let () = + test_to_value + (Trace_id.of_hex "4bf92f3577b34da6a3ce929d0e0e4736") + (Span_id.of_hex "00f067aa0ba902b7")