diff --git a/src/core/common_.ml b/src/core/common_.ml new file mode 100644 index 00000000..30bb39d2 --- /dev/null +++ b/src/core/common_.ml @@ -0,0 +1,5 @@ +let spf = Printf.sprintf + +module Proto = Opentelemetry_proto +module Atomic = Opentelemetry_atomic.Atomic +module Ambient_context = Opentelemetry_ambient_context diff --git a/src/core/context.ml b/src/core/context.ml new file mode 100644 index 00000000..883f646e --- /dev/null +++ b/src/core/context.ml @@ -0,0 +1,17 @@ +(** The context used in OTEL operations, to carry the current trace, etc. + + https://opentelemetry.io/docs/specs/otel/context/ *) + +type t = Hmap.t +(** The context type. We use [Hmap.t] as it's standard and widely used. *) + +type 'a key = 'a Hmap.key + +let set = Hmap.add + +(** @raise Invalid_argument if not present *) +let get_exn : 'a key -> t -> 'a = Hmap.get + +let get : 'a key -> t -> 'a option = Hmap.find + +let[@inline] new_key () : 'a key = Hmap.Key.create () diff --git a/src/core/conventions.ml b/src/core/conventions.ml new file mode 100644 index 00000000..c4002cb1 --- /dev/null +++ b/src/core/conventions.ml @@ -0,0 +1,130 @@ +(** Semantic conventions. + + {{:https://opentelemetry.io/docs/specs/semconv/} + https://opentelemetry.io/docs/specs/semconv/} *) + +module Attributes = struct + module Process = struct + module Runtime = struct + let name = "process.runtime.name" + + let version = "process.runtime.version" + + let description = "process.runtime.description" + end + end + + (** https://opentelemetry.io/docs/specs/semconv/attributes-registry/code/ *) + module Code = struct + (** Int *) + let column = "code.column" + + let filepath = "code.filepath" + + let function_ = "code.function" + + (** int *) + let line = "code.lineno" + + let namespace = "code.namespace" + + let stacktrace = "code.stacktrace" + end + + module Service = struct + let name = "service.name" + + let namespace = "service.namespace" + + let instance_id = "service.instance.id" + + let version = "service.version" + end + + module HTTP = struct + let error_type = "error.type" + + let request_method = "http.request.method" + + let route = "http.route" + + let url_full = "url.full" + + (** HTTP status code, int *) + let response_status_code = "http.response.status_code" + + let server_address = "server.address" + + let server_port = "server.port" + + (** http or https *) + let url_scheme = "url.scheme" + end + + (** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md + *) + module Host = struct + let id = "host.id" + + let name = "host.name" + + let type_ = "host.type" + + let arch = "host.arch" + + let ip = "host.ip" + + let mac = "host.mac" + + let image_id = "host.image.id" + + let image_name = "host.image.name" + + let image_version = "host.image.version" + end +end + +module Metrics = struct + module Process = struct + module Runtime = struct + module Ocaml = struct + module GC = struct + let compactions = "process.runtime.ocaml.gc.compactions" + + let major_collections = "process.runtime.ocaml.gc.major_collections" + + let major_heap = "process.runtime.ocaml.gc.major_heap" + + let minor_allocated = "process.runtime.ocaml.gc.minor_allocated" + + let minor_collections = "process.runtime.ocaml.gc.minor_collections" + end + end + end + end + + (** https://opentelemetry.io/docs/specs/semconv/http/ *) + module HTTP = struct + module Server = struct + let request_duration = "http.server.request.duration" + + let active_requests = "http.server.active_requests" + + (** Histogram *) + let request_body_size = "http.server.request.body.size" + + (** Histogram *) + let response_body_size = "http.server.response.body.size" + end + + module Client = struct + let request_duration = "http.client.request.duration" + + (** Histogram *) + let request_body_size = "http.client.request.body.size" + + (** Histogram *) + let response_body_size = "http.client.response.body.size" + end + end +end diff --git a/src/core/event.ml b/src/core/event.ml new file mode 100644 index 00000000..3d632a4d --- /dev/null +++ b/src/core/event.ml @@ -0,0 +1,9 @@ +open Common_ +open Proto.Trace + +type t = span_event + +let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = []) + (name : string) : t = + let attrs = List.map Key_value.conv attrs in + make_span_event ~time_unix_nano ~name ~attributes:attrs () diff --git a/src/core/event.mli b/src/core/event.mli new file mode 100644 index 00000000..8b90f641 --- /dev/null +++ b/src/core/event.mli @@ -0,0 +1,12 @@ +(** Events. + + Events occur at a given time and can carry attributes. They always belong in + a span. *) + +open Common_ +open Proto.Trace + +type t = span_event + +val make : + ?time_unix_nano:Timestamp_ns.t -> ?attrs:Key_value.t list -> string -> t diff --git a/src/core/exporter.ml b/src/core/exporter.ml new file mode 100644 index 00000000..7ccdc823 --- /dev/null +++ b/src/core/exporter.ml @@ -0,0 +1,128 @@ +(** Exporter. + + This is the pluggable component that actually sends signals to a OTEL + collector, or prints them, or saves them somewhere. + + This is part of the SDK, not just the API, so most real implementations live + in their own library. *) + +open Common_ + +open struct + module Proto = Opentelemetry_proto +end + +(** Main exporter interface *) +class type t = object + method send_trace : Proto.Trace.span list -> unit + + method send_metrics : Proto.Metrics.metric list -> unit + + method send_logs : Proto.Logs.log_record list -> unit + + method tick : unit -> unit + (** Should be called regularly for background processing, timeout checks, etc. + *) + + method add_on_tick_callback : (unit -> unit) -> unit + (** Add the given of callback to the exporter when [tick()] is called. The + callback should be short and reentrant. Depending on the exporter's + implementation, it might be called from a thread that is not the one that + called [on_tick]. *) + + method cleanup : on_done:(unit -> unit) -> unit -> unit + (** [cleanup ~on_done ()] is called when the exporter is shut down, and is + responsible for sending remaining batches, flushing sockets, etc. + @param on_done + callback invoked after the cleanup is done. @since 0.12 *) +end + +(** Dummy exporter, does nothing *) +let dummy : t = + let ticker = Tick_callbacks.create () in + object + method send_trace = ignore + + method send_metrics = ignore + + method send_logs = ignore + + method tick () = Tick_callbacks.tick ticker + + method add_on_tick_callback cb = Tick_callbacks.on_tick ticker cb + + method cleanup ~on_done () = on_done () + end + +let[@inline] send_trace (self : #t) (l : Proto.Trace.span list) = + self#send_trace l + +let[@inline] send_metrics (self : #t) (l : Proto.Metrics.metric list) = + self#send_metrics l + +let[@inline] send_logs (self : #t) (l : Proto.Logs.log_record list) = + self#send_logs l + +let[@inline] on_tick (self : #t) f = self#add_on_tick_callback f + +(** Do background work. Call this regularly if the collector doesn't already + have a ticker thread or internal timer. *) +let[@inline] tick (self : #t) = self#tick () + +let[@inline] cleanup (self : #t) ~on_done : unit = self#cleanup ~on_done () + +(** Main exporter, used by the main tracing functions. + + It is better to pass an explicit exporter when possible. *) +module Main_exporter = struct + (* hidden *) + open struct + (* a list of callbacks automatically added to the main exporter *) + let on_tick_cbs_ = AList.make () + + let exporter : t option Atomic.t = Atomic.make None + end + + (** Set the global exporter *) + let set (exp : t) : unit = + List.iter exp#add_on_tick_callback (AList.get on_tick_cbs_); + Atomic.set exporter (Some exp) + + (** Remove current exporter, if any. + @param on_done see {!t#cleanup}, @since 0.12 *) + let remove ~on_done () : unit = + match Atomic.exchange exporter None with + | None -> () + | Some exp -> + exp#tick (); + cleanup exp ~on_done + + (** Is there a configured exporter? *) + let present () : bool = Option.is_some (Atomic.get exporter) + + (** Current exporter, if any *) + let[@inline] get () : t option = Atomic.get exporter + + let add_on_tick_callback f = + AList.add on_tick_cbs_ f; + Option.iter (fun exp -> exp#add_on_tick_callback f) (get ()) +end + +let set_backend = Main_exporter.set [@@deprecated "use `Main_exporter.set`"] + +let remove_backend = Main_exporter.remove +[@@deprecated "use `Main_exporter.remove`"] + +let has_backend = Main_exporter.present +[@@deprecated "use `Main_exporter.present`"] + +let get_backend = Main_exporter.get [@@deprecated "use `Main_exporter.ge"] + +let with_setup_debug_backend ?(on_done = ignore) (exp : #t) ?(enable = true) () + f = + let exp = (exp :> t) in + if enable then ( + set_backend exp; + Fun.protect ~finally:(fun () -> cleanup exp ~on_done) f + ) else + f () diff --git a/src/core/gc_metrics.ml b/src/core/gc_metrics.ml new file mode 100644 index 00000000..c48b6734 --- /dev/null +++ b/src/core/gc_metrics.ml @@ -0,0 +1,49 @@ +open Common_ + +open struct + let[@inline] bytes_per_word = Sys.word_size / 8 + + let[@inline] word_to_bytes n = n * bytes_per_word + + let[@inline] word_to_bytes_f n = n *. float bytes_per_word +end + +let get_metrics () : Metrics.t list = + let gc = Gc.quick_stat () in + let now = Timestamp_ns.now_unix_ns () in + let open Metrics in + let open Conventions.Metrics in + [ + gauge ~name:Process.Runtime.Ocaml.GC.major_heap ~unit_:"B" + [ int ~now (word_to_bytes gc.Gc.heap_words) ]; + sum ~name:Process.Runtime.Ocaml.GC.minor_allocated + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true ~unit_:"B" + [ float ~now (word_to_bytes_f gc.Gc.minor_words) ]; + sum ~name:Process.Runtime.Ocaml.GC.minor_collections + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~now gc.Gc.minor_collections ]; + sum ~name:Process.Runtime.Ocaml.GC.major_collections + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~now gc.Gc.major_collections ]; + sum ~name:Process.Runtime.Ocaml.GC.compactions + ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative + ~is_monotonic:true + [ int ~now gc.Gc.compactions ]; + ] + +let setup (exp : #Exporter.t) = + let on_tick () = + let m = get_metrics () in + exp#send_metrics m + in + Exporter.on_tick exp on_tick + +let setup_on_main_exporter () = + match Exporter.Main_exporter.get () with + | None -> () + | Some exp -> setup exp + +let basic_setup = setup_on_main_exporter diff --git a/src/core/gc_metrics.mli b/src/core/gc_metrics.mli new file mode 100644 index 00000000..817967d2 --- /dev/null +++ b/src/core/gc_metrics.mli @@ -0,0 +1,17 @@ +(** Export GC metrics. + + These metrics are emitted regularly. *) + +val get_metrics : unit -> Metrics.t list +(** Get a few metrics from the current state of the GC. *) + +val setup : #Exporter.t -> unit +(** Setup a hook that will emit GC statistics on every tick. It does assume that + [tick] is called regularly on the exporter. For example, if we ensure the + exporter's [tick] function is called every 5s, we'll get GC metrics every + 5s. *) + +val setup_on_main_exporter : unit -> unit +(** Setup the hook on the main exporter. *) + +val basic_setup : unit -> unit [@@deprecated "use setup_on_main_exporter"] diff --git a/src/core/globals.ml b/src/core/globals.ml new file mode 100644 index 00000000..36e3e975 --- /dev/null +++ b/src/core/globals.ml @@ -0,0 +1,102 @@ +(** Process-wide metadata, environment variables, etc. *) + +open Common_ +open Proto.Common + +(** Main service name metadata *) +let service_name = ref "unknown_service" + +(** Namespace for the service *) +let service_namespace = ref None + +(** Unique identifier for the service *) +let service_instance_id = ref None + +(** Version for the service + @since 0.12 *) +let service_version = ref None + +let instrumentation_library = + make_instrumentation_scope ~version:"%%VERSION_NUM%%" ~name:"ocaml-otel" () + +(** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and modifiable + by the user code. They will be attached to each outgoing metrics/traces. *) +let global_attributes : key_value list ref = + let parse_pair s = + match String.split_on_char '=' s with + | [ a; b ] -> make_key_value ~key:a ~value:(String_value b) () + | _ -> failwith (Printf.sprintf "invalid attribute: %S" s) + in + ref + @@ + try + Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" + |> String.split_on_char ',' |> List.map parse_pair + with _ -> [] + +(** Add a global attribute *) +let add_global_attribute (key : string) (v : Value.t) : unit = + global_attributes := Key_value.conv (key, v) :: !global_attributes + +(* add global attributes to this list *) +let merge_global_attributes_ into : _ list = + let open Key_value in + let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in + List.rev_append (List.filter not_redundant !global_attributes) into + +(** Default span kind in {!Span.create}. This will be used in all spans that do + not specify [~kind] explicitly; it is set to "internal", following + directions from the [.proto] file. It can be convenient to set "client" or + "server" uniformly in here. + @since 0.4 *) +let default_span_kind = ref Proto.Trace.Span_kind_internal + +open struct + let runtime_attributes = + Conventions.Attributes. + [ + Process.Runtime.name, `String "ocaml"; + Process.Runtime.version, `String Sys.ocaml_version; + ] + + let runtime_attributes_converted = List.map Key_value.conv runtime_attributes +end + +(** Attributes about the OCaml runtime. See + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes +*) +let[@inline] get_runtime_attributes () = runtime_attributes + +let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list = + let l = List.rev_map Key_value.conv attrs in + let l = List.rev_append runtime_attributes_converted l in + let l = + make_key_value ~key:Conventions.Attributes.Service.name + ~value:(String_value service_name) () + :: l + in + let l = + match !service_instance_id with + | None -> l + | Some v -> + make_key_value ~key:Conventions.Attributes.Service.instance_id + ~value:(String_value v) () + :: l + in + let l = + match !service_namespace with + | None -> l + | Some v -> + make_key_value ~key:Conventions.Attributes.Service.namespace + ~value:(String_value v) () + :: l + in + let l = + match !service_version with + | None -> l + | Some v -> + make_key_value ~key:Conventions.Attributes.Service.version + ~value:(String_value v) () + :: l + in + l |> merge_global_attributes_ diff --git a/src/core/key_value.ml b/src/core/key_value.ml new file mode 100644 index 00000000..6760c340 --- /dev/null +++ b/src/core/key_value.ml @@ -0,0 +1,8 @@ +open Common_ + +type t = string * Value.t + +let conv (k, v) = + let open Proto.Common in + let value = Value.conv v in + make_key_value ~key:k ?value () diff --git a/src/core/lock.ml b/src/core/lock.ml deleted file mode 100644 index 6ce295bb..00000000 --- a/src/core/lock.ml +++ /dev/null @@ -1,17 +0,0 @@ -let lock_ : (unit -> unit) ref = ref ignore - -let unlock_ : (unit -> unit) ref = ref ignore - -let set_mutex ~lock ~unlock : unit = - lock_ := lock; - unlock_ := unlock - -let[@inline] with_lock f = - !lock_ (); - match f () with - | x -> - !unlock_ (); - x - | exception e -> - !unlock_ (); - Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ()) diff --git a/src/core/lock.mli b/src/core/lock.mli deleted file mode 100644 index 2040bd1b..00000000 --- a/src/core/lock.mli +++ /dev/null @@ -1,9 +0,0 @@ -(** A global lock, modifiable by the user *) - -val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit -(** Set a pair of lock/unlock functions that are used to protect access to - global state, if needed. By default these do nothing. *) - -val with_lock : (unit -> 'a) -> 'a -(** Call [f()] while holding the mutex defined {!set_mutex}, then release the - mutex. *) diff --git a/src/core/log_record.ml b/src/core/log_record.ml new file mode 100644 index 00000000..9212a8e5 --- /dev/null +++ b/src/core/log_record.ml @@ -0,0 +1,76 @@ +(** Logs. + + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#log-signal} + the spec} *) + +open Common_ +open Proto.Logs + +type t = Proto.Logs.log_record + +(** Severity level of a log event *) +type severity = Proto.Logs.severity_number = + | Severity_number_unspecified + | Severity_number_trace + | Severity_number_trace2 + | Severity_number_trace3 + | Severity_number_trace4 + | Severity_number_debug + | Severity_number_debug2 + | Severity_number_debug3 + | Severity_number_debug4 + | Severity_number_info + | Severity_number_info2 + | Severity_number_info3 + | Severity_number_info4 + | Severity_number_warn + | Severity_number_warn2 + | Severity_number_warn3 + | Severity_number_warn4 + | Severity_number_error + | Severity_number_error2 + | Severity_number_error3 + | Severity_number_error4 + | Severity_number_fatal + | Severity_number_fatal2 + | Severity_number_fatal3 + | Severity_number_fatal4 + +let pp_severity = pp_severity_number + +type flags = Proto.Logs.log_record_flags = + | Log_record_flags_do_not_use + | Log_record_flags_trace_flags_mask + +let pp_flags = Proto.Logs.pp_log_record_flags + +(** Make a single log entry *) +let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) + ?severity ?log_level ?flags ?trace_id ?span_id (body : Value.t) : t = + let time_unix_nano = + match time with + | None -> observed_time_unix_nano + | Some t -> t + in + let trace_id = Option.map Trace_id.to_bytes trace_id in + let span_id = Option.map Span_id.to_bytes span_id in + let body = Value.conv body in + make_log_record ~time_unix_nano ~observed_time_unix_nano + ?severity_number:severity ?severity_text:log_level ?flags ?trace_id ?span_id + ?body () + +(** Make a log entry whose body is a string *) +let make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id (body : string) : t = + make ?time ?observed_time_unix_nano ?severity ?log_level ?flags ?trace_id + ?span_id (`String body) + +(** Make a log entry with format *) +let make_strf ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id fmt = + Format.kasprintf + (fun bod -> + make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags + ?trace_id ?span_id bod) + fmt diff --git a/src/core/logger.ml b/src/core/logger.ml new file mode 100644 index 00000000..33b890e2 --- /dev/null +++ b/src/core/logger.ml @@ -0,0 +1,37 @@ +(** Logs. + + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#log-signal} + the spec} *) + +open Common_ + +(** A logger object *) +class type t = object + method is_enabled : Log_record.severity -> bool + + method emit : Log_record.t list -> unit +end + +(** Dummy logger, always disabled *) +let dummy : t = + object + method is_enabled _ = false + + method emit _ = () + end + +class simple (exp : #Exporter.t) : t = + object + method is_enabled _ = true + + method emit logs = if logs <> [] then exp#send_logs logs + end + +let emit ?service_name:_ ?attrs:_ (l : Log_record.t list) : unit = + match Exporter.Main_exporter.get () with + | None -> () + | Some e -> e#send_logs l +[@@deprecated "use an explicit Logger"] + +let k_logger : t Context.key = Context.new_key () diff --git a/src/core/metrics.ml b/src/core/metrics.ml new file mode 100644 index 00000000..f91538f8 --- /dev/null +++ b/src/core/metrics.ml @@ -0,0 +1,80 @@ +(** Metrics. + + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#metric-signal} + the spec} *) + +open Common_ +open Proto +open Proto.Metrics + +type t = Metrics.metric +(** A single metric, measuring some time-varying quantity or statistical + distribution. It is composed of one or more data points that have precise + values and time stamps. Each distinct metric should have a distinct name. *) + +open struct + let _program_start = Timestamp_ns.now_unix_ns () +end + +(** Number data point, as a float *) +let float ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) : + number_data_point = + let attributes = attrs |> List.map Key_value.conv in + make_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes + ~value:(As_double d) () + +(** Number data point, as an int *) +let int ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) : + number_data_point = + let attributes = attrs |> List.map Key_value.conv in + make_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes + ~value:(As_int (Int64.of_int i)) + () + +(** Aggregation of a scalar metric, always with the current value *) +let gauge ~name ?description ?unit_ (l : number_data_point list) : t = + let data = Gauge (make_gauge ~data_points:l ()) in + make_metric ~name ?description ?unit_ ~data () + +type aggregation_temporality = Metrics.aggregation_temporality = + | Aggregation_temporality_unspecified + | Aggregation_temporality_delta + | Aggregation_temporality_cumulative + +(** Sum of all reported measurements over a time interval *) +let sum ~name ?description ?unit_ + ?(aggregation_temporality = Aggregation_temporality_cumulative) + ?is_monotonic (l : number_data_point list) : t = + let data = + Sum (make_sum ~data_points:l ?is_monotonic ~aggregation_temporality ()) + in + make_metric ~name ?description ?unit_ ~data () + +(** Histogram data + @param count number of values in population (non negative) + @param sum sum of values in population (0 if count is 0) + @param bucket_counts + count value of histogram for each bucket. Sum of the counts must be equal + to [count]. length must be [1+length explicit_bounds] + @param explicit_bounds strictly increasing list of bounds for the buckets *) +let histogram_data_point ?(start_time_unix_nano = _program_start) + ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = []) + ?(explicit_bounds = []) ?sum ~bucket_counts ~count () : histogram_data_point + = + let attributes = attrs |> List.map Key_value.conv in + make_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now + ~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum () + +let histogram ~name ?description ?unit_ ?aggregation_temporality + (l : histogram_data_point list) : t = + let data = + Histogram (make_histogram ~data_points:l ?aggregation_temporality ()) + in + make_metric ~name ?description ?unit_ ~data () + +(* TODO: exponential history *) +(* TODO: summary *) +(* TODO: exemplar *) diff --git a/src/core/metrics_callbacks.ml b/src/core/metrics_callbacks.ml new file mode 100644 index 00000000..c404c0bc --- /dev/null +++ b/src/core/metrics_callbacks.ml @@ -0,0 +1,37 @@ +open Common_ + +type t = { cbs: (unit -> Metrics.t list) AList.t } [@@unboxed] + +let create () : t = { cbs = AList.make () } + +let[@inline] add_metrics_cb (self : t) f = AList.add self.cbs f + +let add_to_exporter (exp : #Exporter.t) (self : t) = + let on_tick () = + (* collect all metrics *) + let res = ref [] in + List.iter + (fun f -> + let f_metrics = f () in + res := List.rev_append f_metrics !res) + (AList.get self.cbs); + let metrics = !res in + + (* emit the metrics *) + Exporter.send_metrics exp metrics + in + Exporter.on_tick exp on_tick + +module Main_set = struct + let cur_set_ : t option Atomic.t = Atomic.make None + + let rec get () = + match Atomic.get cur_set_ with + | Some s -> s + | None -> + let s = create () in + if Atomic.compare_and_set cur_set_ None (Some s) then + s + else + get () +end diff --git a/src/core/metrics_callbacks.mli b/src/core/metrics_callbacks.mli new file mode 100644 index 00000000..040d668f --- /dev/null +++ b/src/core/metrics_callbacks.mli @@ -0,0 +1,25 @@ +(** A set of callbacks that produce metrics when called. The metrics are + automatically called regularly. + + This allows applications to register metrics callbacks from various points + in the program (or even in libraries), and not worry about setting + alarms/intervals to emit them. *) + +type t + +val create : unit -> t + +val add_metrics_cb : t -> (unit -> Metrics.t list) -> unit +(** [register set f] adds the callback [f] to the [set]. + + [f] will be called at unspecified times and is expected to return a list of + metrics. It might be called regularly by the backend, in particular (but not + only) when {!Exporter.tick} is called. *) + +val add_to_exporter : #Exporter.t -> t -> unit +(** Make sure we export metrics at every [tick] of the exporter *) + +module Main_set : sig + val get : unit -> t + (** The global set *) +end diff --git a/src/core/metrics_emitter.ml b/src/core/metrics_emitter.ml new file mode 100644 index 00000000..4a075f4f --- /dev/null +++ b/src/core/metrics_emitter.ml @@ -0,0 +1,32 @@ +open Common_ + +class type t = object + method is_enabled : unit -> bool + + method emit : Metrics.t list -> unit +end + +class dummy : t = + object + method is_enabled () = false + + method emit _ = () + end + +class simple (exp : #Exporter.t) : t = + object + method is_enabled () = true + + method emit l = if l <> [] then exp#send_metrics l + end + +(** Emit some metrics to the collector (sync). This blocks until the backend has + pushed the metrics into some internal queue, or discarded them. + + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. *) +let emit ?attrs:_ (l : Metrics.t list) : unit = + match Exporter.Main_exporter.get () with + | None -> () + | Some exp -> exp#send_metrics l +[@@deprecated "use an explicit Metrics_emitter.t"] diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index 4b189819..b5f518c0 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -1,14 +1,6 @@ (** Opentelemetry types and instrumentation *) -open struct - let spf = Printf.sprintf - - module Atomic = Opentelemetry_atomic.Atomic - module Ambient_context = Opentelemetry_ambient_context -end - -module Lock = Lock -(** Global lock. *) +open Common_ module Rand_bytes = Rand_bytes (** Generation of random identifiers. *) @@ -30,1653 +22,75 @@ module Proto = Opentelemetry_proto (** {2 Timestamps} *) -(** Unix timestamp. +module Timestamp_ns = Timestamp_ns - These timestamps measure time since the Unix epoch (jan 1, 1970) UTC in - nanoseconds. *) -module Timestamp_ns = struct - type t = int64 +(** {2 Export signals to some external collector.} *) - let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600))) +module Exporter = Exporter +module Collector = Exporter [@@deprecated "Use 'Exporter' instead"] - (** Current unix timestamp in nanoseconds *) - let[@inline] now_unix_ns () : t = - let span = Ptime_clock.now () |> Ptime.to_span in - let d, ps = Ptime.Span.to_d_ps span in - let d = Int64.(mul (of_int d) ns_in_a_day) in - let ns = Int64.(div ps 1_000L) in - Int64.(add d ns) -end - -(** {2 Interface to data collector} *) - -(** Collector types - - These types are used by backend implementations, to send events to - collectors such as Jaeger. - - Note: most users will not need to touch this module *) -module Collector = struct - open Opentelemetry_proto - - type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a } - (** Sender interface for a message of type [msg]. Inspired from Logs' reporter - (see - {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) - but without [over] as it doesn't make much sense in presence of batching. - - The [ret] callback is used to return the desired type (unit, or a Lwt - promise, or anything else) once the event has been transferred to the - backend. It doesn't mean the event has been collected yet, it could sit in - a batch queue for a little while. *) - - (** Collector client interface. *) - module type BACKEND = sig - val send_trace : Trace.resource_spans list sender - - val send_metrics : Metrics.resource_metrics list sender - - val send_logs : Logs.resource_logs list sender - - val signal_emit_gc_metrics : unit -> unit - (** Signal the backend that it should emit GC metrics when it has the - chance. This should be installed in a GC alarm or another form of - regular trigger. *) - - val tick : unit -> unit - (** Should be called regularly for background processing, timeout checks, - etc. *) - - val set_on_tick_callbacks : (unit -> unit) AList.t -> unit - (** Give the collector the list of callbacks to be executed when [tick()] is - called. Each such callback should be short and reentrant. Depending on - the collector's implementation, it might be called from a thread that is - not the one that called [on_tick]. *) - - val cleanup : on_done:(unit -> unit) -> unit -> unit - (** [cleanup ~on_done ()] is called when the collector is shut down, and is - responsible for sending remaining batches, flushing sockets, etc. - @param on_done - callback invoked after the cleanup is done. @since 0.12 *) - end - - type backend = (module BACKEND) - - module Noop_backend : BACKEND = struct - let noop_sender _ ~ret = ret () - - let send_trace : Trace.resource_spans list sender = { send = noop_sender } - - let send_metrics : Metrics.resource_metrics list sender = - { send = noop_sender } - - let send_logs : Logs.resource_logs list sender = { send = noop_sender } - - let signal_emit_gc_metrics () = () - - let tick () = () - - let set_on_tick_callbacks _cbs = () - - let cleanup ~on_done () = - on_done (); - () - end - - module Debug_backend (B : BACKEND) : BACKEND = struct - open Proto - - let send_trace : Trace.resource_spans list sender = - { - send = - (fun l ~ret -> - Format.eprintf "SPANS: %a@." - (Format.pp_print_list Trace.pp_resource_spans) - l; - B.send_trace.send l ~ret); - } - - let send_metrics : Metrics.resource_metrics list sender = - { - send = - (fun l ~ret -> - Format.eprintf "METRICS: %a@." - (Format.pp_print_list Metrics.pp_resource_metrics) - l; - B.send_metrics.send l ~ret); - } - - let send_logs : Logs.resource_logs list sender = - { - send = - (fun l ~ret -> - Format.eprintf "LOGS: %a@." - (Format.pp_print_list Logs.pp_resource_logs) - l; - B.send_logs.send l ~ret); - } - - let signal_emit_gc_metrics () = B.signal_emit_gc_metrics () - - let tick () = B.tick () - - let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs - - let cleanup ~on_done () = B.cleanup ~on_done () - end - - let debug_backend : backend = (module Debug_backend (Noop_backend)) - - (* hidden *) - open struct - let on_tick_cbs_ = AList.make () - - let backend : backend option Atomic.t = Atomic.make None - end - - (** Set collector backend *) - let set_backend (b : backend) : unit = - let (module B) = b in - B.set_on_tick_callbacks on_tick_cbs_; - Atomic.set backend (Some b) - - (** Remove current backend, if any. - @since 0.11 - @param on_done see {!BACKEND.cleanup}, @since 0.12 *) - let remove_backend ~on_done () : unit = - match Atomic.exchange backend None with - | None -> () - | Some (module B) -> - B.tick (); - B.cleanup ~on_done () - - (** Is there a configured backend? *) - let[@inline] has_backend () : bool = Atomic.get backend != None - - (** Current backend, if any *) - let[@inline] get_backend () : backend option = Atomic.get backend - - let send_trace (l : Trace.resource_spans list) ~ret = - match Atomic.get backend with - | None -> ret () - | Some (module B) -> B.send_trace.send l ~ret - - let send_metrics (l : Metrics.resource_metrics list) ~ret = - match Atomic.get backend with - | None -> ret () - | Some (module B) -> B.send_metrics.send l ~ret - - let send_logs (l : Logs.resource_logs list) ~ret = - match Atomic.get backend with - | None -> ret () - | Some (module B) -> B.send_logs.send l ~ret - - let[@inline] rand_bytes_16 () = !Rand_bytes.rand_bytes_16 () - - let[@inline] rand_bytes_8 () = !Rand_bytes.rand_bytes_8 () - - let[@inline] on_tick f = AList.add on_tick_cbs_ f - - (** Do background work. Call this regularly if the collector doesn't already - have a ticker thread or internal timer. *) - let tick () = - match Atomic.get backend with - | None -> () - | Some (module B) -> B.tick () - - let with_setup_debug_backend ?(on_done = ignore) b ?(enable = true) () f = - let (module B : BACKEND) = b in - if enable then ( - set_backend b; - Fun.protect ~finally:(B.cleanup ~on_done) f - ) else - f () -end - -(**/**) - -module Util_ = struct - let int_to_hex (i : int) = - if i < 10 then - Char.chr (i + Char.code '0') - else - Char.chr (i - 10 + Char.code 'a') - - let bytes_to_hex_into b res off : unit = - for i = 0 to Bytes.length b - 1 do - let n = Char.code (Bytes.get b i) in - Bytes.set res ((2 * i) + off) (int_to_hex ((n land 0xf0) lsr 4)); - Bytes.set res ((2 * i) + 1 + off) (int_to_hex (n land 0x0f)) - done - - let bytes_to_hex (b : bytes) : string = - let res = Bytes.create (2 * Bytes.length b) in - bytes_to_hex_into b res 0; - Bytes.unsafe_to_string res - - let int_of_hex = function - | '0' .. '9' as c -> Char.code c - Char.code '0' - | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' - | c -> raise (Invalid_argument (spf "invalid hex char: %C" c)) - - let bytes_of_hex_substring (s : string) off len = - if len mod 2 <> 0 then - raise (Invalid_argument "hex sequence must be of even length"); - let res = Bytes.make (len / 2) '\x00' in - for i = 0 to (len / 2) - 1 do - let n1 = int_of_hex (String.get s (off + (2 * i))) in - let n2 = int_of_hex (String.get s (off + (2 * i) + 1)) in - let n = (n1 lsl 4) lor n2 in - Bytes.set res i (Char.chr n) - done; - res - - let bytes_of_hex (s : string) : bytes = - bytes_of_hex_substring s 0 (String.length s) - - let bytes_non_zero (self : bytes) : bool = - try - for i = 0 to Bytes.length self - 1 do - if Char.code (Bytes.unsafe_get self i) <> 0 then raise_notrace Exit - done; - false - with Exit -> true -end - -(**/**) +module Tick_callbacks = Tick_callbacks +(** Helper to implement part of the exporter *) (** {2 Identifiers} *) -(** Trace ID. +module Trace_id = Trace_id - This 16 bytes identifier is shared by all spans in one trace. *) -module Trace_id : sig - type t +let k_trace_id = Trace_id.k_trace_id - val create : unit -> t +module Span_id = Span_id +module Span_ctx = Span_ctx - val dummy : t - - val pp : Format.formatter -> t -> unit - - val is_valid : t -> bool - - val to_bytes : t -> bytes - - val of_bytes : bytes -> t - - val to_hex : t -> string - - val to_hex_into : t -> bytes -> int -> unit - - val of_hex : string -> t - - val of_hex_substring : string -> int -> t -end = struct - type t = bytes - - let[@inline] to_bytes self = self - - let dummy : t = Bytes.make 16 '\x00' - - let create () : t = - let b = Collector.rand_bytes_16 () in - assert (Bytes.length b = 16); - (* make sure the identifier is not all 0, which is a dummy identifier. *) - Bytes.set b 0 (Char.unsafe_chr (Char.code (Bytes.get b 0) lor 1)); - b - - let[@inline] of_bytes b = - if Bytes.length b = 16 then - b - else - raise (Invalid_argument "trace ID must be 16 bytes in length") - - let is_valid = Util_.bytes_non_zero - - let to_hex = Util_.bytes_to_hex - - let to_hex_into = Util_.bytes_to_hex_into - - let[@inline] of_hex s = of_bytes (Util_.bytes_of_hex s) - - let[@inline] of_hex_substring s off = - of_bytes (Util_.bytes_of_hex_substring s off 32) - - let pp fmt t = Format.fprintf fmt "%s" (to_hex t) -end - -(** Hmap key to carry around a {!Trace_id.t}, to remember what the current trace - is. - @since 0.8 *) -let k_trace_id : Trace_id.t Hmap.key = Hmap.Key.create () - -(** Unique ID of a span. *) -module Span_id : sig - type t - - val create : unit -> t - - val dummy : t - - val pp : Format.formatter -> t -> unit - - val is_valid : t -> bool - - val to_bytes : t -> bytes - - val of_bytes : bytes -> t - - val to_hex : t -> string - - val to_hex_into : t -> bytes -> int -> unit - - val of_hex : string -> t - - val of_hex_substring : string -> int -> t -end = struct - type t = bytes - - let[@inline] to_bytes self = self - - let dummy : t = Bytes.make 8 '\x00' - - let create () : t = - let b = Collector.rand_bytes_8 () in - assert (Bytes.length b = 8); - (* make sure the identifier is not all 0, which is a dummy identifier. *) - Bytes.set b 0 (Char.unsafe_chr (Char.code (Bytes.get b 0) lor 1)); - b - - let is_valid = Util_.bytes_non_zero - - let of_bytes b = - if Bytes.length b = 8 then - b - else - raise (Invalid_argument "span IDs must be 8 bytes in length") - - let to_hex = Util_.bytes_to_hex - - let to_hex_into = Util_.bytes_to_hex_into - - let[@inline] of_hex s = of_bytes (Util_.bytes_of_hex s) - - let[@inline] of_hex_substring s off = - of_bytes (Util_.bytes_of_hex_substring s off 16) - - let pp fmt t = Format.fprintf fmt "%s" (to_hex t) -end - -(** Span context. This bundles up a trace ID and parent ID. - - {{:https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} - https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} - @since 0.7 *) -module Span_ctx : sig - type t - - val make : - ?sampled:bool -> trace_id:Trace_id.t -> parent_id:Span_id.t -> unit -> t - - val dummy : t - (** Invalid span context, to be used as a placeholder *) - - val is_valid : t -> bool - - val trace_id : t -> Trace_id.t - - val parent_id : t -> Span_id.t - - val sampled : t -> bool - - val to_w3c_trace_context : t -> bytes - - val of_w3c_trace_context : bytes -> (t, string) result - - val of_w3c_trace_context_exn : bytes -> t - (** @raise Invalid_argument if parsing failed *) -end = struct - (* TODO: trace state *) - - type t = { - trace_id: Trace_id.t; - parent_id: Span_id.t; - sampled: bool; - } - - let dummy = - { trace_id = Trace_id.dummy; parent_id = Span_id.dummy; sampled = false } - - let make ?(sampled = false) ~trace_id ~parent_id () : t = - { trace_id; parent_id; sampled } - - let[@inline] is_valid self = - Trace_id.is_valid self.trace_id && Span_id.is_valid self.parent_id - - let[@inline] sampled self = self.sampled - - let[@inline] trace_id self = self.trace_id - - let[@inline] parent_id self = self.parent_id - - let to_w3c_trace_context (self : t) : bytes = - let bs = Bytes.create 55 in - Bytes.set bs 0 '0'; - Bytes.set bs 1 '0'; - Bytes.set bs 2 '-'; - Trace_id.to_hex_into self.trace_id bs 3; - (* +32 *) - Bytes.set bs (3 + 32) '-'; - Span_id.to_hex_into self.parent_id bs 36; - (* +16 *) - Bytes.set bs 52 '-'; - Bytes.set bs 53 '0'; - Bytes.set bs 54 - (if self.sampled then - '1' - else - '0'); - bs - - let of_w3c_trace_context bs : _ result = - try - if Bytes.length bs <> 55 then invalid_arg "trace context must be 55 bytes"; - (match int_of_string_opt (Bytes.sub_string bs 0 2) with - | Some 0 -> () - | Some n -> invalid_arg @@ spf "version is %d, expected 0" n - | None -> invalid_arg "expected 2-digit version"); - if Bytes.get bs 2 <> '-' then invalid_arg "expected '-' before trace_id"; - let trace_id = - try Trace_id.of_hex_substring (Bytes.unsafe_to_string bs) 3 - with Invalid_argument msg -> invalid_arg (spf "in trace id: %s" msg) - in - if Bytes.get bs (3 + 32) <> '-' then - invalid_arg "expected '-' before parent_id"; - let parent_id = - try Span_id.of_hex_substring (Bytes.unsafe_to_string bs) 36 - with Invalid_argument msg -> invalid_arg (spf "in span id: %s" msg) - in - if Bytes.get bs 52 <> '-' then invalid_arg "expected '-' after parent_id"; - let sampled = int_of_string_opt (Bytes.sub_string bs 53 2) = Some 1 in - - (* ignore flags *) - Ok { trace_id; parent_id; sampled } - with Invalid_argument msg -> Error msg - - let of_w3c_trace_context_exn bs = - match of_w3c_trace_context bs with - | Ok t -> t - | Error msg -> invalid_arg @@ spf "invalid w3c trace context: %s" msg -end - -(** Hmap key to carry around a {!Span_ctx.t}, e.g. to remember what the current - parent span is. - @since 0.8 *) -let k_span_ctx : Span_ctx.t Hmap.key = Hmap.Key.create () +let k_span_ctx = Span_ctx.k_span_ctx (** {2 Attributes and conventions} *) -(** Semantic conventions +module Conventions = Conventions - {{:https://opentelemetry.io/docs/specs/semconv/} - https://opentelemetry.io/docs/specs/semconv/} *) -module Conventions = struct - module Attributes = struct - module Process = struct - module Runtime = struct - let name = "process.runtime.name" - - let version = "process.runtime.version" - - let description = "process.runtime.description" - end - end - - (** https://opentelemetry.io/docs/specs/semconv/attributes-registry/code/ *) - module Code = struct - (** Int *) - let column = "code.column" - - let filepath = "code.filepath" - - let function_ = "code.function" - - (** int *) - let line = "code.lineno" - - let namespace = "code.namespace" - - let stacktrace = "code.stacktrace" - end - - module Service = struct - let name = "service.name" - - let namespace = "service.namespace" - - let instance_id = "service.instance.id" - - let version = "service.version" - end - - module HTTP = struct - let error_type = "error.type" - - let request_method = "http.request.method" - - let route = "http.route" - - let url_full = "url.full" - - (** HTTP status code, int *) - let response_status_code = "http.response.status_code" - - let server_address = "server.address" - - let server_port = "server.port" - - (** http or https *) - let url_scheme = "url.scheme" - end - - (** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md - *) - module Host = struct - let id = "host.id" - - let name = "host.name" - - let type_ = "host.type" - - let arch = "host.arch" - - let ip = "host.ip" - - let mac = "host.mac" - - let image_id = "host.image.id" - - let image_name = "host.image.name" - - let image_version = "host.image.version" - end - end - - module Metrics = struct - module Process = struct - module Runtime = struct - module Ocaml = struct - module GC = struct - let compactions = "process.runtime.ocaml.gc.compactions" - - let major_collections = "process.runtime.ocaml.gc.major_collections" - - let major_heap = "process.runtime.ocaml.gc.major_heap" - - let minor_allocated = "process.runtime.ocaml.gc.minor_allocated" - - let minor_collections = "process.runtime.ocaml.gc.minor_collections" - end - end - end - end - - (** https://opentelemetry.io/docs/specs/semconv/http/ *) - module HTTP = struct - module Server = struct - let request_duration = "http.server.request.duration" - - let active_requests = "http.server.active_requests" - - (** Histogram *) - let request_body_size = "http.server.request.body.size" - - (** Histogram *) - let response_body_size = "http.server.response.body.size" - end - - module Client = struct - let request_duration = "http.client.request.duration" - - (** Histogram *) - let request_body_size = "http.client.request.body.size" - - (** Histogram *) - let response_body_size = "http.client.response.body.size" - end - end - end -end - -type value = - [ `Int of int - | `String of string - | `Bool of bool - | `Float of float - | `None - ] +type value = Value.t (** A value in a key/value attribute *) -type key_value = string * value - -open struct - let _conv_value = - let open Proto.Common in - function - | `Int i -> Some (Int_value (Int64.of_int i)) - | `String s -> Some (String_value s) - | `Bool b -> Some (Bool_value b) - | `Float f -> Some (Double_value f) - | `None -> None - - let _conv_key_value (k, v) = - let open Proto.Common in - let value = _conv_value v in - make_key_value ~key:k ?value () -end +type key_value = Key_value.t (** {2 Global settings} *) -(** Process-wide metadata, environment variables, etc. *) -module Globals = struct - open Proto.Common - - (** Main service name metadata *) - let service_name = ref "unknown_service" - - (** Namespace for the service *) - let service_namespace = ref None - - (** Unique identifier for the service *) - let service_instance_id = ref None - - (** Version for the service - @since 0.12 *) - let service_version = ref None - - let instrumentation_library : instrumentation_scope = - make_instrumentation_scope ~version:"%%VERSION_NUM%%" ~name:"ocaml-otel" () - - (** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and - modifiable by the user code. They will be attached to each outgoing - metrics/traces. *) - let global_attributes : key_value list ref = - let parse_pair s = - match String.split_on_char '=' s with - | [ a; b ] -> make_key_value ~key:a ~value:(String_value b) () - | _ -> failwith (Printf.sprintf "invalid attribute: %S" s) - in - ref - @@ - try - Sys.getenv "OTEL_RESOURCE_ATTRIBUTES" - |> String.split_on_char ',' |> List.map parse_pair - with _ -> [] - - (** Add a global attribute *) - let add_global_attribute (key : string) (v : value) : unit = - global_attributes := _conv_key_value (key, v) :: !global_attributes - - (* add global attributes to this list *) - let merge_global_attributes_ into : _ list = - let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in - List.rev_append (List.filter not_redundant !global_attributes) into - - (** Default span kind in {!Span.create}. This will be used in all spans that - do not specify [~kind] explicitly; it is set to "internal", following - directions from the [.proto] file. It can be convenient to set "client" or - "server" uniformly in here. - @since 0.4 *) - let default_span_kind = ref Proto.Trace.Span_kind_internal - - let mk_attributes ?(service_name = !service_name) ?(attrs = []) () : _ list = - let l = List.map _conv_key_value attrs in - let l = - make_key_value ~key:Conventions.Attributes.Service.name - ~value:(String_value service_name) () - :: l - in - let l = - match !service_instance_id with - | None -> l - | Some v -> - make_key_value ~key:Conventions.Attributes.Service.instance_id - ~value:(String_value v) () - :: l - in - let l = - match !service_namespace with - | None -> l - | Some v -> - make_key_value ~key:Conventions.Attributes.Service.namespace - ~value:(String_value v) () - :: l - in - let l = - match !service_version with - | None -> l - | Some v -> - make_key_value ~key:Conventions.Attributes.Service.version - ~value:(String_value v) () - :: l - in - l |> merge_global_attributes_ -end +module Globals = Globals (** {2 Traces and Spans} *) -(** Events. - - Events occur at a given time and can carry attributes. They always belong in - a span. *) -module Event : sig - open Proto.Trace - - type t = span_event - - val make : - ?time_unix_nano:Timestamp_ns.t -> ?attrs:key_value list -> string -> t -end = struct - open Proto.Trace - - type t = span_event - - let make ?(time_unix_nano = Timestamp_ns.now_unix_ns ()) ?(attrs = []) - (name : string) : t = - let attrs = List.map _conv_key_value attrs in - make_span_event ~time_unix_nano ~name ~attributes:attrs () -end - -(** Span Link - - A pointer from the current span to another span in the same trace or in a - different trace. For example, this can be used in batching operations, where - a single batch handler processes multiple requests from different traces or - when the handler receives a request from a different project. *) -module Span_link : sig - open Proto.Trace - - type t = span_link - - val make : - trace_id:Trace_id.t -> - span_id:Span_id.t -> - ?trace_state:string -> - ?attrs:key_value list -> - ?dropped_attributes_count:int -> - unit -> - t - - val of_span_ctx : - ?trace_state:string -> - ?attrs:key_value list -> - ?dropped_attributes_count:int -> - Span_ctx.t -> - t -end = struct - open Proto.Trace - - type t = span_link - - let make ~trace_id ~span_id ?trace_state ?(attrs = []) - ?dropped_attributes_count () : t = - let attributes = List.map _conv_key_value attrs in - let dropped_attributes_count = - Option.map Int32.of_int dropped_attributes_count - in - make_span_link - ~trace_id:(Trace_id.to_bytes trace_id) - ~span_id:(Span_id.to_bytes span_id) ?trace_state ~attributes - ?dropped_attributes_count () - - let[@inline] of_span_ctx ?trace_state ?attrs ?dropped_attributes_count - (ctx : Span_ctx.t) : t = - make ~trace_id:(Span_ctx.trace_id ctx) ~span_id:(Span_ctx.parent_id ctx) - ?trace_state ?attrs ?dropped_attributes_count () -end - -module Span_status : sig - open Proto.Trace - - type t = status = private { - mutable _presence: Pbrt.Bitfield.t; - mutable message: string; - mutable code: status_status_code; - } - - type code = status_status_code = - | Status_code_unset - | Status_code_ok - | Status_code_error - - val make : message:string -> code:code -> t -end = struct - open Proto.Trace - - type t = status = private { - mutable _presence: Pbrt.Bitfield.t; - mutable message: string; - mutable code: status_status_code; - } - - type code = status_status_code = - | Status_code_unset - | Status_code_ok - | Status_code_error - - let[@inline] make ~message ~code : t = make_status ~message ~code () -end - -(** @since 0.11 *) -module Span_kind : sig - open Proto.Trace - - type t = span_span_kind = - | Span_kind_unspecified - | Span_kind_internal - | Span_kind_server - | Span_kind_client - | Span_kind_producer - | Span_kind_consumer -end = struct - open Proto.Trace - - type t = span_span_kind = - | Span_kind_unspecified - | Span_kind_internal - | Span_kind_server - | Span_kind_client - | Span_kind_producer - | Span_kind_consumer -end +module Event = Event +module Span_link = Span_link +module Span_status = Span_status +module Span_kind = Span_kind (** {2 Scopes} *) -(** Scopes. - - A scope is a trace ID and the span ID of the currently active span. *) -module Scope : sig - type item_list - - type t = { - trace_id: Trace_id.t; - span_id: Span_id.t; - mutable items: item_list; - } - - val attrs : t -> key_value list - - val events : t -> Event.t list - - val links : t -> Span_link.t list - - val status : t -> Span_status.t option - - val kind : t -> Span_kind.t option - - val make : - trace_id:Trace_id.t -> - span_id:Span_id.t -> - ?events:Event.t list -> - ?attrs:key_value list -> - ?links:Span_link.t list -> - ?status:Span_status.t -> - unit -> - t - - val to_span_link : - ?trace_state:string -> - ?attrs:key_value list -> - ?dropped_attributes_count:int -> - t -> - Span_link.t - (** Turn the scope into a span link *) - - val to_span_ctx : t -> Span_ctx.t - (** Turn the scope into a span context *) - - val add_event : t -> (unit -> Event.t) -> unit - (** Add an event to the scope. It will be aggregated into the span. - - Note that this takes a function that produces an event, and will only call - it if there is an instrumentation backend. *) - - val record_exception : t -> exn -> Printexc.raw_backtrace -> unit - - val add_attrs : t -> (unit -> key_value list) -> unit - (** Add attributes to the scope. It will be aggregated into the span. - - Note that this takes a function that produces attributes, and will only - call it if there is an instrumentation backend. *) - - val add_links : t -> (unit -> Span_link.t list) -> unit - (** Add links to the scope. It will be aggregated into the span. - - Note that this takes a function that produces links, and will only call it - if there is an instrumentation backend. *) - - val set_status : t -> Span_status.t -> unit - (** set the span status. - - Note that this function will be called only if there is an instrumentation - backend. *) - - val set_kind : t -> Span_kind.t -> unit - (** Set the span's kind. - @since 0.11 *) - - val ambient_scope_key : t Ambient_context.key - (** The opaque key necessary to access/set the ambient scope with - {!Ambient_context}. *) - - val get_ambient_scope : ?scope:t -> unit -> t option - (** Obtain current scope from {!Ambient_context}, if available. *) - - val with_ambient_scope : t -> (unit -> 'a) -> 'a - (** [with_ambient_scope sc thunk] calls [thunk()] in a context where [sc] is - the (thread|continuation)-local scope, then reverts to the previous local - scope, if any. - - @see - ambient-context docs *) -end = struct - type item_list = - | Nil - | Ev of Event.t * item_list - | Attr of key_value * item_list - | Span_link of Span_link.t * item_list - | Span_status of Span_status.t * item_list - | Span_kind of Span_kind.t * item_list - - type t = { - trace_id: Trace_id.t; - span_id: Span_id.t; - mutable items: item_list; - } - - let attrs scope = - let rec loop acc = function - | Nil -> acc - | Attr (attr, l) -> loop (attr :: acc) l - | Ev (_, l) | Span_kind (_, l) | Span_link (_, l) | Span_status (_, l) -> - loop acc l - in - loop [] scope.items - - let events scope = - let rec loop acc = function - | Nil -> acc - | Ev (event, l) -> loop (event :: acc) l - | Attr (_, l) | Span_kind (_, l) | Span_link (_, l) | Span_status (_, l) - -> - loop acc l - in - loop [] scope.items - - let links scope = - let rec loop acc = function - | Nil -> acc - | Span_link (span_link, l) -> loop (span_link :: acc) l - | Ev (_, l) | Span_kind (_, l) | Attr (_, l) | Span_status (_, l) -> - loop acc l - in - loop [] scope.items - - let status scope = - let rec loop = function - | Nil -> None - | Span_status (status, _) -> Some status - | Ev (_, l) | Attr (_, l) | Span_kind (_, l) | Span_link (_, l) -> loop l - in - loop scope.items - - let kind scope = - let rec loop = function - | Nil -> None - | Span_kind (k, _) -> Some k - | Ev (_, l) | Span_status (_, l) | Attr (_, l) | Span_link (_, l) -> - loop l - in - loop scope.items - - let make ~trace_id ~span_id ?(events = []) ?(attrs = []) ?(links = []) ?status - () : t = - let items = - let items = - match status with - | None -> Nil - | Some status -> Span_status (status, Nil) - in - let items = List.fold_left (fun acc ev -> Ev (ev, acc)) items events in - let items = - List.fold_left (fun acc attr -> Attr (attr, acc)) items attrs - in - List.fold_left (fun acc link -> Span_link (link, acc)) items links - in - { trace_id; span_id; items } - - let[@inline] to_span_link ?trace_state ?attrs ?dropped_attributes_count - (self : t) : Span_link.t = - Span_link.make ?trace_state ?attrs ?dropped_attributes_count - ~trace_id:self.trace_id ~span_id:self.span_id () - - let[@inline] to_span_ctx (self : t) : Span_ctx.t = - Span_ctx.make ~trace_id:self.trace_id ~parent_id:self.span_id () - - let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit = - if Collector.has_backend () then scope.items <- Ev (ev (), scope.items) - - let[@inline] record_exception (scope : t) (exn : exn) - (bt : Printexc.raw_backtrace) : unit = - if Collector.has_backend () then ( - let ev = - Event.make "exception" - ~attrs: - [ - "exception.message", `String (Printexc.to_string exn); - "exception.type", `String (Printexc.exn_slot_name exn); - ( "exception.stacktrace", - `String (Printexc.raw_backtrace_to_string bt) ); - ] - in - scope.items <- Ev (ev, scope.items) - ) - - let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit = - if Collector.has_backend () then - scope.items <- - List.fold_left (fun acc attr -> Attr (attr, acc)) scope.items (attrs ()) - - let[@inline] add_links (scope : t) (links : unit -> Span_link.t list) : unit = - if Collector.has_backend () then - scope.items <- - List.fold_left - (fun acc link -> Span_link (link, acc)) - scope.items (links ()) - - let set_status (scope : t) (status : Span_status.t) : unit = - if Collector.has_backend () then - scope.items <- Span_status (status, scope.items) - - let set_kind (scope : t) (k : Span_kind.t) : unit = - if Collector.has_backend () then scope.items <- Span_kind (k, scope.items) - - let ambient_scope_key : t Ambient_context.key = Ambient_context.create_key () - - let get_ambient_scope ?scope () : t option = - match scope with - | Some _ -> scope - | None -> Ambient_context.get ambient_scope_key - - let[@inline] with_ambient_scope (sc : t) (f : unit -> 'a) : 'a = - Ambient_context.with_binding ambient_scope_key sc (fun _ -> f ()) -end +module Scope = Scope (** {2 Traces} *) -(** Spans. - - A Span is the workhorse of traces, it indicates an operation that took place - over a given span of time (indicated by start_time and end_time) as part of - a hierarchical trace. All spans in a given trace are bound by the use of the - same {!Trace_id.t}. *) -module Span : sig - open Proto.Trace - - type t = span - - type id = Span_id.t - - type kind = Span_kind.t = - | Span_kind_unspecified - | Span_kind_internal - | Span_kind_server - | Span_kind_client - | Span_kind_producer - | Span_kind_consumer - - val id : t -> Span_id.t - - type key_value = - string - * [ `Int of int - | `String of string - | `Bool of bool - | `Float of float - | `None - ] - - val create : - ?kind:kind -> - ?id:id -> - ?trace_state:string -> - ?attrs:key_value list -> - ?events:Event.t list -> - ?status:status -> - trace_id:Trace_id.t -> - ?parent:id -> - ?links:Span_link.t list -> - start_time:Timestamp_ns.t -> - end_time:Timestamp_ns.t -> - string -> - t * id - (** [create ~trace_id name] creates a new span with its unique ID. - @param trace_id the trace this belongs to - @param parent parent span, if any - @param links - list of links to other spans, each with their trace state (see - {{:https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *) -end = struct - open Proto.Trace - - type t = span - - type id = Span_id.t - - type kind = Span_kind.t = - | Span_kind_unspecified - | Span_kind_internal - | Span_kind_server - | Span_kind_client - | Span_kind_producer - | Span_kind_consumer - - type key_value = - string - * [ `Int of int - | `String of string - | `Bool of bool - | `Float of float - | `None - ] - - let id self = Span_id.of_bytes self.span_id - - let create ?(kind = !Globals.default_span_kind) ?(id = Span_id.create ()) - ?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent - ?(links = []) ~start_time ~end_time name : t * id = - let trace_id = Trace_id.to_bytes trace_id in - let parent_span_id = Option.map Span_id.to_bytes parent in - let attributes = List.map _conv_key_value attrs in - let span = - make_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id) - ~attributes ~events ?trace_state ?status ~kind ~name ~links - ~start_time_unix_nano:start_time ~end_time_unix_nano:end_time () - in - span, id -end - -(** Traces. - - See - {{:https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} - the spec} *) -module Trace = struct - open Proto.Trace - - type span = Span.t - - let make_resource_spans ?service_name ?attrs spans : resource_spans = - let ils = - make_scope_spans ~scope:Globals.instrumentation_library ~spans () - in - let attributes = Globals.mk_attributes ?service_name ?attrs () in - let resource = Proto.Resource.make_resource ~attributes () in - make_resource_spans ~resource ~scope_spans:[ ils ] () - - (** Sync emitter. - - This instructs the collector to forward the spans to some backend at a - later point. - - {b NOTE} be careful not to call this inside a Gc alarm, as it can cause - deadlocks. *) - let emit ?service_name ?attrs (spans : span list) : unit = - let rs = make_resource_spans ?service_name ?attrs spans in - Collector.send_trace [ rs ] ~ret:(fun () -> ()) - - type scope = Scope.t = { - trace_id: Trace_id.t; - span_id: Span_id.t; - mutable items: Scope.item_list; - } - [@@deprecated "use Scope.t"] - - let (add_event [@deprecated "use Scope.add_event"]) = Scope.add_event - - let (add_attrs [@deprecated "use Scope.add_attrs"]) = Scope.add_attrs - - let with_' ?(force_new_trace_id = false) ?trace_state ?service_name - ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope - ?(links = []) name cb = - let scope = - if force_new_trace_id then - None - else - Scope.get_ambient_scope ?scope () - in - let trace_id = - match trace_id, scope with - | _ when force_new_trace_id -> Trace_id.create () - | Some trace_id, _ -> trace_id - | None, Some scope -> scope.trace_id - | None, None -> Trace_id.create () - in - let parent = - match parent, scope with - | _ when force_new_trace_id -> None - | Some span_id, _ -> Some span_id - | None, Some scope -> Some scope.span_id - | None, None -> None - in - let start_time = Timestamp_ns.now_unix_ns () in - let span_id = Span_id.create () in - let scope = Scope.make ~trace_id ~span_id ~attrs ~links () in - (* called once we're done, to emit a span *) - let finally res = - let status = - match Scope.status scope with - | Some status -> Some status - | None -> - (match res with - | Ok () -> - (* By default, all spans are Unset, which means a span completed without error. - The Ok status is reserved for when you need to explicitly mark a span as successful - rather than stick with the default of Unset (i.e., “without error”). - - https://opentelemetry.io/docs/languages/go/instrumentation/#set-span-status *) - None - | Error (e, bt) -> - Scope.record_exception scope e bt; - Some - (make_status ~code:Status_code_error - ~message:(Printexc.to_string e) ())) - in - let span, _ = - (* TODO: should the attrs passed to with_ go on the Span - (in Span.create) or on the ResourceSpan (in emit)? - (question also applies to Opentelemetry_lwt.Trace.with) *) - Span.create ?kind ~trace_id ?parent ~links:(Scope.links scope) - ~id:span_id ?trace_state ~attrs:(Scope.attrs scope) - ~events:(Scope.events scope) ~start_time - ~end_time:(Timestamp_ns.now_unix_ns ()) - ?status name - in - emit ?service_name [ span ] - in - let thunk () = - (* set global scope in this thread *) - Scope.with_ambient_scope scope @@ fun () -> cb scope - in - thunk, finally - - (** Sync span guard. - - Notably, this includes {e implicit} scope-tracking: if called without a - [~scope] argument (or [~parent]/[~trace_id]), it will check in the - {!Ambient_context} for a surrounding environment, and use that as the - scope. Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new - scope in the ambient context, so that any logically-nested calls to - {!with_} will use this span as their parent. - - {b NOTE} be careful not to call this inside a Gc alarm, as it can cause - deadlocks. - - @param force_new_trace_id - if true (default false), the span will not use a ambient scope, the - [~scope] argument, nor [~trace_id], but will instead always create fresh - identifiers for this span *) - - let with_ ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind - ?trace_id ?parent ?scope ?links name (cb : Scope.t -> 'a) : 'a = - let thunk, finally = - with_' ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind - ?trace_id ?parent ?scope ?links name cb - in - - try - let rv = thunk () in - finally (Ok ()); - rv - with e -> - let bt = Printexc.get_raw_backtrace () in - finally (Error (e, bt)); - raise e -end +module Span = Span +module Tracer = Tracer +module Trace = Tracer [@@deprecated "use Tracer instead"] (** {2 Metrics} *) -(** Metrics. - - See - {{:https://opentelemetry.io/docs/reference/specification/overview/#metric-signal} - the spec} *) -module Metrics = struct - open Proto - open Proto.Metrics - - type t = Metrics.metric - (** A single metric, measuring some time-varying quantity or statistical - distribution. It is composed of one or more data points that have precise - values and time stamps. Each distinct metric should have a distinct name. - *) - - open struct - let _program_start = Timestamp_ns.now_unix_ns () - end - - (** Number data point, as a float *) - let float ?(start_time_unix_nano = _program_start) - ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (d : float) : - number_data_point = - let attributes = attrs |> List.map _conv_key_value in - make_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes - ~value:(As_double d) () - - (** Number data point, as an int *) - let int ?(start_time_unix_nano = _program_start) - ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) (i : int) : - number_data_point = - let attributes = attrs |> List.map _conv_key_value in - make_number_data_point ~start_time_unix_nano ~time_unix_nano:now ~attributes - ~value:(As_int (Int64.of_int i)) - () - - (** Aggregation of a scalar metric, always with the current value *) - let gauge ~name ?description ?unit_ (l : number_data_point list) : t = - let data = Gauge (make_gauge ~data_points:l ()) in - make_metric ~name ?description ?unit_ ~data () - - type aggregation_temporality = Metrics.aggregation_temporality = - | Aggregation_temporality_unspecified - | Aggregation_temporality_delta - | Aggregation_temporality_cumulative - - (** Sum of all reported measurements over a time interval *) - let sum ~name ?description ?unit_ - ?(aggregation_temporality = Aggregation_temporality_cumulative) - ?is_monotonic (l : number_data_point list) : t = - let data = - Sum (make_sum ~data_points:l ?is_monotonic ~aggregation_temporality ()) - in - make_metric ~name ?description ?unit_ ~data () - - (** Histogram data - @param count number of values in population (non negative) - @param sum sum of values in population (0 if count is 0) - @param bucket_counts - count value of histogram for each bucket. Sum of the counts must be - equal to [count]. length must be [1+length explicit_bounds] - @param explicit_bounds strictly increasing list of bounds for the buckets - *) - let histogram_data_point ?(start_time_unix_nano = _program_start) - ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = []) - ?(explicit_bounds = []) ?sum ~bucket_counts ~count () : - histogram_data_point = - let attributes = attrs |> List.map _conv_key_value in - make_histogram_data_point ~start_time_unix_nano ~time_unix_nano:now - ~attributes ~exemplars ~bucket_counts ~explicit_bounds ~count ?sum () - - let histogram ~name ?description ?unit_ ?aggregation_temporality - (l : histogram_data_point list) : t = - let data = - Histogram (make_histogram ~data_points:l ?aggregation_temporality ()) - in - make_metric ~name ?description ?unit_ ~data () - - (* TODO: exponential history *) - (* TODO: summary *) - (* TODO: exemplar *) - - (** Aggregate metrics into a {!Proto.Metrics.resource_metrics} *) - let make_resource_metrics ?service_name ?attrs (l : t list) : resource_metrics - = - let lm = - make_scope_metrics ~scope:Globals.instrumentation_library ~metrics:l () - in - let attributes = Globals.mk_attributes ?service_name ?attrs () in - let resource = Proto.Resource.make_resource ~attributes () in - make_resource_metrics ~scope_metrics:[ lm ] ~resource () - - (** Emit some metrics to the collector (sync). This blocks until the backend - has pushed the metrics into some internal queue, or discarded them. - - {b NOTE} be careful not to call this inside a Gc alarm, as it can cause - deadlocks. *) - let emit ?attrs (l : t list) : unit = - let rm = make_resource_metrics ?attrs l in - Collector.send_metrics [ rm ] ~ret:ignore -end - -(** A set of callbacks that produce metrics when called. The metrics are - automatically called regularly. - - This allows applications to register metrics callbacks from various points - in the program (or even in libraries), and not worry about setting - alarms/intervals to emit them. *) -module Metrics_callbacks = struct - open struct - (* [true] iff the initial list of metric callbacks has already been registered - with `on_tick`. This registration must only happen once, after which, - [registered_with_on_tick] will forever be [false]. *) - let registered_with_on_tick : bool Atomic.t = Atomic.make false - - let cbs_ : (unit -> Metrics.t list) AList.t = AList.make () - end - - (** [register f] adds the callback [f] to the list. - - [f] will be called at unspecified times and is expected to return a list - of metrics. It might be called regularly by the backend, in particular - (but not only) when {!Collector.tick} is called. *) - let register f : unit = - (* sets [registered_with_on_tick] to [true] atomically, iff it is currently - [false]. *) - if not (Atomic.exchange registered_with_on_tick true) then - (* make sure we call [f] (and others) at each tick *) - Collector.on_tick (fun () -> - let m = List.map (fun f -> f ()) (AList.get cbs_) |> List.flatten in - Metrics.emit m); - AList.add cbs_ f -end +module Metrics = Metrics +module Metrics_callbacks = Metrics_callbacks +module Metrics_emitter = Metrics_emitter (** {2 Logs} *) -(** Logs. - - See - {{:https://opentelemetry.io/docs/reference/specification/overview/#log-signal} - the spec} *) -module Logs = struct - open Opentelemetry_proto - open Logs - - type t = log_record - - (** Severity level of a log event *) - type severity = Logs.severity_number = - | Severity_number_unspecified - | Severity_number_trace - | Severity_number_trace2 - | Severity_number_trace3 - | Severity_number_trace4 - | Severity_number_debug - | Severity_number_debug2 - | Severity_number_debug3 - | Severity_number_debug4 - | Severity_number_info - | Severity_number_info2 - | Severity_number_info3 - | Severity_number_info4 - | Severity_number_warn - | Severity_number_warn2 - | Severity_number_warn3 - | Severity_number_warn4 - | Severity_number_error - | Severity_number_error2 - | Severity_number_error3 - | Severity_number_error4 - | Severity_number_fatal - | Severity_number_fatal2 - | Severity_number_fatal3 - | Severity_number_fatal4 - - let pp_severity = Logs.pp_severity_number - - type flags = Logs.log_record_flags = - | Log_record_flags_do_not_use - | Log_record_flags_trace_flags_mask - - let pp_flags = Logs.pp_log_record_flags - - (** Make a single log entry *) - let make ?time ?(observed_time_unix_nano = Timestamp_ns.now_unix_ns ()) - ?severity ?log_level ?flags ?trace_id ?span_id (body : value) : t = - let time_unix_nano = - match time with - | None -> observed_time_unix_nano - | Some t -> t - in - let trace_id = Option.map Trace_id.to_bytes trace_id in - let span_id = Option.map Span_id.to_bytes span_id in - let body = _conv_value body in - make_log_record ~time_unix_nano ~observed_time_unix_nano - ?severity_number:severity ?severity_text:log_level ?flags ?trace_id - ?span_id ?body () - - (** Make a log entry whose body is a string *) - let make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags - ?trace_id ?span_id (body : string) : t = - make ?time ?observed_time_unix_nano ?severity ?log_level ?flags ?trace_id - ?span_id (`String body) - - (** Make a log entry with format *) - let make_strf ?time ?observed_time_unix_nano ?severity ?log_level ?flags - ?trace_id ?span_id fmt = - Format.kasprintf - (fun bod -> - make_str ?time ?observed_time_unix_nano ?severity ?log_level ?flags - ?trace_id ?span_id bod) - fmt - - (** Emit logs. - - This instructs the collector to send the logs to some backend at a later - date. {b NOTE} be careful not to call this inside a Gc alarm, as it can - cause deadlocks. *) - let emit ?service_name ?attrs (l : t list) : unit = - let attributes = Globals.mk_attributes ?service_name ?attrs () in - let resource = Proto.Resource.make_resource ~attributes () in - let ll = - make_scope_logs ~scope:Globals.instrumentation_library ~log_records:l () - in - let rl = make_resource_logs ~resource ~scope_logs:[ ll ] () in - Collector.send_logs [ rl ] ~ret:ignore -end +module Log_record = Log_record +module Logger = Logger +module Logs = Logger [@@deprecated "use Logger"] (** {2 Utils} *) -(** Implementation of the W3C Trace Context spec +module Trace_context = Trace_context +module Gc_metrics = Gc_metrics - https://www.w3.org/TR/trace-context/ *) -module Trace_context = struct - (** The traceparent header - https://www.w3.org/TR/trace-context/#traceparent-header *) - module Traceparent = struct - let name = "traceparent" +(* *) - (** Parse the value of the traceparent header. - - The values are of the form: - - {[ - { version } - { trace_id } - { parent_id } - { flags } - ]} - - For example: - - {[ - 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 - ]} - - [{flags}] are currently ignored. *) - let of_value str : (Trace_id.t * Span_id.t, string) result = - match Span_ctx.of_w3c_trace_context (Bytes.unsafe_of_string str) with - | Ok sp -> Ok (Span_ctx.trace_id sp, Span_ctx.parent_id sp) - | Error _ as e -> e - - let to_value ?(sampled : bool option) ~(trace_id : Trace_id.t) - ~(parent_id : Span_id.t) () : string = - let span_ctx = Span_ctx.make ?sampled ~trace_id ~parent_id () in - Bytes.unsafe_to_string @@ Span_ctx.to_w3c_trace_context span_ctx - end -end - -(** Export GC metrics. - - These metrics are emitted after each GC collection. *) -module GC_metrics : sig - val basic_setup : unit -> unit - (** Setup a hook that will emit GC statistics on every tick (assuming a ticker - thread) *) - - val get_runtime_attributes : unit -> Span.key_value list - (** Get OCaml name and version runtime attributes *) - - val get_metrics : unit -> Metrics.t list - (** Get a few metrics from the current state of the GC *) -end = struct - (** See - https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes - *) - let runtime_attributes = - lazy - Conventions.Attributes. - [ - Process.Runtime.name, `String "ocaml"; - Process.Runtime.version, `String Sys.ocaml_version; - ] - - let get_runtime_attributes () = Lazy.force runtime_attributes - - let basic_setup () = - let on_tick () = - match Collector.get_backend () with - | None -> () - | Some (module C) -> C.signal_emit_gc_metrics () - in - Collector.on_tick on_tick - - let bytes_per_word = Sys.word_size / 8 - - let word_to_bytes n = n * bytes_per_word - - let word_to_bytes_f n = n *. float bytes_per_word - - let get_metrics () : Metrics.t list = - let gc = Gc.quick_stat () in - let now = Timestamp_ns.now_unix_ns () in - let open Metrics in - let open Conventions.Metrics in - [ - gauge ~name:Process.Runtime.Ocaml.GC.major_heap ~unit_:"B" - [ int ~now (word_to_bytes gc.Gc.heap_words) ]; - sum ~name:Process.Runtime.Ocaml.GC.minor_allocated - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true ~unit_:"B" - [ float ~now (word_to_bytes_f gc.Gc.minor_words) ]; - sum ~name:Process.Runtime.Ocaml.GC.minor_collections - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~now gc.Gc.minor_collections ]; - sum ~name:Process.Runtime.Ocaml.GC.major_collections - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~now gc.Gc.major_collections ]; - sum ~name:Process.Runtime.Ocaml.GC.compactions - ~aggregation_temporality:Metrics.Aggregation_temporality_cumulative - ~is_monotonic:true - [ int ~now gc.Gc.compactions ]; - ] -end +module GC_metrics = Gc_metrics +[@@deprecated "use Gc_metrics (beware capitalization)"] diff --git a/src/core/rand_bytes.ml b/src/core/rand_bytes.ml index 18cf7dc8..336020a5 100644 --- a/src/core/rand_bytes.ml +++ b/src/core/rand_bytes.ml @@ -1,38 +1,39 @@ -(* generate random IDs *) -let rand_ = Random.State.make_self_init () - -let ( let@ ) = ( @@ ) - let default_rand_bytes_8 () : bytes = - let@ () = Lock.with_lock in let b = Bytes.create 8 in for i = 0 to 1 do - let r = Random.State.bits rand_ in + (* rely on the stdlib's [Random] being thread-or-domain safe *) + let r = Random.bits () in (* 30 bits, of which we use 24 *) Bytes.set b (i * 3) (Char.chr (r land 0xff)); Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) done; - let r = Random.State.bits rand_ in + let r = Random.bits () in Bytes.set b 6 (Char.chr (r land 0xff)); Bytes.set b 7 (Char.chr ((r lsr 8) land 0xff)); b let default_rand_bytes_16 () : bytes = - let@ () = Lock.with_lock in let b = Bytes.create 16 in for i = 0 to 4 do - let r = Random.State.bits rand_ in + (* rely on the stdlib's [Random] being thread-or-domain safe *) + let r = Random.bits () in (* 30 bits, of which we use 24 *) Bytes.set b (i * 3) (Char.chr (r land 0xff)); Bytes.set b ((i * 3) + 1) (Char.chr ((r lsr 8) land 0xff)); Bytes.set b ((i * 3) + 2) (Char.chr ((r lsr 16) land 0xff)) done; - let r = Random.State.bits rand_ in + let r = Random.bits () in Bytes.set b 15 (Char.chr (r land 0xff)); (* last byte *) b -let rand_bytes_16 = ref default_rand_bytes_16 +let rand_bytes_16_ref = ref default_rand_bytes_16 -let rand_bytes_8 = ref default_rand_bytes_8 +let rand_bytes_8_ref = ref default_rand_bytes_8 + +(** Generate a 16B identifier *) +let[@inline] rand_bytes_16 () = !rand_bytes_16_ref () + +(** Generate an 8B identifier *) +let[@inline] rand_bytes_8 () = !rand_bytes_8_ref () diff --git a/src/core/rand_bytes.mli b/src/core/rand_bytes.mli index 7c42ea35..4b015b27 100644 --- a/src/core/rand_bytes.mli +++ b/src/core/rand_bytes.mli @@ -2,12 +2,12 @@ We need random identifiers for trace IDs and span IDs. *) -val rand_bytes_16 : (unit -> bytes) ref +val rand_bytes_16_ref : (unit -> bytes) ref (** Generate 16 bytes of random data. The implementation can be swapped to use any random generator. *) -val rand_bytes_8 : (unit -> bytes) ref -(** Generate 16 bytes of random data. The implementation can be swapped to use +val rand_bytes_8_ref : (unit -> bytes) ref +(** Generate 8 bytes of random data. The implementation can be swapped to use any random generator. *) val default_rand_bytes_8 : unit -> bytes @@ -15,3 +15,9 @@ val default_rand_bytes_8 : unit -> bytes val default_rand_bytes_16 : unit -> bytes (** Default implementation using {!Random} *) + +val rand_bytes_16 : unit -> bytes +(** Call the current {!rand_bytes_16_ref} *) + +val rand_bytes_8 : unit -> bytes +(** Call the current {!rand_bytes_8_ref} *) diff --git a/src/core/scope.ml b/src/core/scope.ml new file mode 100644 index 00000000..aa5cb19a --- /dev/null +++ b/src/core/scope.ml @@ -0,0 +1,131 @@ +open Common_ + +type item_list = + | Nil + | Ev of Event.t * item_list + | Attr of Key_value.t * item_list + | Span_link of Span_link.t * item_list + | Span_status of Span_status.t * item_list + | Span_kind of Span_kind.t * item_list + +type t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable items: item_list; +} + +let attrs scope = + let rec loop acc = function + | Nil -> acc + | Attr (attr, l) -> loop (attr :: acc) l + | Ev (_, l) | Span_kind (_, l) | Span_link (_, l) | Span_status (_, l) -> + loop acc l + in + loop [] scope.items + +let events scope = + let rec loop acc = function + | Nil -> acc + | Ev (event, l) -> loop (event :: acc) l + | Attr (_, l) | Span_kind (_, l) | Span_link (_, l) | Span_status (_, l) -> + loop acc l + in + loop [] scope.items + +let links scope = + let rec loop acc = function + | Nil -> acc + | Span_link (span_link, l) -> loop (span_link :: acc) l + | Ev (_, l) | Span_kind (_, l) | Attr (_, l) | Span_status (_, l) -> + loop acc l + in + loop [] scope.items + +let status scope = + let rec loop = function + | Nil -> None + | Span_status (status, _) -> Some status + | Ev (_, l) | Attr (_, l) | Span_kind (_, l) | Span_link (_, l) -> loop l + in + loop scope.items + +let kind scope = + let rec loop = function + | Nil -> None + | Span_kind (k, _) -> Some k + | Ev (_, l) | Span_status (_, l) | Attr (_, l) | Span_link (_, l) -> loop l + in + loop scope.items + +let make ~trace_id ~span_id ?(events = []) ?(attrs = []) ?(links = []) ?status + () : t = + let items = + let items = + match status with + | None -> Nil + | Some status -> Span_status (status, Nil) + in + let items = List.fold_left (fun acc ev -> Ev (ev, acc)) items events in + let items = List.fold_left (fun acc attr -> Attr (attr, acc)) items attrs in + List.fold_left (fun acc link -> Span_link (link, acc)) items links + in + { trace_id; span_id; items } + +let[@inline] to_span_link ?trace_state ?attrs ?dropped_attributes_count + (self : t) : Span_link.t = + Span_link.make ?trace_state ?attrs ?dropped_attributes_count + ~trace_id:self.trace_id ~span_id:self.span_id () + +let[@inline] to_span_ctx (self : t) : Span_ctx.t = + Span_ctx.make ~trace_id:self.trace_id ~parent_id:self.span_id () + +open struct + let[@inline] is_not_dummy (self : t) : bool = Span_id.is_valid self.span_id +end + +let[@inline] add_event (self : t) (ev : unit -> Event.t) : unit = + if is_not_dummy self then self.items <- Ev (ev (), self.items) + +let[@inline] record_exception (self : t) (exn : exn) + (bt : Printexc.raw_backtrace) : unit = + if is_not_dummy self then ( + let ev = + Event.make "exception" + ~attrs: + [ + "exception.message", `String (Printexc.to_string exn); + "exception.type", `String (Printexc.exn_slot_name exn); + ( "exception.stacktrace", + `String (Printexc.raw_backtrace_to_string bt) ); + ] + in + self.items <- Ev (ev, self.items) + ) + +let[@inline] add_attrs (self : t) (attrs : unit -> Key_value.t list) : unit = + if is_not_dummy self then + self.items <- + List.fold_left (fun acc attr -> Attr (attr, acc)) self.items (attrs ()) + +let[@inline] add_links (self : t) (links : unit -> Span_link.t list) : unit = + if is_not_dummy self then + self.items <- + List.fold_left + (fun acc link -> Span_link (link, acc)) + self.items (links ()) + +let set_status (self : t) (status : Span_status.t) : unit = + if is_not_dummy self then self.items <- Span_status (status, self.items) + +let set_kind (self : t) (k : Span_kind.t) : unit = + if is_not_dummy self then self.items <- Span_kind (k, self.items) + +let ambient_scope_key : t Ambient_context.key = Ambient_context.create_key () + +let get_ambient_scope ?scope () : t option = + match scope with + | Some _ -> scope + | None -> Ambient_context.get ambient_scope_key + +let[@inline] with_ambient_scope (sc : t) (f : unit -> 'a) : 'a = + Ambient_context.with_binding ambient_scope_key sc (fun _ -> f ()) diff --git a/src/core/scope.mli b/src/core/scope.mli new file mode 100644 index 00000000..9ba60d0e --- /dev/null +++ b/src/core/scope.mli @@ -0,0 +1,89 @@ +(** Scopes. + + A scope is a trace ID and the span ID of the currently active span. *) + +open Common_ + +type item_list + +type t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable items: item_list; +} + +val attrs : t -> Key_value.t list + +val events : t -> Event.t list + +val links : t -> Span_link.t list + +val status : t -> Span_status.t option + +val kind : t -> Span_kind.t option + +val make : + trace_id:Trace_id.t -> + span_id:Span_id.t -> + ?events:Event.t list -> + ?attrs:Key_value.t list -> + ?links:Span_link.t list -> + ?status:Span_status.t -> + unit -> + t + +val to_span_link : + ?trace_state:string -> + ?attrs:Key_value.t list -> + ?dropped_attributes_count:int -> + t -> + Span_link.t +(** Turn the scope into a span link *) + +val to_span_ctx : t -> Span_ctx.t +(** Turn the scope into a span context *) + +val add_event : t -> (unit -> Event.t) -> unit +(** Add an event to the scope. It will be aggregated into the span. + + Note that this takes a function that produces an event, and will only call + it if there is an instrumentation backend. *) + +val record_exception : t -> exn -> Printexc.raw_backtrace -> unit + +val add_attrs : t -> (unit -> Key_value.t list) -> unit +(** Add attributes to the scope. It will be aggregated into the span. + + Note that this takes a function that produces attributes, and will only call + it if there is an instrumentation backend. *) + +val add_links : t -> (unit -> Span_link.t list) -> unit +(** Add links to the scope. It will be aggregated into the span. + + Note that this takes a function that produces links, and will only call it + if there is an instrumentation backend. *) + +val set_status : t -> Span_status.t -> unit +(** set the span status. + + Note that this function will be called only if there is an instrumentation + backend. *) + +val set_kind : t -> Span_kind.t -> unit +(** Set the span's kind. + @since 0.11 *) + +val ambient_scope_key : t Ambient_context.key +(** The opaque key necessary to access/set the ambient scope with + {!Ambient_context}. *) + +val get_ambient_scope : ?scope:t -> unit -> t option +(** Obtain current scope from {!Ambient_context}, if available. *) + +val with_ambient_scope : t -> (unit -> 'a) -> 'a +(** [with_ambient_scope sc thunk] calls [thunk()] in a context where [sc] is the + (thread|continuation)-local scope, then reverts to the previous local scope, + if any. + + @see + ambient-context docs *) diff --git a/src/core/span.ml b/src/core/span.ml new file mode 100644 index 00000000..1ea8cb0b --- /dev/null +++ b/src/core/span.ml @@ -0,0 +1,38 @@ +open Common_ +open Proto.Trace + +type t = span + +type id = Span_id.t + +type kind = Span_kind.t = + | Span_kind_unspecified + | Span_kind_internal + | Span_kind_server + | Span_kind_client + | Span_kind_producer + | Span_kind_consumer + +type key_value = + string + * [ `Int of int + | `String of string + | `Bool of bool + | `Float of float + | `None + ] + +let id self = Span_id.of_bytes self.span_id + +let create ?(kind = !Globals.default_span_kind) ?(id = Span_id.create ()) + ?trace_state ?(attrs = []) ?(events = []) ?status ~trace_id ?parent + ?(links = []) ~start_time ~end_time name : t * id = + let trace_id = Trace_id.to_bytes trace_id in + let parent_span_id = Option.map Span_id.to_bytes parent in + let attributes = List.map Key_value.conv attrs in + let span = + make_span ~trace_id ?parent_span_id ~span_id:(Span_id.to_bytes id) + ~attributes ~events ?trace_state ?status ~kind ~name ~links + ~start_time_unix_nano:start_time ~end_time_unix_nano:end_time () + in + span, id diff --git a/src/core/span.mli b/src/core/span.mli new file mode 100644 index 00000000..cfb9a2de --- /dev/null +++ b/src/core/span.mli @@ -0,0 +1,46 @@ +(** Spans. + + A Span is the workhorse of traces, it indicates an operation that took place + over a given span of time (indicated by start_time and end_time) as part of + a hierarchical trace. All spans in a given trace are bound by the use of the + same {!Trace_id.t}. *) + +open Common_ +open Proto.Trace + +type t = span + +type id = Span_id.t + +type kind = Span_kind.t = + | Span_kind_unspecified + | Span_kind_internal + | Span_kind_server + | Span_kind_client + | Span_kind_producer + | Span_kind_consumer + +val id : t -> Span_id.t + +type key_value = Key_value.t + +val create : + ?kind:kind -> + ?id:id -> + ?trace_state:string -> + ?attrs:key_value list -> + ?events:Event.t list -> + ?status:status -> + trace_id:Trace_id.t -> + ?parent:id -> + ?links:Span_link.t list -> + start_time:Timestamp_ns.t -> + end_time:Timestamp_ns.t -> + string -> + t * id +(** [create ~trace_id name] creates a new span with its unique ID. + @param trace_id the trace this belongs to + @param parent parent span, if any + @param links + list of links to other spans, each with their trace state (see + {{:https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *) diff --git a/src/core/span_ctx.ml b/src/core/span_ctx.ml new file mode 100644 index 00000000..1be170f2 --- /dev/null +++ b/src/core/span_ctx.ml @@ -0,0 +1,91 @@ +open Common_ + +(* see: https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext *) + +(* TODO: trace state *) + +external int_of_bool : bool -> int = "%identity" + +module Flags = struct + let sampled = 1 + + let remote = 2 +end + +type t = { + trace_id: Trace_id.t; + parent_id: Span_id.t; + flags: int; +} + +let dummy = { trace_id = Trace_id.dummy; parent_id = Span_id.dummy; flags = 0 } + +let make ?(remote = false) ?(sampled = false) ~trace_id ~parent_id () : t = + let flags = + 0 + lor (int_of_bool remote lsl Flags.remote) + lor (int_of_bool sampled lsl Flags.sampled) + in + { trace_id; parent_id; flags } + +let[@inline] is_valid self = + Trace_id.is_valid self.trace_id && Span_id.is_valid self.parent_id + +let[@inline] sampled self = self.flags land (1 lsl Flags.sampled) != 0 + +let[@inline] is_remote self = self.flags land (1 lsl Flags.remote) != 0 + +let[@inline] trace_id self = self.trace_id + +let[@inline] parent_id self = self.parent_id + +let to_w3c_trace_context (self : t) : bytes = + let bs = Bytes.create 55 in + Bytes.set bs 0 '0'; + Bytes.set bs 1 '0'; + Bytes.set bs 2 '-'; + Trace_id.to_hex_into self.trace_id bs 3; + (* +32 *) + Bytes.set bs (3 + 32) '-'; + Span_id.to_hex_into self.parent_id bs 36; + (* +16 *) + Bytes.set bs 52 '-'; + Bytes.set bs 53 '0'; + Bytes.set bs 54 + (if sampled self then + '1' + else + '0'); + bs + +let of_w3c_trace_context bs : _ result = + try + if Bytes.length bs <> 55 then invalid_arg "trace context must be 55 bytes"; + (match int_of_string_opt (Bytes.sub_string bs 0 2) with + | Some 0 -> () + | Some n -> invalid_arg @@ spf "version is %d, expected 0" n + | None -> invalid_arg "expected 2-digit version"); + if Bytes.get bs 2 <> '-' then invalid_arg "expected '-' before trace_id"; + let trace_id = + try Trace_id.of_hex_substring (Bytes.unsafe_to_string bs) 3 + with Invalid_argument msg -> invalid_arg (spf "in trace id: %s" msg) + in + if Bytes.get bs (3 + 32) <> '-' then + invalid_arg "expected '-' before parent_id"; + let parent_id = + try Span_id.of_hex_substring (Bytes.unsafe_to_string bs) 36 + with Invalid_argument msg -> invalid_arg (spf "in span id: %s" msg) + in + if Bytes.get bs 52 <> '-' then invalid_arg "expected '-' after parent_id"; + let sampled = int_of_string_opt (Bytes.sub_string bs 53 2) = Some 1 in + + (* ignore other flags *) + Ok (make ~remote:true ~sampled ~trace_id ~parent_id ()) + with Invalid_argument msg -> Error msg + +let of_w3c_trace_context_exn bs = + match of_w3c_trace_context bs with + | Ok t -> t + | Error msg -> invalid_arg @@ spf "invalid w3c trace context: %s" msg + +let k_span_ctx : t Hmap.key = Hmap.Key.create () diff --git a/src/core/span_ctx.mli b/src/core/span_ctx.mli new file mode 100644 index 00000000..fb72046b --- /dev/null +++ b/src/core/span_ctx.mli @@ -0,0 +1,42 @@ +(** Span context. This bundles up a trace ID and parent ID. + + {{:https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} + https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} + @since 0.7 *) + +type t + +val make : + ?remote:bool -> + ?sampled:bool -> + trace_id:Trace_id.t -> + parent_id:Span_id.t -> + unit -> + t + +val dummy : t +(** Invalid span context, to be used as a placeholder *) + +val is_remote : t -> bool +(** Does this come from a remote parent? *) + +val is_valid : t -> bool +(** Are the span ID and trace ID valid (ie non-zero)? *) + +val trace_id : t -> Trace_id.t + +val parent_id : t -> Span_id.t + +val sampled : t -> bool + +val to_w3c_trace_context : t -> bytes + +val of_w3c_trace_context : bytes -> (t, string) result + +val of_w3c_trace_context_exn : bytes -> t +(** @raise Invalid_argument if parsing failed *) + +val k_span_ctx : t Hmap.key +(** Hmap key to carry around a {!Span_ctx.t}, e.g. to remember what the current + parent span is. + @since 0.8 *) diff --git a/src/core/span_id.ml b/src/core/span_id.ml new file mode 100644 index 00000000..bf9e7731 --- /dev/null +++ b/src/core/span_id.ml @@ -0,0 +1,33 @@ +open Common_ + +type t = bytes + +let[@inline] to_bytes self = self + +let dummy : t = Bytes.make 8 '\x00' + +let create () : t = + let b = Rand_bytes.rand_bytes_8 () in + assert (Bytes.length b = 8); + (* make sure the identifier is not all 0, which is a dummy identifier. *) + Bytes.set b 0 (Char.unsafe_chr (Char.code (Bytes.get b 0) lor 1)); + b + +let is_valid = Util_bytes_.bytes_non_zero + +let[@inline] of_bytes b = + if Bytes.length b = 8 then + b + else + invalid_arg "span IDs must be 8 bytes in length" + +let to_hex = Util_bytes_.bytes_to_hex + +let to_hex_into = Util_bytes_.bytes_to_hex_into + +let[@inline] of_hex s = of_bytes (Util_bytes_.bytes_of_hex s) + +let[@inline] of_hex_substring s off = + of_bytes (Util_bytes_.bytes_of_hex_substring s off 16) + +let pp fmt t = Format.fprintf fmt "%s" (to_hex t) diff --git a/src/core/span_id.mli b/src/core/span_id.mli new file mode 100644 index 00000000..db51d475 --- /dev/null +++ b/src/core/span_id.mli @@ -0,0 +1,23 @@ +(** Unique ID of a span. *) + +type t + +val create : unit -> t + +val dummy : t + +val pp : Format.formatter -> t -> unit + +val is_valid : t -> bool + +val to_bytes : t -> bytes + +val of_bytes : bytes -> t + +val to_hex : t -> string + +val to_hex_into : t -> bytes -> int -> unit + +val of_hex : string -> t + +val of_hex_substring : string -> int -> t diff --git a/src/core/span_kind.ml b/src/core/span_kind.ml new file mode 100644 index 00000000..d3ddace4 --- /dev/null +++ b/src/core/span_kind.ml @@ -0,0 +1,13 @@ +(** Span kind. + @since 0.11 *) + +open Common_ +open Proto.Trace + +type t = span_span_kind = + | Span_kind_unspecified + | Span_kind_internal + | Span_kind_server + | Span_kind_client + | Span_kind_producer + | Span_kind_consumer diff --git a/src/core/span_link.ml b/src/core/span_link.ml new file mode 100644 index 00000000..308d3598 --- /dev/null +++ b/src/core/span_link.ml @@ -0,0 +1,20 @@ +open Common_ +open Proto.Trace + +type t = span_link + +let make ~trace_id ~span_id ?trace_state ?(attrs = []) ?dropped_attributes_count + () : t = + let attributes = List.map Key_value.conv attrs in + let dropped_attributes_count = + Option.map Int32.of_int dropped_attributes_count + in + make_span_link + ~trace_id:(Trace_id.to_bytes trace_id) + ~span_id:(Span_id.to_bytes span_id) ?trace_state ~attributes + ?dropped_attributes_count () + +let[@inline] of_span_ctx ?trace_state ?attrs ?dropped_attributes_count + (ctx : Span_ctx.t) : t = + make ~trace_id:(Span_ctx.trace_id ctx) ~span_id:(Span_ctx.parent_id ctx) + ?trace_state ?attrs ?dropped_attributes_count () diff --git a/src/core/span_link.mli b/src/core/span_link.mli new file mode 100644 index 00000000..402ff0ec --- /dev/null +++ b/src/core/span_link.mli @@ -0,0 +1,27 @@ +(** Span Link + + A pointer from the current span to another span in the same trace or in a + different trace. For example, this can be used in batching operations, where + a single batch handler processes multiple requests from different traces or + when the handler receives a request from a different project. *) + +open Common_ +open Proto.Trace + +type t = span_link + +val make : + trace_id:Trace_id.t -> + span_id:Span_id.t -> + ?trace_state:string -> + ?attrs:Key_value.t list -> + ?dropped_attributes_count:int -> + unit -> + t + +val of_span_ctx : + ?trace_state:string -> + ?attrs:Key_value.t list -> + ?dropped_attributes_count:int -> + Span_ctx.t -> + t diff --git a/src/core/span_status.ml b/src/core/span_status.ml new file mode 100644 index 00000000..388d20ac --- /dev/null +++ b/src/core/span_status.ml @@ -0,0 +1,15 @@ +open Common_ +open Proto.Trace + +type t = Proto.Trace.status = private { + mutable _presence: Pbrt.Bitfield.t; + mutable message: string; + mutable code: status_status_code; +} + +type code = status_status_code = + | Status_code_unset + | Status_code_ok + | Status_code_error + +let[@inline] make ~message ~code : t = make_status ~message ~code () diff --git a/src/core/span_status.mli b/src/core/span_status.mli new file mode 100644 index 00000000..da2e11b9 --- /dev/null +++ b/src/core/span_status.mli @@ -0,0 +1,15 @@ +open Common_ +open Proto.Trace + +type t = Proto.Trace.status = private { + mutable _presence: Pbrt.Bitfield.t; + mutable message: string; + mutable code: status_status_code; +} + +type code = status_status_code = + | Status_code_unset + | Status_code_ok + | Status_code_error + +val make : message:string -> code:code -> t diff --git a/src/core/tick_callbacks.ml b/src/core/tick_callbacks.ml new file mode 100644 index 00000000..d81a3a0f --- /dev/null +++ b/src/core/tick_callbacks.ml @@ -0,0 +1,9 @@ +type cb = unit -> unit + +type t = { cbs: cb AList.t } [@@unboxed] + +let create () : t = { cbs = AList.make () } + +let[@inline] on_tick self f = AList.add self.cbs f + +let[@inline] tick self = List.iter (fun f -> f ()) (AList.get self.cbs) diff --git a/src/core/tick_callbacks.mli b/src/core/tick_callbacks.mli new file mode 100644 index 00000000..ad7ff5bb --- /dev/null +++ b/src/core/tick_callbacks.mli @@ -0,0 +1,9 @@ +(** A collection of callbacks that are regularly called. *) + +type t + +val create : unit -> t + +val on_tick : t -> (unit -> unit) -> unit + +val tick : t -> unit diff --git a/src/core/timestamp_ns.ml b/src/core/timestamp_ns.ml new file mode 100644 index 00000000..52f7cfa2 --- /dev/null +++ b/src/core/timestamp_ns.ml @@ -0,0 +1,29 @@ +(** Unix timestamp. + + These timestamps measure time since the Unix epoch (jan 1, 1970) UTC in + nanoseconds. *) + +type t = int64 + +open struct + let ns_in_a_day = Int64.(mul 1_000_000_000L (of_int (24 * 3600))) +end + +(** Current unix timestamp in nanoseconds *) +let[@inline] now_unix_ns () : t = + let span = Ptime_clock.now () |> Ptime.to_span in + let d, ps = Ptime.Span.to_d_ps span in + let d = Int64.(mul (of_int d) ns_in_a_day) in + let ns = Int64.(div ps 1_000L) in + Int64.(add d ns) + +let pp_debug out (self : t) = + let d = Int64.(to_int (div self ns_in_a_day)) in + let ns = Int64.(rem self ns_in_a_day) in + let ps = Int64.(mul ns 1_000L) in + match Ptime.Span.of_d_ps (d, ps) with + | None -> Format.fprintf out "ts: <%Ld ns>" self + | Some span -> + (match Ptime.add_span Ptime.epoch span with + | None -> Format.fprintf out "ts: <%Ld ns>" self + | Some ptime -> Ptime.pp_human () out ptime) diff --git a/src/core/trace_context.ml b/src/core/trace_context.ml new file mode 100644 index 00000000..9c8b141d --- /dev/null +++ b/src/core/trace_context.ml @@ -0,0 +1,34 @@ +(** Implementation of the W3C Trace Context spec + + https://www.w3.org/TR/trace-context/ *) + +(** The traceparent header + https://www.w3.org/TR/trace-context/#traceparent-header *) +module Traceparent = struct + let name = "traceparent" + + (** Parse the value of the traceparent header. + + The values are of the form: + + {[ + { version } - { trace_id } - { parent_id } - { flags } + ]} + + For example: + + {[ + 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 + ]} + + [{flags}] are currently ignored. *) + let of_value str : (Trace_id.t * Span_id.t, string) result = + match Span_ctx.of_w3c_trace_context (Bytes.unsafe_of_string str) with + | Ok sp -> Ok (Span_ctx.trace_id sp, Span_ctx.parent_id sp) + | Error _ as e -> e + + let to_value ?(sampled : bool option) ~(trace_id : Trace_id.t) + ~(parent_id : Span_id.t) () : string = + let span_ctx = Span_ctx.make ?sampled ~trace_id ~parent_id () in + Bytes.unsafe_to_string @@ Span_ctx.to_w3c_trace_context span_ctx +end diff --git a/src/core/trace_id.ml b/src/core/trace_id.ml new file mode 100644 index 00000000..8893a757 --- /dev/null +++ b/src/core/trace_id.ml @@ -0,0 +1,35 @@ +open Common_ + +type t = bytes + +let[@inline] to_bytes self = self + +let dummy : t = Bytes.make 16 '\x00' + +let create () : t = + let b = Rand_bytes.rand_bytes_16 () in + assert (Bytes.length b = 16); + (* make sure the identifier is not all 0, which is a dummy identifier. *) + Bytes.set b 0 (Char.unsafe_chr (Char.code (Bytes.get b 0) lor 1)); + b + +let[@inline] of_bytes b = + if Bytes.length b = 16 then + b + else + invalid_arg "trace ID must be 16 bytes in length" + +let is_valid = Util_bytes_.bytes_non_zero + +let to_hex = Util_bytes_.bytes_to_hex + +let to_hex_into = Util_bytes_.bytes_to_hex_into + +let[@inline] of_hex s = of_bytes (Util_bytes_.bytes_of_hex s) + +let[@inline] of_hex_substring s off = + of_bytes (Util_bytes_.bytes_of_hex_substring s off 32) + +let pp fmt t = Format.fprintf fmt "%s" (to_hex t) + +let k_trace_id : t Hmap.key = Hmap.Key.create () diff --git a/src/core/trace_id.mli b/src/core/trace_id.mli new file mode 100644 index 00000000..487c901b --- /dev/null +++ b/src/core/trace_id.mli @@ -0,0 +1,30 @@ +(** Trace ID. + + This 16 bytes identifier is shared by all spans in one trace. *) + +type t + +val create : unit -> t + +val dummy : t + +val pp : Format.formatter -> t -> unit + +val is_valid : t -> bool + +val to_bytes : t -> bytes + +val of_bytes : bytes -> t + +val to_hex : t -> string + +val to_hex_into : t -> bytes -> int -> unit + +val of_hex : string -> t + +val of_hex_substring : string -> int -> t + +val k_trace_id : t Hmap.key +(** Hmap key to carry around a {!Trace_id.t}, to remember what the current trace + is. + @since 0.8 *) diff --git a/src/core/tracer.ml b/src/core/tracer.ml new file mode 100644 index 00000000..6045df9d --- /dev/null +++ b/src/core/tracer.ml @@ -0,0 +1,165 @@ +(** Traces. + + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} + the spec} *) + +open Common_ +open Proto.Trace + +type span = Span.t + +(** A tracer. + + https://opentelemetry.io/docs/specs/otel/trace/api/#tracer *) +class type t = object + method is_enabled : unit -> bool + + method emit : span list -> unit +end + +(** Dummy tracer, always disabled *) +let dummy : t = + object + method is_enabled () = false + + method emit _ = () + end + +(** A simple exporter that directly calls the exporter. *) +class simple (exp : #Exporter.t) : t = + object + method is_enabled () = true + + method emit spans = if spans <> [] then Exporter.send_trace exp spans + end + +(** A tracer that uses {!Exporter.Main_exporter} *) +let simple_main_exporter : t = + object + method is_enabled () = Exporter.Main_exporter.present () + + method emit spans = + match Exporter.Main_exporter.get () with + | None -> () + | Some exp -> exp#send_trace spans + end + +(** Directly emit to the main exporter. + + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. *) +let emit ?service_name:_ ?attrs:_ (spans : span list) : unit = + match Exporter.Main_exporter.get () with + | None -> () + | Some exp -> exp#send_trace spans +[@@deprecated "use an explicit tracer"] + +(* TODO: remove scope, use span directly *) +type scope = Scope.t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable items: Scope.item_list; +} +[@@deprecated "use Scope.t"] + +let (add_event [@deprecated "use Scope.add_event"]) = Scope.add_event + +let (add_attrs [@deprecated "use Scope.add_attrs"]) = Scope.add_attrs + +let with_' ?(tracer = simple_main_exporter) ?(force_new_trace_id = false) + ?trace_state ?(attrs : (string * [< Value.t ]) list = []) ?kind ?trace_id + ?parent ?scope ?(links = []) name cb = + let scope = + if force_new_trace_id then + None + else + Scope.get_ambient_scope ?scope () + in + let trace_id = + match trace_id, scope with + | _ when force_new_trace_id -> Trace_id.create () + | Some trace_id, _ -> trace_id + | None, Some scope -> scope.trace_id + | None, None -> Trace_id.create () + in + let parent = + match parent, scope with + | _ when force_new_trace_id -> None + | Some span_id, _ -> Some span_id + | None, Some scope -> Some scope.span_id + | None, None -> None + in + let start_time = Timestamp_ns.now_unix_ns () in + let span_id = Span_id.create () in + let scope = Scope.make ~trace_id ~span_id ~attrs ~links () in + (* called once we're done, to emit a span *) + let finally res = + let status = + match Scope.status scope with + | Some status -> Some status + | None -> + (match res with + | Ok () -> + (* By default, all spans are Unset, which means a span completed without error. + The Ok status is reserved for when you need to explicitly mark a span as successful + rather than stick with the default of Unset (i.e., “without error”). + + https://opentelemetry.io/docs/languages/go/instrumentation/#set-span-status *) + None + | Error (e, bt) -> + Scope.record_exception scope e bt; + Some + (make_status ~code:Status_code_error ~message:(Printexc.to_string e) + ())) + in + let span, _ = + (* TODO: should the attrs passed to with_ go on the Span + (in Span.create) or on the ResourceSpan (in emit)? + (question also applies to Opentelemetry_lwt.Trace.with) *) + Span.create ?kind ~trace_id ?parent ~links:(Scope.links scope) ~id:span_id + ?trace_state ~attrs:(Scope.attrs scope) ~events:(Scope.events scope) + ~start_time + ~end_time:(Timestamp_ns.now_unix_ns ()) + ?status name + in + + tracer#emit [ span ] + in + let thunk () = + (* set global scope in this thread *) + Scope.with_ambient_scope scope @@ fun () -> cb scope + in + thunk, finally + +(** Sync span guard. + + Notably, this includes {e implicit} scope-tracking: if called without a + [~scope] argument (or [~parent]/[~trace_id]), it will check in the + {!Ambient_context} for a surrounding environment, and use that as the scope. + Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new scope in the + ambient context, so that any logically-nested calls to {!with_} will use + this span as their parent. + + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. + + @param force_new_trace_id + if true (default false), the span will not use a ambient scope, the + [~scope] argument, nor [~trace_id], but will instead always create fresh + identifiers for this span *) +let with_ ?tracer ?force_new_trace_id ?trace_state ?attrs ?kind ?trace_id + ?parent ?scope ?links name (cb : Scope.t -> 'a) : 'a = + let thunk, finally = + with_' ?tracer ?force_new_trace_id ?trace_state ?attrs ?kind ?trace_id + ?parent ?scope ?links name cb + in + + try + let rv = thunk () in + finally (Ok ()); + rv + with e -> + let bt = Printexc.get_raw_backtrace () in + finally (Error (e, bt)); + raise e diff --git a/src/core/util_bytes_.ml b/src/core/util_bytes_.ml new file mode 100644 index 00000000..9326991a --- /dev/null +++ b/src/core/util_bytes_.ml @@ -0,0 +1,47 @@ +open Common_ + +let int_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + +let bytes_to_hex_into b res off : unit = + for i = 0 to Bytes.length b - 1 do + let n = Char.code (Bytes.get b i) in + Bytes.set res ((2 * i) + off) (int_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1 + off) (int_to_hex (n land 0x0f)) + done + +let bytes_to_hex (b : bytes) : string = + let res = Bytes.create (2 * Bytes.length b) in + bytes_to_hex_into b res 0; + Bytes.unsafe_to_string res + +let int_of_hex = function + | '0' .. '9' as c -> Char.code c - Char.code '0' + | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' + | c -> raise (Invalid_argument (spf "invalid hex char: %C" c)) + +let bytes_of_hex_substring (s : string) off len = + if len mod 2 <> 0 then + raise (Invalid_argument "hex sequence must be of even length"); + let res = Bytes.make (len / 2) '\x00' in + for i = 0 to (len / 2) - 1 do + let n1 = int_of_hex (String.get s (off + (2 * i))) in + let n2 = int_of_hex (String.get s (off + (2 * i) + 1)) in + let n = (n1 lsl 4) lor n2 in + Bytes.set res i (Char.chr n) + done; + res + +let bytes_of_hex (s : string) : bytes = + bytes_of_hex_substring s 0 (String.length s) + +let bytes_non_zero (self : bytes) : bool = + try + for i = 0 to Bytes.length self - 1 do + if Char.code (Bytes.unsafe_get self i) <> 0 then raise_notrace Exit + done; + false + with Exit -> true diff --git a/src/core/value.ml b/src/core/value.ml new file mode 100644 index 00000000..97fc0503 --- /dev/null +++ b/src/core/value.ml @@ -0,0 +1,19 @@ +open Common_ + +type t = + [ `Int of int + | `String of string + | `Bool of bool + | `Float of float + | `None + ] +(** A value in a key/value attribute *) + +let conv = + let open Proto.Common in + function + | `Int i -> Some (Int_value (Int64.of_int i)) + | `String s -> Some (String_value s) + | `Bool b -> Some (Bool_value b) + | `Float f -> Some (Double_value f) + | `None -> None