From 6a378e49cec1c883cf82f84c5e58bddfa868409c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 17 Apr 2025 14:59:28 -0400 Subject: [PATCH 1/4] format --- src/core/opentelemetry.ml | 228 +++++++++++++++++++------------------- 1 file changed, 117 insertions(+), 111 deletions(-) diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index 4de03c5d..640a67d2 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -22,14 +22,14 @@ module AList = AList module Proto = Opentelemetry_proto (** Protobuf types. - This is mostly useful internally. Users should not need to touch it. *) + This is mostly useful internally. Users should not need to touch it. *) (** {2 Timestamps} *) (** Unix timestamp. - These timestamps measure time since the Unix epoch (jan 1, 1970) UTC - in nanoseconds. *) + These timestamps measure time since the Unix epoch (jan 1, 1970) UTC in + nanoseconds. *) module Timestamp_ns = struct type t = int64 @@ -56,18 +56,15 @@ module Collector = struct open Opentelemetry_proto type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a } - (** Sender interface for a message of type [msg]. - Inspired from Logs' reporter - (see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) - but without [over] as it doesn't make much sense in presence - of batching. + (** Sender interface for a message of type [msg]. Inspired from Logs' reporter + (see + {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc}) + but without [over] as it doesn't make much sense in presence of batching. - The [ret] callback is used to return the desired type (unit, or - a Lwt promise, or anything else) once the event has been transferred - to the backend. - It doesn't mean the event has been collected yet, it - could sit in a batch queue for a little while. - *) + The [ret] callback is used to return the desired type (unit, or a Lwt + promise, or anything else) once the event has been transferred to the + backend. It doesn't mean the event has been collected yet, it could sit in + a batch queue for a little while. *) (** Collector client interface. *) module type BACKEND = sig @@ -79,18 +76,18 @@ module Collector = struct val signal_emit_gc_metrics : unit -> unit (** Signal the backend that it should emit GC metrics when it has the - chance. This should be installed in a GC alarm or another form - of regular trigger. *) + chance. This should be installed in a GC alarm or another form of + regular trigger. *) val tick : unit -> unit - (** Should be called regularly for background processing, - timeout checks, etc. *) + (** Should be called regularly for background processing, timeout checks, + etc. *) val set_on_tick_callbacks : (unit -> unit) AList.t -> unit - (** Give the collector the list of callbacks to be executed - when [tick()] is called. Each such callback should be short and - reentrant. Depending on the collector's implementation, it might be - called from a thread that is not the one that called [on_tick]. *) + (** Give the collector the list of callbacks to be executed when [tick()] is + called. Each such callback should be short and reentrant. Depending on + the collector's implementation, it might be called from a thread that is + not the one that called [on_tick]. *) val cleanup : unit -> unit end @@ -209,8 +206,8 @@ module Collector = struct let[@inline] on_tick f = AList.add on_tick_cbs_ f - (** Do background work. Call this regularly if the collector doesn't - already have a ticker thread or internal timer. *) + (** Do background work. Call this regularly if the collector doesn't already + have a ticker thread or internal timer. *) let tick () = match Atomic.get backend with | None -> () @@ -338,8 +335,8 @@ end = struct let pp fmt t = Format.fprintf fmt "%s" (to_hex t) end -(** Hmap key to carry around a {!Trace_id.t}, to remember what the current - trace is. +(** Hmap key to carry around a {!Trace_id.t}, to remember what the current trace + is. @since 0.8 *) let k_trace_id : Trace_id.t Hmap.key = Hmap.Key.create () @@ -402,7 +399,8 @@ end (** Span context. This bundles up a trace ID and parent ID. - {{: https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} + {{:https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} + https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} @since 0.7 *) module Span_ctx : sig type t @@ -510,7 +508,8 @@ let k_span_ctx : Span_ctx.t Hmap.key = Hmap.Key.create () (** Semantic conventions - {{: https://opentelemetry.io/docs/specs/semconv/} https://opentelemetry.io/docs/specs/semconv/} *) + {{:https://opentelemetry.io/docs/specs/semconv/} + https://opentelemetry.io/docs/specs/semconv/} *) module Conventions = struct module Attributes = struct module Process = struct @@ -570,7 +569,8 @@ module Conventions = struct let url_scheme = "url.scheme" end - (** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md *) + (** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md + *) module Host = struct let id = "host.id" @@ -684,9 +684,9 @@ module Globals = struct default_instrumentation_scope ~version:"%%VERSION_NUM%%" ~name:"ocaml-otel" () - (** Global attributes, initially set - via OTEL_RESOURCE_ATTRIBUTES and modifiable - by the user code. They will be attached to each outgoing metrics/traces. *) + (** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and + modifiable by the user code. They will be attached to each outgoing + metrics/traces. *) let global_attributes : key_value list ref = let parse_pair s = match String.split_on_char '=' s with @@ -709,10 +709,10 @@ module Globals = struct let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in List.rev_append (List.filter not_redundant !global_attributes) into - (** Default span kind in {!Span.create}. - This will be used in all spans that do not specify [~kind] explicitly; - it is set to "internal", following directions from the [.proto] file. - It can be convenient to set "client" or "server" uniformly in here. + (** Default span kind in {!Span.create}. This will be used in all spans that + do not specify [~kind] explicitly; it is set to "internal", following + directions from the [.proto] file. It can be convenient to set "client" or + "server" uniformly in here. @since 0.4 *) let default_span_kind = ref Proto.Trace.Span_kind_internal @@ -746,8 +746,8 @@ end (** Events. - Events occur at a given time and can carry attributes. They always - belong in a span. *) + Events occur at a given time and can carry attributes. They always belong in + a span. *) module Event : sig open Proto.Trace @@ -768,11 +768,10 @@ end (** Span Link - A pointer from the current span to another span in the same trace or in a - different trace. For example, this can be used in batching operations, - where a single batch handler processes multiple requests from different - traces or when the handler receives a request from a different project. -*) + A pointer from the current span to another span in the same trace or in a + different trace. For example, this can be used in batching operations, where + a single batch handler processes multiple requests from different traces or + when the handler receives a request from a different project. *) module Span_link : sig open Proto.Trace @@ -872,8 +871,7 @@ end (** Scopes. - A scope is a trace ID and the span ID of the currently active span. -*) + A scope is a trace ID and the span ID of the currently active span. *) module Scope : sig type item_list @@ -917,28 +915,28 @@ module Scope : sig val add_event : t -> (unit -> Event.t) -> unit (** Add an event to the scope. It will be aggregated into the span. - Note that this takes a function that produces an event, and will only - call it if there is an instrumentation backend. *) + Note that this takes a function that produces an event, and will only call + it if there is an instrumentation backend. *) val record_exception : t -> exn -> Printexc.raw_backtrace -> unit val add_attrs : t -> (unit -> key_value list) -> unit (** Add attributes to the scope. It will be aggregated into the span. - Note that this takes a function that produces attributes, and will only - call it if there is an instrumentation backend. *) + Note that this takes a function that produces attributes, and will only + call it if there is an instrumentation backend. *) val add_links : t -> (unit -> Span_link.t list) -> unit (** Add links to the scope. It will be aggregated into the span. - Note that this takes a function that produces links, and will only - call it if there is an instrumentation backend. *) + Note that this takes a function that produces links, and will only call it + if there is an instrumentation backend. *) val set_status : t -> Span_status.t -> unit (** set the span status. - Note that this function will be - called only if there is an instrumentation backend. *) + Note that this function will be called only if there is an instrumentation + backend. *) val set_kind : t -> Span_kind.t -> unit (** Set the span's kind. @@ -946,17 +944,18 @@ module Scope : sig val ambient_scope_key : t Ambient_context.key (** The opaque key necessary to access/set the ambient scope with - {!Ambient_context}. *) + {!Ambient_context}. *) val get_ambient_scope : ?scope:t -> unit -> t option (** Obtain current scope from {!Ambient_context}, if available. *) val with_ambient_scope : t -> (unit -> 'a) -> 'a (** [with_ambient_scope sc thunk] calls [thunk()] in a context where [sc] is - the (thread|continuation)-local scope, then reverts to the previous local - scope, if any. + the (thread|continuation)-local scope, then reverts to the previous local + scope, if any. - @see ambient-context docs *) + @see + ambient-context docs *) end = struct type item_list = | Nil @@ -1093,10 +1092,10 @@ end (** Spans. - A Span is the workhorse of traces, it indicates an operation that - took place over a given span of time (indicated by start_time and end_time) - as part of a hierarchical trace. All spans in a given trace are bound by - the use of the same {!Trace_id.t}. *) + A Span is the workhorse of traces, it indicates an operation that took place + over a given span of time (indicated by start_time and end_time) as part of + a hierarchical trace. All spans in a given trace are bound by the use of the + same {!Trace_id.t}. *) module Span : sig open Proto.Trace @@ -1138,10 +1137,11 @@ module Span : sig string -> t * id (** [create ~trace_id name] creates a new span with its unique ID. - @param trace_id the trace this belongs to - @param parent parent span, if any - @param links list of links to other spans, each with their trace state - (see {{: https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *) + @param trace_id the trace this belongs to + @param parent parent span, if any + @param links + list of links to other spans, each with their trace state (see + {{:https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *) end = struct open Proto.Trace @@ -1184,7 +1184,9 @@ end (** Traces. - See {{: https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} the spec} *) + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} + the spec} *) module Trace = struct open Proto.Trace @@ -1201,11 +1203,11 @@ module Trace = struct (** Sync emitter. - This instructs the collector to forward - the spans to some backend at a later point. + This instructs the collector to forward the spans to some backend at a + later point. - {b NOTE} be careful not to call this inside a Gc alarm, as it can - cause deadlocks. *) + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. *) let emit ?service_name ?attrs (spans : span list) : unit = let rs = make_resource_spans ?service_name ?attrs spans in Collector.send_trace [ rs ] ~ret:(fun () -> ()) @@ -1294,12 +1296,13 @@ module Trace = struct scope in the ambient context, so that any logically-nested calls to {!with_} will use this span as their parent. - {b NOTE} be careful not to call this inside a Gc alarm, as it can - cause deadlocks. + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. - @param force_new_trace_id if true (default false), the span will not use a - ambient scope, the [~scope] argument, nor [~trace_id], but will instead - always create fresh identifiers for this span *) + @param force_new_trace_id + if true (default false), the span will not use a ambient scope, the + [~scope] argument, nor [~trace_id], but will instead always create fresh + identifiers for this span *) let with_ ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind ?trace_id ?parent ?scope ?links name (cb : Scope.t -> 'a) : 'a = @@ -1322,16 +1325,18 @@ end (** Metrics. - See {{: https://opentelemetry.io/docs/reference/specification/overview/#metric-signal} the spec} *) + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#metric-signal} + the spec} *) module Metrics = struct open Proto open Proto.Metrics type t = Metrics.metric (** A single metric, measuring some time-varying quantity or statistical - distribution. It is composed of one or more data points that have - precise values and time stamps. Each distinct metric should have a - distinct name. *) + distribution. It is composed of one or more data points that have precise + values and time stamps. Each distinct metric should have a distinct name. + *) open struct let _program_start = Timestamp_ns.now_unix_ns () @@ -1377,10 +1382,11 @@ module Metrics = struct (** Histogram data @param count number of values in population (non negative) @param sum sum of values in population (0 if count is 0) - @param bucket_counts count value of histogram for each bucket. Sum of - the counts must be equal to [count]. - length must be [1+length explicit_bounds] - @param explicit_bounds strictly increasing list of bounds for the buckets *) + @param bucket_counts + count value of histogram for each bucket. Sum of the counts must be + equal to [count]. length must be [1+length explicit_bounds] + @param explicit_bounds strictly increasing list of bounds for the buckets + *) let histogram_data_point ?(start_time_unix_nano = _program_start) ?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = []) ?(explicit_bounds = []) ?sum ~bucket_counts ~count () : @@ -1411,20 +1417,18 @@ module Metrics = struct let resource = Proto.Resource.default_resource ~attributes () in default_resource_metrics ~scope_metrics:[ lm ] ~resource:(Some resource) () - (** Emit some metrics to the collector (sync). This blocks until - the backend has pushed the metrics into some internal queue, or - discarded them. + (** Emit some metrics to the collector (sync). This blocks until the backend + has pushed the metrics into some internal queue, or discarded them. - {b NOTE} be careful not to call this inside a Gc alarm, as it can - cause deadlocks. - *) + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause + deadlocks. *) let emit ?attrs (l : t list) : unit = let rm = make_resource_metrics ?attrs l in Collector.send_metrics [ rm ] ~ret:ignore end -(** A set of callbacks that produce metrics when called. - The metrics are automatically called regularly. +(** A set of callbacks that produce metrics when called. The metrics are + automatically called regularly. This allows applications to register metrics callbacks from various points in the program (or even in libraries), and not worry about setting @@ -1436,13 +1440,13 @@ module Metrics_callbacks = struct (** [register f] adds the callback [f] to the list. - [f] will be called at unspecified times and is expected to return - a list of metrics. It might be called regularly by the backend, - in particular (but not only) when {!Collector.tick} is called. *) + [f] will be called at unspecified times and is expected to return a list + of metrics. It might be called regularly by the backend, in particular + (but not only) when {!Collector.tick} is called. *) let register f : unit = if !cbs_ = [] then (* make sure we call [f] (and others) at each tick *) - Collector.on_tick (fun () -> + Collector.on_tick (fun () -> let m = List.map (fun f -> f ()) !cbs_ |> List.flatten in Metrics.emit m); cbs_ := f :: !cbs_ @@ -1452,7 +1456,9 @@ end (** Logs. - See {{: https://opentelemetry.io/docs/reference/specification/overview/#log-signal} the spec} *) + See + {{:https://opentelemetry.io/docs/reference/specification/overview/#log-signal} + the spec} *) module Logs = struct open Opentelemetry_proto open Logs @@ -1527,10 +1533,9 @@ module Logs = struct (** Emit logs. - This instructs the collector to send the logs to some backend at - a later date. - {b NOTE} be careful not to call this inside a Gc alarm, as it can - cause deadlocks. *) + This instructs the collector to send the logs to some backend at a later + date. {b NOTE} be careful not to call this inside a Gc alarm, as it can + cause deadlocks. *) let emit ?service_name ?attrs (l : t list) : unit = let attributes = Globals.mk_attributes ?service_name ?attrs () in let resource = Proto.Resource.default_resource ~attributes () in @@ -1548,12 +1553,10 @@ end (** Implementation of the W3C Trace Context spec - https://www.w3.org/TR/trace-context/ -*) + https://www.w3.org/TR/trace-context/ *) module Trace_context = struct (** The traceparent header - https://www.w3.org/TR/trace-context/#traceparent-header - *) + https://www.w3.org/TR/trace-context/#traceparent-header *) module Traceparent = struct let name = "traceparent" @@ -1562,15 +1565,16 @@ module Trace_context = struct The values are of the form: {[ - {version}-{trace_id}-{parent_id}-{flags} + { version } - { trace_id } - { parent_id } - { flags } ]} For example: - {[ 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 ]} + {[ + 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 + ]} - [{flags}] are currently ignored. - *) + [{flags}] are currently ignored. *) let of_value str : (Trace_id.t * Span_id.t, string) result = match Span_ctx.of_w3c_trace_context (Bytes.unsafe_of_string str) with | Ok sp -> Ok (Span_ctx.trace_id sp, Span_ctx.parent_id sp) @@ -1588,8 +1592,8 @@ end These metrics are emitted after each GC collection. *) module GC_metrics : sig val basic_setup : unit -> unit - (** Setup a hook that will emit GC statistics on every tick (assuming - a ticker thread) *) + (** Setup a hook that will emit GC statistics on every tick (assuming a ticker + thread) *) val get_runtime_attributes : unit -> Span.key_value list (** Get OCaml name and version runtime attributes *) @@ -1597,7 +1601,9 @@ module GC_metrics : sig val get_metrics : unit -> Metrics.t list (** Get a few metrics from the current state of the GC *) end = struct - (** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *) + (** See + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes + *) let runtime_attributes = lazy Conventions.Attributes. From 5788492946d863b56c53a41f167d122781159781 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 17 Apr 2025 16:09:14 -0400 Subject: [PATCH 2/4] breaking: change `Collector.cleanup` so it takes a callback this callback can be used to resolve a Lwt future, for example, to make sure we indeed wait for the cleanup to be done before exiting. --- .../opentelemetry_client_cohttp_lwt.ml | 80 ++++++++----------- .../opentelemetry_client_cohttp_lwt.mli | 26 +++--- .../opentelemetry_client_ocurl.ml | 44 +++++----- .../opentelemetry_client_ocurl.mli | 20 ++--- src/core/opentelemetry.ml | 23 ++++-- 5 files changed, 100 insertions(+), 93 deletions(-) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml index f46050d8..bd78c983 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml @@ -9,8 +9,8 @@ open Opentelemetry include Common_ external reraise : exn -> 'a = "%reraise" -(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force - to use Lwt's latest version *) +(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to + use Lwt's latest version *) let needs_gc_metrics = Atomic.make false @@ -133,7 +133,8 @@ end = struct let bt = Printexc.get_backtrace () in Error (`Failure - (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt)) + (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) + bt)) in Lwt.return r ) else ( @@ -147,12 +148,12 @@ end = struct let bt = Printexc.get_backtrace () in Error (`Failure - (spf - "httpc: decoding of status (url=%S, code=%d) failed with:\n\ - %s\n\ - status: %S\n\ - %s" - url code (Printexc.to_string e) body bt)) + (spf + "httpc: decoding of status (url=%S, code=%d) failed with:\n\ + %s\n\ + status: %S\n\ + %s" + url code (Printexc.to_string e) body bt)) in Lwt.return r ) @@ -167,10 +168,10 @@ module Batch : sig val push' : 'a t -> 'a -> unit val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option - (** Is the batch ready to be emitted? If batching is disabled, - this is true as soon as {!is_empty} is false. If a timeout is provided - for this batch, then it will be ready if an element has been in it - for at least the timeout. + (** Is the batch ready to be emitted? If batching is disabled, this is true as + soon as {!is_empty} is false. If a timeout is provided for this batch, + then it will be ready if an element has been in it for at least the + timeout. @param now passed to implement timeout *) val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t @@ -255,15 +256,14 @@ module type EMITTER = sig val tick : unit -> unit - val cleanup : unit -> unit + val cleanup : on_done:(unit -> unit) -> unit -> unit end (* make an emitter. exceptions inside should be caught, see https://opentelemetry.io/docs/reference/specification/error-handling/ *) -let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t) - () : (module EMITTER) = +let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) = let open Proto in let open Lwt.Syntax in (* local helpers *) @@ -448,13 +448,12 @@ let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t) (* if called in a blocking context: work in the background *) let tick () = Lwt.async tick_ - let cleanup () = + let cleanup ~on_done () = if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!"; Lwt.async (fun () -> let* () = emit_all_force httpc encoder in Httpc.cleanup httpc; - (* resolve [after_cleanup], if provided *) - Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup; + on_done (); Lwt.return ()) end in (module M) @@ -464,13 +463,9 @@ module Backend val stop : bool Atomic.t val config : Config.t - - val after_cleanup : unit Lwt.u option end) () : Opentelemetry.Collector.BACKEND = struct - include - (val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop - ~config:Arg.config ()) + include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ()) open Opentelemetry.Proto open Opentelemetry.Collector @@ -562,8 +557,7 @@ module Backend } end -let create_backend ?after_cleanup ?(stop = Atomic.make false) - ?(config = Config.make ()) () = +let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () = debug_ := config.debug; let module B = @@ -572,43 +566,37 @@ let create_backend ?after_cleanup ?(stop = Atomic.make false) let stop = stop let config = config - - let after_cleanup = after_cleanup end) () in (module B : OT.Collector.BACKEND) -let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t = - let cleanup_done, cleanup_done_prom = Lwt.wait () in - let backend = - create_backend ~after_cleanup:cleanup_done_prom ?stop ?config () - in +let setup_ ?stop ?config () : unit = + let backend = create_backend ?stop ?config () in OT.Collector.set_backend backend; - - OT.Collector.remove_backend, cleanup_done + () let setup ?stop ?config ?(enable = true) () = - if enable then ( - let cleanup, _lwt = setup_ ?stop ?config () in - at_exit cleanup - ) + if enable then setup_ ?stop ?config () + +let remove_backend () : unit Lwt.t = + let done_fut, done_u = Lwt.wait () in + OT.Collector.remove_backend ~on_done:(fun () -> Lwt.wakeup_later done_u ()) (); + done_fut let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t = - if enable then + if enable then ( let open Lwt.Syntax in - let cleanup, cleanup_done = setup_ ?stop ~config () in + setup_ ?stop ~config (); Lwt.catch (fun () -> let* res = f () in - cleanup (); - let+ () = cleanup_done in + let+ () = remove_backend () in res) (fun exn -> - cleanup (); - let* () = cleanup_done in + let* () = remove_backend () in reraise exn) - else + ) else f () diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index 37c60261..92f588c3 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -13,24 +13,29 @@ val set_headers : (string * string) list -> unit module Config = Config val create_backend : - ?after_cleanup:unit Lwt.u -> ?stop:bool Atomic.t -> ?config:Config.t -> unit -> (module Opentelemetry.Collector.BACKEND) (** Create a new backend using lwt and cohttp - @param after_cleanup if provided, this is resolved into [()] after cleanup is done (since 0.11) *) + @param after_cleanup + if provided, this is resolved into [()] after cleanup is done (since 0.11) +*) val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. - @param enable actually setup the backend (default true). This can - be used to enable/disable the setup depending on CLI arguments - or environment. + @param enable + actually setup the backend (default true). This can be used to + enable/disable the setup depending on CLI arguments or environment. @param config configuration to use - @param stop an atomic boolean. When it becomes true, background threads - will all stop after a little while. -*) + @param stop + an atomic boolean. When it becomes true, background threads will all stop + after a little while. *) + +val remove_backend : unit -> unit Lwt.t +(** Shutdown current backend + @since NEXT_RELEASE *) val with_setup : ?stop:bool Atomic.t -> @@ -39,6 +44,5 @@ val with_setup : unit -> (unit -> 'a Lwt.t) -> 'a Lwt.t -(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up - after [f()] returns - See {!setup} for more details. *) +(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after + [f()] returns See {!setup} for more details. *) diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index e0e89c92..5502388b 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -39,9 +39,9 @@ module Self_trace = struct ) end -(** capture current GC metrics if {!needs_gc_metrics} is true - or it has been a long time since the last GC metrics collection, - and push them into {!gc_metrics} for later collection *) +(** capture current GC metrics if {!needs_gc_metrics} is true or it has been a + long time since the last GC metrics collection, and push them into + {!gc_metrics} for later collection *) let sample_gc_metrics_if_needed () = let now = Mtime_clock.now () in let alarm = Atomic.exchange needs_gc_metrics false in @@ -102,7 +102,12 @@ let start_bg_thread (f : unit -> unit) : Thread.t = f () in (* no signals on Windows *) - let run () = if Sys.win32 then f () else unix_run () in + let run () = + if Sys.win32 then + f () + else + unix_run () + in Thread.create run () let str_to_hex (s : string) : string = @@ -128,7 +133,7 @@ module Backend_impl : sig val send_event : t -> Event.t -> unit - val shutdown : t -> unit + val shutdown : t -> on_done:(unit -> unit) -> unit end = struct open Opentelemetry.Proto @@ -250,8 +255,8 @@ end = struct let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev - (** Thread that, in a loop, reads from [q] to get the - next message to send via http *) + (** Thread that, in a loop, reads from [q] to get the next message to send via + http *) let bg_thread_loop (self : t) : unit = Ezcurl.with_client ?set_opts:None @@ fun client -> let stop = self.stop in @@ -379,7 +384,7 @@ end = struct self - let shutdown self : unit = + let shutdown self ~on_done : unit = Atomic.set self.stop true; if not (Atomic.exchange self.cleaned true) then ( (* empty batches *) @@ -392,7 +397,8 @@ end = struct (* close send queues, then wait for all threads *) B_queue.close self.send_q; Array.iter Thread.join self.send_threads - ) + ); + on_done () end let create_backend ?(stop = Atomic.make false) @@ -480,7 +486,7 @@ let create_backend ?(stop = Atomic.make false) Backend_impl.send_event backend Event.E_tick; List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_) - let cleanup () = Backend_impl.shutdown backend + let cleanup ~on_done () = Backend_impl.shutdown backend ~on_done end in (module M) @@ -498,7 +504,7 @@ let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () = start_bg_thread tick_loop let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () - = + : unit = let backend = create_backend ~stop ~config () in Opentelemetry.Collector.set_backend backend; @@ -508,18 +514,18 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) () (* at most a minute *) let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t) - ); - OT.Collector.remove_backend + ) + +let remove_backend () : unit = + (* we don't need the callback, this runs in the same thread *) + OT.Collector.remove_backend () ~on_done:ignore let setup ?stop ?config ?(enable = true) () = - if enable then ( - let cleanup = setup_ ?stop ?config () in - at_exit cleanup - ) + if enable then setup_ ?stop ?config () let with_setup ?stop ?config ?(enable = true) () f = if enable then ( - let cleanup = setup_ ?stop ?config () in - Fun.protect ~finally:cleanup f + setup_ ?stop ?config (); + Fun.protect ~finally:remove_backend f ) else f () diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index 8a963692..566adf2b 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -20,13 +20,16 @@ val create_backend : val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit (** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}. - @param enable actually setup the backend (default true). This can - be used to enable/disable the setup depending on CLI arguments - or environment. + @param enable + actually setup the backend (default true). This can be used to + enable/disable the setup depending on CLI arguments or environment. @param config configuration to use - @param stop an atomic boolean. When it becomes true, background threads - will all stop after a little while. -*) + @param stop + an atomic boolean. When it becomes true, background threads will all stop + after a little while. *) + +val remove_backend : unit -> unit +(** @since NEXT_RELEASE *) val with_setup : ?stop:bool Atomic.t -> @@ -35,6 +38,5 @@ val with_setup : unit -> (unit -> 'a) -> 'a -(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up - after [f()] returns - See {!setup} for more details. *) +(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after + [f()] returns See {!setup} for more details. *) diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index 640a67d2..5d80d1fa 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -89,7 +89,11 @@ module Collector = struct the collector's implementation, it might be called from a thread that is not the one that called [on_tick]. *) - val cleanup : unit -> unit + val cleanup : on_done:(unit -> unit) -> unit -> unit + (** [cleanup ~on_done ()] is called when the collector is shut down, and is + responsible for sending remaining batches, flushing sockets, etc. + @param on_done + callback invoked after the cleanup is done. since NEXT_RELEASE *) end type backend = (module BACKEND) @@ -110,7 +114,9 @@ module Collector = struct let set_on_tick_callbacks _cbs = () - let cleanup () = () + let cleanup ~on_done () = + on_done (); + () end module Debug_backend (B : BACKEND) : BACKEND = struct @@ -152,7 +158,7 @@ module Collector = struct let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs - let cleanup () = B.cleanup () + let cleanup ~on_done () = B.cleanup ~on_done () end let debug_backend : backend = (module Debug_backend (Noop_backend)) @@ -171,13 +177,14 @@ module Collector = struct Atomic.set backend (Some b) (** Remove current backend, if any. - @since 0.11 *) - let remove_backend () : unit = + @since 0.11 + @param on_done see {!BACKEND.cleanup}, since NEXT_RELEASE *) + let remove_backend ~on_done () : unit = match Atomic.exchange backend None with | None -> () | Some (module B) -> B.tick (); - B.cleanup () + B.cleanup ~on_done () (** Is there a configured backend? *) let[@inline] has_backend () : bool = Atomic.get backend != None @@ -213,11 +220,11 @@ module Collector = struct | None -> () | Some (module B) -> B.tick () - let with_setup_debug_backend b ?(enable = true) () f = + let with_setup_debug_backend ?(on_done = ignore) b ?(enable = true) () f = let (module B : BACKEND) = b in if enable then ( set_backend b; - Fun.protect ~finally:B.cleanup f + Fun.protect ~finally:(B.cleanup ~on_done) f ) else f () end From 26691eca20fbcc2aa08f815fe74457cd4cd6f383 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 5 May 2025 14:36:31 -0400 Subject: [PATCH 3/4] remove obsolete comment --- src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index 92f588c3..ba45e3cc 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -18,8 +18,8 @@ val create_backend : unit -> (module Opentelemetry.Collector.BACKEND) (** Create a new backend using lwt and cohttp - @param after_cleanup - if provided, this is resolved into [()] after cleanup is done (since 0.11) + + NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *) val setup : From 51af3a41056a0fc915d8ca58291a2787156dfe98 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 5 May 2025 14:41:20 -0400 Subject: [PATCH 4/4] format --- .../opentelemetry_ambient_context.mli | 39 +++++----- .../opentelemetry_ambient_context_types.mli | 16 ++-- src/atomic/atomic.post412.mli | 16 ++-- src/atomic/atomic.pre412.mli | 16 ++-- src/client-cohttp-lwt/config.mli | 78 ++++++++++--------- .../opentelemetry_client_cohttp_lwt.mli | 3 +- src/client-ocurl/b_queue.mli | 8 +- src/client-ocurl/config.mli | 62 ++++++++------- src/core/lock.mli | 8 +- src/core/rand_bytes.mli | 8 +- .../cohttp/opentelemetry_cohttp_lwt.ml | 32 ++++---- src/lwt/opentelemetry_lwt.ml | 4 +- src/trace/opentelemetry_trace.ml | 7 +- 13 files changed, 148 insertions(+), 149 deletions(-) diff --git a/src/ambient-context/opentelemetry_ambient_context.mli b/src/ambient-context/opentelemetry_ambient_context.mli index 4cb1cc51..8f19ff0c 100644 --- a/src/ambient-context/opentelemetry_ambient_context.mli +++ b/src/ambient-context/opentelemetry_ambient_context.mli @@ -2,23 +2,24 @@ The ambient context, like the Matrix, is everywhere around you. - It is responsible for keeping track of that context in a manner that's consistent with - the program's choice of control flow paradigm: + It is responsible for keeping track of that context in a manner that's + consistent with the program's choice of control flow paradigm: - - for synchronous/threaded/direct style code, {b TLS} ("thread local storage") keeps - track of a global variable per thread. Each thread has its own copy of the variable - and updates it independently of other threads. + - for synchronous/threaded/direct style code, {b TLS} ("thread local + storage") keeps track of a global variable per thread. Each thread has its + own copy of the variable and updates it independently of other threads. - - for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> …)] will - inherit the [k := v] assignment. + - for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> …)] + will inherit the [k := v] assignment. - - for Eio, fibers created inside [with_binding k v (fun () -> …)] will inherit the - [k := v] assignment. This is consistent with the structured concurrency approach of - Eio. + - for Eio, fibers created inside [with_binding k v (fun () -> …)] will + inherit the [k := v] assignment. This is consistent with the structured + concurrency approach of Eio. - The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map. Various - users (libraries, user code, etc.) can create their own {!key} to store what they are - interested in, without affecting other parts of the storage. *) + The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map. + Various users (libraries, user code, etc.) can create their own {!key} to + store what they are interested in, without affecting other parts of the + storage. *) module Types := Opentelemetry_ambient_context_types @@ -42,13 +43,13 @@ val create_key : unit -> 'a key (** Create a new fresh key, distinct from any previously created key. *) val get : 'a key -> 'a option -(** Get the current value for a given key, or [None] if no value was associated with the - key in the ambient context. *) +(** Get the current value for a given key, or [None] if no value was associated + with the key in the ambient context. *) val with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r -(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to [v]. This - does not affect storage outside of [cb()]. *) +(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to + [v]. This does not affect storage outside of [cb()]. *) val without_binding : 'a key -> (unit -> 'b) -> 'b -(** [without_binding k cb] calls [cb()] in a context where [k] has no binding (possibly - shadowing the current ambient binding of [k] if it exists). *) +(** [without_binding k cb] calls [cb()] in a context where [k] has no binding + (possibly shadowing the current ambient binding of [k] if it exists). *) diff --git a/src/ambient-context/types/opentelemetry_ambient_context_types.mli b/src/ambient-context/types/opentelemetry_ambient_context_types.mli index cded6589..738b7520 100644 --- a/src/ambient-context/types/opentelemetry_ambient_context_types.mli +++ b/src/ambient-context/types/opentelemetry_ambient_context_types.mli @@ -1,7 +1,7 @@ (** Storage implementation. - There is a singleton storage for a given program, responsible for providing ambient - context to the rest of the program. *) + There is a singleton storage for a given program, responsible for providing + ambient context to the rest of the program. *) type 'a key = 'a Hmap.key @@ -10,15 +10,17 @@ module type STORAGE = sig (** Name of the storage implementation. *) val get_map : unit -> Hmap.t option - (** Get the hmap from the current ambient context, or [None] if there is no ambient - context. *) + (** Get the hmap from the current ambient context, or [None] if there is no + ambient context. *) val with_map : Hmap.t -> (unit -> 'b) -> 'b - (** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()] will return - [h]. Once [cb()] returns, the storage is reset to its previous value. *) + (** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()] + will return [h]. Once [cb()] returns, the storage is reset to its previous + value. *) val create_key : unit -> 'a key - (** Create a new storage key, guaranteed to be distinct from any previously created key. *) + (** Create a new storage key, guaranteed to be distinct from any previously + created key. *) val get : 'a key -> 'a option diff --git a/src/atomic/atomic.post412.mli b/src/atomic/atomic.post412.mli index 806271b2..7058f56d 100644 --- a/src/atomic/atomic.post412.mli +++ b/src/atomic/atomic.post412.mli @@ -15,8 +15,7 @@ (* *) (**************************************************************************) -(** Atomic references. -*) +(** Atomic references. *) type 'a t = 'a Stdlib.Atomic.t (** An atomic (mutable) reference to a value of type ['a]. *) @@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a (** Set a new value for the atomic reference, and return the current value. *) val compare_and_set : 'a t -> 'a -> 'a -> bool -(** [compare_and_set r seen v] sets the new value of [r] to [v] only - if its current value is physically equal to [seen] -- the - comparison and the set occur atomically. Returns [true] if the - comparison succeeded (so the set happened) and [false] - otherwise. *) +(** [compare_and_set r seen v] sets the new value of [r] to [v] only if its + current value is physically equal to [seen] -- the comparison and the set + occur atomically. Returns [true] if the comparison succeeded (so the set + happened) and [false] otherwise. *) val fetch_and_add : int t -> int -> int -(** [fetch_and_add r n] atomically increments the value of [r] by [n], - and returns the current value (before the increment). *) +(** [fetch_and_add r n] atomically increments the value of [r] by [n], and + returns the current value (before the increment). *) val incr : int t -> unit (** [incr r] atomically increments the value of [r] by [1]. *) diff --git a/src/atomic/atomic.pre412.mli b/src/atomic/atomic.pre412.mli index f19bef29..231e6734 100644 --- a/src/atomic/atomic.pre412.mli +++ b/src/atomic/atomic.pre412.mli @@ -15,8 +15,7 @@ (* *) (**************************************************************************) -(** Atomic references. -*) +(** Atomic references. *) type 'a t (** An atomic (mutable) reference to a value of type ['a]. *) @@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a (** Set a new value for the atomic reference, and return the current value. *) val compare_and_set : 'a t -> 'a -> 'a -> bool -(** [compare_and_set r seen v] sets the new value of [r] to [v] only - if its current value is physically equal to [seen] -- the - comparison and the set occur atomically. Returns [true] if the - comparison succeeded (so the set happened) and [false] - otherwise. *) +(** [compare_and_set r seen v] sets the new value of [r] to [v] only if its + current value is physically equal to [seen] -- the comparison and the set + occur atomically. Returns [true] if the comparison succeeded (so the set + happened) and [false] otherwise. *) val fetch_and_add : int t -> int -> int -(** [fetch_and_add r n] atomically increments the value of [r] by [n], - and returns the current value (before the increment). *) +(** [fetch_and_add r n] atomically increments the value of [r] by [n], and + returns the current value (before the increment). *) val incr : int t -> unit (** [incr r] atomically increments the value of [r] by [1]. *) diff --git a/src/client-cohttp-lwt/config.mli b/src/client-cohttp-lwt/config.mli index ecdc73f0..6312ae87 100644 --- a/src/client-cohttp-lwt/config.mli +++ b/src/client-cohttp-lwt/config.mli @@ -5,34 +5,30 @@ type t = private { url_logs: string; (** Url to send logs *) headers: (string * string) list; (** API headers sent to the endpoint. Default is none or - "OTEL_EXPORTER_OTLP_HEADERS" if set. *) + "OTEL_EXPORTER_OTLP_HEADERS" if set. *) batch_traces: int option; - (** Batch traces? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. + (** Batch traces? If [Some i], then this produces batches of (at most) [i] + items. If [None], there is no batching. - Note that traces and metrics are batched separately. - Default [Some 400]. - *) + Note that traces and metrics are batched separately. Default + [Some 400]. *) batch_metrics: int option; (** Batch metrics? If [Some i], then this produces batches of (at most) - [i] items. If [None], there is no batching. + [i] items. If [None], there is no batching. - Note that traces and metrics are batched separately. - Default [None]. - *) + Note that traces and metrics are batched separately. Default [None]. + *) batch_logs: int option; - (** Batch logs? See {!batch_metrics} for details. - Default [Some 400] *) + (** Batch logs? See {!batch_metrics} for details. Default [Some 400] *) batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even - incomplete. - Note that the batch might take longer than that, because this is - only checked when a new event occurs. Default 500. *) + incomplete. Note that the batch might take longer than that, because + this is only checked when a new event occurs. Default 500. *) } (** Configuration. - To build one, use {!make} below. This might be extended with more - fields in the future. *) + To build one, use {!make} below. This might be extended with more fields in + the future. *) val make : ?debug:bool -> @@ -49,28 +45,34 @@ val make : t (** Make a configuration. - @param thread if true and [bg_threads] is not provided, we will pick a number - of bg threads. Otherwise the number of [bg_threads] superseeds this option. + @param thread + if true and [bg_threads] is not provided, we will pick a number of bg + threads. Otherwise the number of [bg_threads] superseeds this option. - @param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url. - Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. - - Example of constructed per-signal urls with the base url http://localhost:4318 - - Traces: http://localhost:4318/v1/traces - - Metrics: http://localhost:4318/v1/metrics - - Logs: http://localhost:4318/v1/logs - - Use per-signal url options if different urls are needed for each signal type. - - @param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. - The url is used as-is without any modification. - - @param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. - The url is used as-is without any modification. - - @param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. - The url is used as-is without any modification. + @param url + base url used to construct per-signal urls. Per-signal url options take + precedence over this base url. Default is "http://localhost:4318", or + "OTEL_EXPORTER_OTLP_ENDPOINT" if set. - *) + Example of constructed per-signal urls with the base url + http://localhost:4318 + - Traces: http://localhost:4318/v1/traces + - Metrics: http://localhost:4318/v1/metrics + - Logs: http://localhost:4318/v1/logs + + Use per-signal url options if different urls are needed for each signal + type. + + @param url_traces + url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The + url is used as-is without any modification. + + @param url_metrics + url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The + url is used as-is without any modification. + + @param url_logs + url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is + used as-is without any modification. *) val pp : Format.formatter -> t -> unit diff --git a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli index ba45e3cc..675dbd52 100644 --- a/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli +++ b/src/client-cohttp-lwt/opentelemetry_client_cohttp_lwt.mli @@ -19,8 +19,7 @@ val create_backend : (module Opentelemetry.Collector.BACKEND) (** Create a new backend using lwt and cohttp - NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE -*) + NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *) val setup : ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit diff --git a/src/client-ocurl/b_queue.mli b/src/client-ocurl/b_queue.mli index 73bf3edc..d020dfb3 100644 --- a/src/client-ocurl/b_queue.mli +++ b/src/client-ocurl/b_queue.mli @@ -12,12 +12,12 @@ val push : 'a t -> 'a -> unit val pop : 'a t -> 'a (** [pop q] pops the next element in [q]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) + @raise Closed if the queue was closed before a new element was available. *) val pop_all : 'a t -> 'a Queue.t -> unit -(** [pop_all q into] pops all the elements of [q] - and moves them into [into]. It might block until an element comes. - @raise Closed if the queue was closed before a new element was available. *) +(** [pop_all q into] pops all the elements of [q] and moves them into [into]. It + might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) val close : _ t -> unit (** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/client-ocurl/config.mli b/src/client-ocurl/config.mli index 7d2a5514..dda875b1 100644 --- a/src/client-ocurl/config.mli +++ b/src/client-ocurl/config.mli @@ -7,34 +7,32 @@ type t = private { url_logs: string; (** Url to send logs *) headers: (string * string) list; (** API headers sent to the endpoint. Default is none or - "OTEL_EXPORTER_OTLP_HEADERS" if set. *) + "OTEL_EXPORTER_OTLP_HEADERS" if set. *) batch_timeout_ms: int; (** Number of milliseconds after which we will emit a batch, even - incomplete. - Note that the batch might take longer than that, because this is - only checked when a new event occurs or when a tick - is emitted. Default 2_000. *) + incomplete. Note that the batch might take longer than that, because + this is only checked when a new event occurs or when a tick is + emitted. Default 2_000. *) bg_threads: int; - (** Are there background threads, and how many? Default [4]. - This will be adjusted to be at least [1] and at most [32]. *) + (** Are there background threads, and how many? Default [4]. This will be + adjusted to be at least [1] and at most [32]. *) ticker_thread: bool; - (** If true, start a thread that regularly checks if signals should - be sent to the collector. Default [true] *) + (** If true, start a thread that regularly checks if signals should be + sent to the collector. Default [true] *) ticker_interval_ms: int; - (** Interval for ticker thread, in milliseconds. This is - only useful if [ticker_thread] is [true]. - This will be clamped between [2 ms] and some longer - interval (maximum [60s] currently). - Default 500. - @since 0.7 *) + (** Interval for ticker thread, in milliseconds. This is only useful if + [ticker_thread] is [true]. This will be clamped between [2 ms] and + some longer interval (maximum [60s] currently). Default 500. + @since 0.7 *) self_trace: bool; - (** If true, the OTEL library will also emit its own spans. Default [false]. + (** If true, the OTEL library will also emit its own spans. Default + [false]. @since 0.7 *) } (** Configuration. - To build one, use {!make} below. This might be extended with more - fields in the future. *) + To build one, use {!make} below. This might be extended with more fields in + the future. *) val make : ?debug:bool -> @@ -52,24 +50,30 @@ val make : t (** Make a configuration. - @param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url. - Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. + @param url + base url used to construct per-signal urls. Per-signal url options take + precedence over this base url. Default is "http://localhost:4318", or + "OTEL_EXPORTER_OTLP_ENDPOINT" if set. - Example of constructed per-signal urls with the base url http://localhost:4318 + Example of constructed per-signal urls with the base url + http://localhost:4318 - Traces: http://localhost:4318/v1/traces - Metrics: http://localhost:4318/v1/metrics - Logs: http://localhost:4318/v1/logs - Use per-signal url options if different urls are needed for each signal type. + Use per-signal url options if different urls are needed for each signal + type. - @param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. - The url is used as-is without any modification. + @param url_traces + url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The + url is used as-is without any modification. - @param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. - The url is used as-is without any modification. + @param url_metrics + url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The + url is used as-is without any modification. - @param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. - The url is used as-is without any modification. - *) + @param url_logs + url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is + used as-is without any modification. *) val pp : Format.formatter -> t -> unit diff --git a/src/core/lock.mli b/src/core/lock.mli index 1b6c8c4b..6ef1946e 100644 --- a/src/core/lock.mli +++ b/src/core/lock.mli @@ -1,7 +1,7 @@ val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit -(** Set a pair of lock/unlock functions that are used to - protect access to global state, if needed. By default these do nothing. *) +(** Set a pair of lock/unlock functions that are used to protect access to + global state, if needed. By default these do nothing. *) val with_lock : (unit -> 'a) -> 'a -(** Call [f()] while holding the mutex defined {!set_mutex}, then - release the mutex. *) +(** Call [f()] while holding the mutex defined {!set_mutex}, then release the + mutex. *) diff --git a/src/core/rand_bytes.mli b/src/core/rand_bytes.mli index bea76349..7c42ea35 100644 --- a/src/core/rand_bytes.mli +++ b/src/core/rand_bytes.mli @@ -3,12 +3,12 @@ We need random identifiers for trace IDs and span IDs. *) val rand_bytes_16 : (unit -> bytes) ref -(** Generate 16 bytes of random data. - The implementation can be swapped to use any random generator. *) +(** Generate 16 bytes of random data. The implementation can be swapped to use + any random generator. *) val rand_bytes_8 : (unit -> bytes) ref -(** Generate 16 bytes of random data. - The implementation can be swapped to use any random generator. *) +(** Generate 16 bytes of random data. The implementation can be swapped to use + any random generator. *) val default_rand_bytes_8 : unit -> bytes (** Default implementation using {!Random} *) diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index 12661022..fd2ff1a4 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -17,17 +17,15 @@ module Server : sig Use it like this: {[ - let my_server callback = - let callback_traced = - Opentelemetry_cohttp_lwt.Server.trace - ~service_name:"my-service" - (fun _scope -> callback) - in - Cohttp_lwt_unix.Server.create - ~mode:(`TCP (`Port 8080)) - (Server.make () ~callback:callback_traced) - ]} - *) + let my_server callback = + let callback_traced = + Opentelemetry_cohttp_lwt.Server.trace ~service_name:"my-service" + (fun _scope -> callback) + in + Cohttp_lwt_unix.Server.create + ~mode:(`TCP (`Port 8080)) + (Server.make () ~callback:callback_traced) + ]} *) val with_ : ?trace_state:string -> @@ -43,24 +41,20 @@ module Server : sig Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace scope in the [x-ocaml-otel-traceparent] header in the request for - convenience. - *) + convenience. *) val get_trace_context : ?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header - added by [trace] and [with_]. - *) + added by [trace] and [with_]. *) val set_trace_context : Otel.Scope.t -> Request.t -> Request.t (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used - by [trace] and [with_]. - *) + by [trace] and [with_]. *) val remove_trace_context : Request.t -> Request.t (** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and - [with_]. - *) + [with_]. *) end = struct let attrs_of_request (req : Request.t) = let meth = req |> Request.meth |> Code.string_of_method in diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index e6d53ed8..7b443c20 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -12,8 +12,8 @@ module Metrics_callbacks = Metrics_callbacks module Trace_context = Trace_context external reraise : exn -> 'a = "%reraise" -(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force - to use Lwt's latest version *) +(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to + use Lwt's latest version *) module Trace = struct include Trace diff --git a/src/trace/opentelemetry_trace.ml b/src/trace/opentelemetry_trace.ml index dd423d84..0126257b 100644 --- a/src/trace/opentelemetry_trace.ml +++ b/src/trace/opentelemetry_trace.ml @@ -144,8 +144,8 @@ module Internal = struct assert (Bytes.length bs = 8); Bytes.get_int64_le bs 0 - let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name - = + let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option) + ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name = let open Otel in let otel_id = Span_id.create () in let otrace_id = otrace_of_otel otel_id in @@ -159,7 +159,8 @@ module Internal = struct let parent = match explicit_parent, parent_scope with | Some p, _ -> - Some (Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ()) + Some + (Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ()) | None, Some parent -> Some (Otel.Scope.to_span_ctx parent) | None, None -> None in