This commit is contained in:
Simon Cruanes 2025-05-05 14:41:20 -04:00
parent 26691eca20
commit 51af3a4105
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
13 changed files with 148 additions and 149 deletions

View file

@ -2,23 +2,24 @@
The ambient context, like the Matrix, is everywhere around you. The ambient context, like the Matrix, is everywhere around you.
It is responsible for keeping track of that context in a manner that's consistent with It is responsible for keeping track of that context in a manner that's
the program's choice of control flow paradigm: consistent with the program's choice of control flow paradigm:
- for synchronous/threaded/direct style code, {b TLS} ("thread local storage") keeps - for synchronous/threaded/direct style code, {b TLS} ("thread local
track of a global variable per thread. Each thread has its own copy of the variable storage") keeps track of a global variable per thread. Each thread has its
and updates it independently of other threads. own copy of the variable and updates it independently of other threads.
- for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> )] will - for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> )]
inherit the [k := v] assignment. will inherit the [k := v] assignment.
- for Eio, fibers created inside [with_binding k v (fun () -> )] will inherit the - for Eio, fibers created inside [with_binding k v (fun () -> )] will
[k := v] assignment. This is consistent with the structured concurrency approach of inherit the [k := v] assignment. This is consistent with the structured
Eio. concurrency approach of Eio.
The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map. Various The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map.
users (libraries, user code, etc.) can create their own {!key} to store what they are Various users (libraries, user code, etc.) can create their own {!key} to
interested in, without affecting other parts of the storage. *) store what they are interested in, without affecting other parts of the
storage. *)
module Types := Opentelemetry_ambient_context_types module Types := Opentelemetry_ambient_context_types
@ -42,13 +43,13 @@ val create_key : unit -> 'a key
(** Create a new fresh key, distinct from any previously created key. *) (** Create a new fresh key, distinct from any previously created key. *)
val get : 'a key -> 'a option val get : 'a key -> 'a option
(** Get the current value for a given key, or [None] if no value was associated with the (** Get the current value for a given key, or [None] if no value was associated
key in the ambient context. *) with the key in the ambient context. *)
val with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r val with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r
(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to [v]. This (** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to
does not affect storage outside of [cb()]. *) [v]. This does not affect storage outside of [cb()]. *)
val without_binding : 'a key -> (unit -> 'b) -> 'b val without_binding : 'a key -> (unit -> 'b) -> 'b
(** [without_binding k cb] calls [cb()] in a context where [k] has no binding (possibly (** [without_binding k cb] calls [cb()] in a context where [k] has no binding
shadowing the current ambient binding of [k] if it exists). *) (possibly shadowing the current ambient binding of [k] if it exists). *)

View file

@ -1,7 +1,7 @@
(** Storage implementation. (** Storage implementation.
There is a singleton storage for a given program, responsible for providing ambient There is a singleton storage for a given program, responsible for providing
context to the rest of the program. *) ambient context to the rest of the program. *)
type 'a key = 'a Hmap.key type 'a key = 'a Hmap.key
@ -10,15 +10,17 @@ module type STORAGE = sig
(** Name of the storage implementation. *) (** Name of the storage implementation. *)
val get_map : unit -> Hmap.t option val get_map : unit -> Hmap.t option
(** Get the hmap from the current ambient context, or [None] if there is no ambient (** Get the hmap from the current ambient context, or [None] if there is no
context. *) ambient context. *)
val with_map : Hmap.t -> (unit -> 'b) -> 'b val with_map : Hmap.t -> (unit -> 'b) -> 'b
(** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()] will return (** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()]
[h]. Once [cb()] returns, the storage is reset to its previous value. *) will return [h]. Once [cb()] returns, the storage is reset to its previous
value. *)
val create_key : unit -> 'a key val create_key : unit -> 'a key
(** Create a new storage key, guaranteed to be distinct from any previously created key. *) (** Create a new storage key, guaranteed to be distinct from any previously
created key. *)
val get : 'a key -> 'a option val get : 'a key -> 'a option

View file

@ -15,8 +15,7 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** Atomic references. (** Atomic references. *)
*)
type 'a t = 'a Stdlib.Atomic.t type 'a t = 'a Stdlib.Atomic.t
(** An atomic (mutable) reference to a value of type ['a]. *) (** An atomic (mutable) reference to a value of type ['a]. *)
@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *) (** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only (** [compare_and_set r seen v] sets the new value of [r] to [v] only if its
if its current value is physically equal to [seen] -- the current value is physically equal to [seen] -- the comparison and the set
comparison and the set occur atomically. Returns [true] if the occur atomically. Returns [true] if the comparison succeeded (so the set
comparison succeeded (so the set happened) and [false] happened) and [false] otherwise. *)
otherwise. *)
val fetch_and_add : int t -> int -> int val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n], (** [fetch_and_add r n] atomically increments the value of [r] by [n], and
and returns the current value (before the increment). *) returns the current value (before the increment). *)
val incr : int t -> unit val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *) (** [incr r] atomically increments the value of [r] by [1]. *)

View file

@ -15,8 +15,7 @@
(* *) (* *)
(**************************************************************************) (**************************************************************************)
(** Atomic references. (** Atomic references. *)
*)
type 'a t type 'a t
(** An atomic (mutable) reference to a value of type ['a]. *) (** An atomic (mutable) reference to a value of type ['a]. *)
@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *) (** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only (** [compare_and_set r seen v] sets the new value of [r] to [v] only if its
if its current value is physically equal to [seen] -- the current value is physically equal to [seen] -- the comparison and the set
comparison and the set occur atomically. Returns [true] if the occur atomically. Returns [true] if the comparison succeeded (so the set
comparison succeeded (so the set happened) and [false] happened) and [false] otherwise. *)
otherwise. *)
val fetch_and_add : int t -> int -> int val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n], (** [fetch_and_add r n] atomically increments the value of [r] by [n], and
and returns the current value (before the increment). *) returns the current value (before the increment). *)
val incr : int t -> unit val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *) (** [incr r] atomically increments the value of [r] by [1]. *)

View file

@ -5,34 +5,30 @@ type t = private {
url_logs: string; (** Url to send logs *) url_logs: string; (** Url to send logs *)
headers: (string * string) list; headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or (** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *) "OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option; batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most) (** Batch traces? If [Some i], then this produces batches of (at most) [i]
[i] items. If [None], there is no batching. items. If [None], there is no batching.
Note that traces and metrics are batched separately. Note that traces and metrics are batched separately. Default
Default [Some 400]. [Some 400]. *)
*)
batch_metrics: int option; batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most) (** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching. [i] items. If [None], there is no batching.
Note that traces and metrics are batched separately. Note that traces and metrics are batched separately. Default [None].
Default [None]. *)
*)
batch_logs: int option; batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details. (** Batch logs? See {!batch_metrics} for details. Default [Some 400] *)
Default [Some 400] *)
batch_timeout_ms: int; batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even (** Number of milliseconds after which we will emit a batch, even
incomplete. incomplete. Note that the batch might take longer than that, because
Note that the batch might take longer than that, because this is this is only checked when a new event occurs. Default 500. *)
only checked when a new event occurs. Default 500. *)
} }
(** Configuration. (** Configuration.
To build one, use {!make} below. This might be extended with more To build one, use {!make} below. This might be extended with more fields in
fields in the future. *) the future. *)
val make : val make :
?debug:bool -> ?debug:bool ->
@ -49,28 +45,34 @@ val make :
t t
(** Make a configuration. (** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number @param thread
of bg threads. Otherwise the number of [bg_threads] superseeds this option. if true and [bg_threads] is not provided, we will pick a number of bg
threads. Otherwise the number of [bg_threads] superseeds this option.
@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url. @param url
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url http://localhost:4318 Example of constructed per-signal urls with the base url
- Traces: http://localhost:4318/v1/traces http://localhost:4318
- Metrics: http://localhost:4318/v1/metrics - Traces: http://localhost:4318/v1/traces
- Logs: http://localhost:4318/v1/logs - Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal type. Use per-signal url options if different urls are needed for each signal
type.
@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. @param url_traces
The url is used as-is without any modification. url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. @param url_metrics
The url is used as-is without any modification. url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. @param url_logs
The url is used as-is without any modification. url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
used as-is without any modification. *)
*)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit

View file

@ -19,8 +19,7 @@ val create_backend :
(module Opentelemetry.Collector.BACKEND) (module Opentelemetry.Collector.BACKEND)
(** Create a new backend using lwt and cohttp (** Create a new backend using lwt and cohttp
NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *)
*)
val setup : val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit ?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit

View file

@ -12,12 +12,12 @@ val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes. (** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *) @raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a Queue.t -> unit val pop_all : 'a t -> 'a Queue.t -> unit
(** [pop_all q into] pops all the elements of [q] (** [pop_all q into] pops all the elements of [q] and moves them into [into]. It
and moves them into [into]. It might block until an element comes. might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *) @raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *) (** Close the queue, meaning there won't be any more [push] allowed. *)

View file

@ -7,34 +7,32 @@ type t = private {
url_logs: string; (** Url to send logs *) url_logs: string; (** Url to send logs *)
headers: (string * string) list; headers: (string * string) list;
(** API headers sent to the endpoint. Default is none or (** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *) "OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_timeout_ms: int; batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even (** Number of milliseconds after which we will emit a batch, even
incomplete. incomplete. Note that the batch might take longer than that, because
Note that the batch might take longer than that, because this is this is only checked when a new event occurs or when a tick is
only checked when a new event occurs or when a tick emitted. Default 2_000. *)
is emitted. Default 2_000. *)
bg_threads: int; bg_threads: int;
(** Are there background threads, and how many? Default [4]. (** Are there background threads, and how many? Default [4]. This will be
This will be adjusted to be at least [1] and at most [32]. *) adjusted to be at least [1] and at most [32]. *)
ticker_thread: bool; ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should (** If true, start a thread that regularly checks if signals should be
be sent to the collector. Default [true] *) sent to the collector. Default [true] *)
ticker_interval_ms: int; ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is (** Interval for ticker thread, in milliseconds. This is only useful if
only useful if [ticker_thread] is [true]. [ticker_thread] is [true]. This will be clamped between [2 ms] and
This will be clamped between [2 ms] and some longer some longer interval (maximum [60s] currently). Default 500.
interval (maximum [60s] currently). @since 0.7 *)
Default 500.
@since 0.7 *)
self_trace: bool; self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default [false]. (** If true, the OTEL library will also emit its own spans. Default
[false].
@since 0.7 *) @since 0.7 *)
} }
(** Configuration. (** Configuration.
To build one, use {!make} below. This might be extended with more To build one, use {!make} below. This might be extended with more fields in
fields in the future. *) the future. *)
val make : val make :
?debug:bool -> ?debug:bool ->
@ -52,24 +50,30 @@ val make :
t t
(** Make a configuration. (** Make a configuration.
@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url. @param url
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set. base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url http://localhost:4318 Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces - Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics - Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs - Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal type. Use per-signal url options if different urls are needed for each signal
type.
@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. @param url_traces
The url is used as-is without any modification. url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. @param url_metrics
The url is used as-is without any modification. url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. @param url_logs
The url is used as-is without any modification. url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
*) used as-is without any modification. *)
val pp : Format.formatter -> t -> unit val pp : Format.formatter -> t -> unit

View file

@ -1,7 +1,7 @@
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a pair of lock/unlock functions that are used to (** Set a pair of lock/unlock functions that are used to protect access to
protect access to global state, if needed. By default these do nothing. *) global state, if needed. By default these do nothing. *)
val with_lock : (unit -> 'a) -> 'a val with_lock : (unit -> 'a) -> 'a
(** Call [f()] while holding the mutex defined {!set_mutex}, then (** Call [f()] while holding the mutex defined {!set_mutex}, then release the
release the mutex. *) mutex. *)

View file

@ -3,12 +3,12 @@
We need random identifiers for trace IDs and span IDs. *) We need random identifiers for trace IDs and span IDs. *)
val rand_bytes_16 : (unit -> bytes) ref val rand_bytes_16 : (unit -> bytes) ref
(** Generate 16 bytes of random data. (** Generate 16 bytes of random data. The implementation can be swapped to use
The implementation can be swapped to use any random generator. *) any random generator. *)
val rand_bytes_8 : (unit -> bytes) ref val rand_bytes_8 : (unit -> bytes) ref
(** Generate 16 bytes of random data. (** Generate 16 bytes of random data. The implementation can be swapped to use
The implementation can be swapped to use any random generator. *) any random generator. *)
val default_rand_bytes_8 : unit -> bytes val default_rand_bytes_8 : unit -> bytes
(** Default implementation using {!Random} *) (** Default implementation using {!Random} *)

View file

@ -17,17 +17,15 @@ module Server : sig
Use it like this: Use it like this:
{[ {[
let my_server callback = let my_server callback =
let callback_traced = let callback_traced =
Opentelemetry_cohttp_lwt.Server.trace Opentelemetry_cohttp_lwt.Server.trace ~service_name:"my-service"
~service_name:"my-service" (fun _scope -> callback)
(fun _scope -> callback) in
in Cohttp_lwt_unix.Server.create
Cohttp_lwt_unix.Server.create ~mode:(`TCP (`Port 8080))
~mode:(`TCP (`Port 8080)) (Server.make () ~callback:callback_traced)
(Server.make () ~callback:callback_traced) ]} *)
]}
*)
val with_ : val with_ :
?trace_state:string -> ?trace_state:string ->
@ -43,24 +41,20 @@ module Server : sig
Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace
scope in the [x-ocaml-otel-traceparent] header in the request for scope in the [x-ocaml-otel-traceparent] header in the request for
convenience. convenience. *)
*)
val get_trace_context : val get_trace_context :
?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option ?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option
(** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header
added by [trace] and [with_]. added by [trace] and [with_]. *)
*)
val set_trace_context : Otel.Scope.t -> Request.t -> Request.t val set_trace_context : Otel.Scope.t -> Request.t -> Request.t
(** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used
by [trace] and [with_]. by [trace] and [with_]. *)
*)
val remove_trace_context : Request.t -> Request.t val remove_trace_context : Request.t -> Request.t
(** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and (** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and
[with_]. [with_]. *)
*)
end = struct end = struct
let attrs_of_request (req : Request.t) = let attrs_of_request (req : Request.t) =
let meth = req |> Request.meth |> Code.string_of_method in let meth = req |> Request.meth |> Code.string_of_method in

View file

@ -12,8 +12,8 @@ module Metrics_callbacks = Metrics_callbacks
module Trace_context = Trace_context module Trace_context = Trace_context
external reraise : exn -> 'a = "%reraise" external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force (** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
to use Lwt's latest version *) use Lwt's latest version *)
module Trace = struct module Trace = struct
include Trace include Trace

View file

@ -144,8 +144,8 @@ module Internal = struct
assert (Bytes.length bs = 8); assert (Bytes.length bs = 8);
Bytes.get_int64_le bs 0 Bytes.get_int64_le bs 0
let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option)
= ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name =
let open Otel in let open Otel in
let otel_id = Span_id.create () in let otel_id = Span_id.create () in
let otrace_id = otrace_of_otel otel_id in let otrace_id = otrace_of_otel otel_id in
@ -159,7 +159,8 @@ module Internal = struct
let parent = let parent =
match explicit_parent, parent_scope with match explicit_parent, parent_scope with
| Some p, _ -> | Some p, _ ->
Some (Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ()) Some
(Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ())
| None, Some parent -> Some (Otel.Scope.to_span_ctx parent) | None, Some parent -> Some (Otel.Scope.to_span_ctx parent)
| None, None -> None | None, None -> None
in in