Merge pull request #86 from imandra-ai/simon/cps-based-collector-cleanup-2025-04-17

CPS-based collector for cleanup
This commit is contained in:
Simon Cruanes 2025-05-05 14:41:47 -04:00 committed by GitHub
commit 92de45a2ec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 363 additions and 351 deletions

View file

@ -2,23 +2,24 @@
The ambient context, like the Matrix, is everywhere around you.
It is responsible for keeping track of that context in a manner that's consistent with
the program's choice of control flow paradigm:
It is responsible for keeping track of that context in a manner that's
consistent with the program's choice of control flow paradigm:
- for synchronous/threaded/direct style code, {b TLS} ("thread local storage") keeps
track of a global variable per thread. Each thread has its own copy of the variable
and updates it independently of other threads.
- for synchronous/threaded/direct style code, {b TLS} ("thread local
storage") keeps track of a global variable per thread. Each thread has its
own copy of the variable and updates it independently of other threads.
- for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> )] will
inherit the [k := v] assignment.
- for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> )]
will inherit the [k := v] assignment.
- for Eio, fibers created inside [with_binding k v (fun () -> )] will inherit the
[k := v] assignment. This is consistent with the structured concurrency approach of
Eio.
- for Eio, fibers created inside [with_binding k v (fun () -> )] will
inherit the [k := v] assignment. This is consistent with the structured
concurrency approach of Eio.
The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map. Various
users (libraries, user code, etc.) can create their own {!key} to store what they are
interested in, without affecting other parts of the storage. *)
The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map.
Various users (libraries, user code, etc.) can create their own {!key} to
store what they are interested in, without affecting other parts of the
storage. *)
module Types := Opentelemetry_ambient_context_types
@ -42,13 +43,13 @@ val create_key : unit -> 'a key
(** Create a new fresh key, distinct from any previously created key. *)
val get : 'a key -> 'a option
(** Get the current value for a given key, or [None] if no value was associated with the
key in the ambient context. *)
(** Get the current value for a given key, or [None] if no value was associated
with the key in the ambient context. *)
val with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r
(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to [v]. This
does not affect storage outside of [cb()]. *)
(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to
[v]. This does not affect storage outside of [cb()]. *)
val without_binding : 'a key -> (unit -> 'b) -> 'b
(** [without_binding k cb] calls [cb()] in a context where [k] has no binding (possibly
shadowing the current ambient binding of [k] if it exists). *)
(** [without_binding k cb] calls [cb()] in a context where [k] has no binding
(possibly shadowing the current ambient binding of [k] if it exists). *)

View file

@ -1,7 +1,7 @@
(** Storage implementation.
There is a singleton storage for a given program, responsible for providing ambient
context to the rest of the program. *)
There is a singleton storage for a given program, responsible for providing
ambient context to the rest of the program. *)
type 'a key = 'a Hmap.key
@ -10,15 +10,17 @@ module type STORAGE = sig
(** Name of the storage implementation. *)
val get_map : unit -> Hmap.t option
(** Get the hmap from the current ambient context, or [None] if there is no ambient
context. *)
(** Get the hmap from the current ambient context, or [None] if there is no
ambient context. *)
val with_map : Hmap.t -> (unit -> 'b) -> 'b
(** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()] will return
[h]. Once [cb()] returns, the storage is reset to its previous value. *)
(** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()]
will return [h]. Once [cb()] returns, the storage is reset to its previous
value. *)
val create_key : unit -> 'a key
(** Create a new storage key, guaranteed to be distinct from any previously created key. *)
(** Create a new storage key, guaranteed to be distinct from any previously
created key. *)
val get : 'a key -> 'a option

View file

@ -15,8 +15,7 @@
(* *)
(**************************************************************************)
(** Atomic references.
*)
(** Atomic references. *)
type 'a t = 'a Stdlib.Atomic.t
(** An atomic (mutable) reference to a value of type ['a]. *)
@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only
if its current value is physically equal to [seen] -- the
comparison and the set occur atomically. Returns [true] if the
comparison succeeded (so the set happened) and [false]
otherwise. *)
(** [compare_and_set r seen v] sets the new value of [r] to [v] only if its
current value is physically equal to [seen] -- the comparison and the set
occur atomically. Returns [true] if the comparison succeeded (so the set
happened) and [false] otherwise. *)
val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n],
and returns the current value (before the increment). *)
(** [fetch_and_add r n] atomically increments the value of [r] by [n], and
returns the current value (before the increment). *)
val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *)

View file

@ -15,8 +15,7 @@
(* *)
(**************************************************************************)
(** Atomic references.
*)
(** Atomic references. *)
type 'a t
(** An atomic (mutable) reference to a value of type ['a]. *)
@ -34,15 +33,14 @@ val exchange : 'a t -> 'a -> 'a
(** Set a new value for the atomic reference, and return the current value. *)
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r seen v] sets the new value of [r] to [v] only
if its current value is physically equal to [seen] -- the
comparison and the set occur atomically. Returns [true] if the
comparison succeeded (so the set happened) and [false]
otherwise. *)
(** [compare_and_set r seen v] sets the new value of [r] to [v] only if its
current value is physically equal to [seen] -- the comparison and the set
occur atomically. Returns [true] if the comparison succeeded (so the set
happened) and [false] otherwise. *)
val fetch_and_add : int t -> int -> int
(** [fetch_and_add r n] atomically increments the value of [r] by [n],
and returns the current value (before the increment). *)
(** [fetch_and_add r n] atomically increments the value of [r] by [n], and
returns the current value (before the increment). *)
val incr : int t -> unit
(** [incr r] atomically increments the value of [r] by [1]. *)

View file

@ -7,32 +7,28 @@ type t = private {
(** API headers sent to the endpoint. Default is none or
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_traces: int option;
(** Batch traces? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
(** Batch traces? If [Some i], then this produces batches of (at most) [i]
items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [Some 400].
*)
Note that traces and metrics are batched separately. Default
[Some 400]. *)
batch_metrics: int option;
(** Batch metrics? If [Some i], then this produces batches of (at most)
[i] items. If [None], there is no batching.
Note that traces and metrics are batched separately.
Default [None].
Note that traces and metrics are batched separately. Default [None].
*)
batch_logs: int option;
(** Batch logs? See {!batch_metrics} for details.
Default [Some 400] *)
(** Batch logs? See {!batch_metrics} for details. Default [Some 400] *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs. Default 500. *)
incomplete. Note that the batch might take longer than that, because
this is only checked when a new event occurs. Default 500. *)
}
(** Configuration.
To build one, use {!make} below. This might be extended with more
fields in the future. *)
To build one, use {!make} below. This might be extended with more fields in
the future. *)
val make :
?debug:bool ->
@ -49,28 +45,34 @@ val make :
t
(** Make a configuration.
@param thread if true and [bg_threads] is not provided, we will pick a number
of bg threads. Otherwise the number of [bg_threads] superseeds this option.
@param thread
if true and [bg_threads] is not provided, we will pick a number of bg
threads. Otherwise the number of [bg_threads] superseeds this option.
@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url.
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set.
@param url
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url http://localhost:4318
Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal type.
Use per-signal url options if different urls are needed for each signal
type.
@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_traces
url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_metrics
url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set.
The url is used as-is without any modification.
*)
@param url_logs
url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
used as-is without any modification. *)
val pp : Format.formatter -> t -> unit

View file

@ -9,8 +9,8 @@ open Opentelemetry
include Common_
external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force
to use Lwt's latest version *)
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
use Lwt's latest version *)
let needs_gc_metrics = Atomic.make false
@ -133,7 +133,8 @@ end = struct
let bt = Printexc.get_backtrace () in
Error
(`Failure
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))
(spf "decoding failed with:\n%s\n%s" (Printexc.to_string e)
bt))
in
Lwt.return r
) else (
@ -167,10 +168,10 @@ module Batch : sig
val push' : 'a t -> 'a -> unit
val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
(** Is the batch ready to be emitted? If batching is disabled,
this is true as soon as {!is_empty} is false. If a timeout is provided
for this batch, then it will be ready if an element has been in it
for at least the timeout.
(** Is the batch ready to be emitted? If batching is disabled, this is true as
soon as {!is_empty} is false. If a timeout is provided for this batch,
then it will be ready if an element has been in it for at least the
timeout.
@param now passed to implement timeout *)
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
@ -255,15 +256,14 @@ module type EMITTER = sig
val tick : unit -> unit
val cleanup : unit -> unit
val cleanup : on_done:(unit -> unit) -> unit -> unit
end
(* make an emitter.
exceptions inside should be caught, see
https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t)
() : (module EMITTER) =
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
let open Proto in
let open Lwt.Syntax in
(* local helpers *)
@ -448,13 +448,12 @@ let mk_emitter ~(after_cleanup : unit Lwt.u option) ~stop ~(config : Config.t)
(* if called in a blocking context: work in the background *)
let tick () = Lwt.async tick_
let cleanup () =
let cleanup ~on_done () =
if !debug_ then Printf.eprintf "opentelemetry: exiting…\n%!";
Lwt.async (fun () ->
let* () = emit_all_force httpc encoder in
Httpc.cleanup httpc;
(* resolve [after_cleanup], if provided *)
Option.iter (fun prom -> Lwt.wakeup_later prom ()) after_cleanup;
on_done ();
Lwt.return ())
end in
(module M)
@ -464,13 +463,9 @@ module Backend
val stop : bool Atomic.t
val config : Config.t
val after_cleanup : unit Lwt.u option
end)
() : Opentelemetry.Collector.BACKEND = struct
include
(val mk_emitter ~after_cleanup:Arg.after_cleanup ~stop:Arg.stop
~config:Arg.config ())
include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())
open Opentelemetry.Proto
open Opentelemetry.Collector
@ -562,8 +557,7 @@ module Backend
}
end
let create_backend ?after_cleanup ?(stop = Atomic.make false)
?(config = Config.make ()) () =
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
debug_ := config.debug;
let module B =
@ -572,43 +566,37 @@ let create_backend ?after_cleanup ?(stop = Atomic.make false)
let stop = stop
let config = config
let after_cleanup = after_cleanup
end)
()
in
(module B : OT.Collector.BACKEND)
let setup_ ?stop ?config () : (unit -> unit) * unit Lwt.t =
let cleanup_done, cleanup_done_prom = Lwt.wait () in
let backend =
create_backend ~after_cleanup:cleanup_done_prom ?stop ?config ()
in
let setup_ ?stop ?config () : unit =
let backend = create_backend ?stop ?config () in
OT.Collector.set_backend backend;
OT.Collector.remove_backend, cleanup_done
()
let setup ?stop ?config ?(enable = true) () =
if enable then (
let cleanup, _lwt = setup_ ?stop ?config () in
at_exit cleanup
)
if enable then setup_ ?stop ?config ()
let remove_backend () : unit Lwt.t =
let done_fut, done_u = Lwt.wait () in
OT.Collector.remove_backend ~on_done:(fun () -> Lwt.wakeup_later done_u ()) ();
done_fut
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f : _ Lwt.t
=
if enable then
if enable then (
let open Lwt.Syntax in
let cleanup, cleanup_done = setup_ ?stop ~config () in
setup_ ?stop ~config ();
Lwt.catch
(fun () ->
let* res = f () in
cleanup ();
let+ () = cleanup_done in
let+ () = remove_backend () in
res)
(fun exn ->
cleanup ();
let* () = cleanup_done in
let* () = remove_backend () in
reraise exn)
else
) else
f ()

View file

@ -13,24 +13,28 @@ val set_headers : (string * string) list -> unit
module Config = Config
val create_backend :
?after_cleanup:unit Lwt.u ->
?stop:bool Atomic.t ->
?config:Config.t ->
unit ->
(module Opentelemetry.Collector.BACKEND)
(** Create a new backend using lwt and cohttp
@param after_cleanup if provided, this is resolved into [()] after cleanup is done (since 0.11) *)
NOTE [after_cleanup] optional parameter removed since NEXT_RELEASE *)
val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments
or environment.
@param enable
actually setup the backend (default true). This can be used to
enable/disable the setup depending on CLI arguments or environment.
@param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
@param stop
an atomic boolean. When it becomes true, background threads will all stop
after a little while. *)
val remove_backend : unit -> unit Lwt.t
(** Shutdown current backend
@since NEXT_RELEASE *)
val with_setup :
?stop:bool Atomic.t ->
@ -39,6 +43,5 @@ val with_setup :
unit ->
(unit -> 'a Lwt.t) ->
'a Lwt.t
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns
See {!setup} for more details. *)
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *)

View file

@ -15,8 +15,8 @@ val pop : 'a t -> 'a
@raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a Queue.t -> unit
(** [pop_all q into] pops all the elements of [q]
and moves them into [into]. It might block until an element comes.
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. It
might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit

View file

@ -10,31 +10,29 @@ type t = private {
"OTEL_EXPORTER_OTLP_HEADERS" if set. *)
batch_timeout_ms: int;
(** Number of milliseconds after which we will emit a batch, even
incomplete.
Note that the batch might take longer than that, because this is
only checked when a new event occurs or when a tick
is emitted. Default 2_000. *)
incomplete. Note that the batch might take longer than that, because
this is only checked when a new event occurs or when a tick is
emitted. Default 2_000. *)
bg_threads: int;
(** Are there background threads, and how many? Default [4].
This will be adjusted to be at least [1] and at most [32]. *)
(** Are there background threads, and how many? Default [4]. This will be
adjusted to be at least [1] and at most [32]. *)
ticker_thread: bool;
(** If true, start a thread that regularly checks if signals should
be sent to the collector. Default [true] *)
(** If true, start a thread that regularly checks if signals should be
sent to the collector. Default [true] *)
ticker_interval_ms: int;
(** Interval for ticker thread, in milliseconds. This is
only useful if [ticker_thread] is [true].
This will be clamped between [2 ms] and some longer
interval (maximum [60s] currently).
Default 500.
(** Interval for ticker thread, in milliseconds. This is only useful if
[ticker_thread] is [true]. This will be clamped between [2 ms] and
some longer interval (maximum [60s] currently). Default 500.
@since 0.7 *)
self_trace: bool;
(** If true, the OTEL library will also emit its own spans. Default [false].
(** If true, the OTEL library will also emit its own spans. Default
[false].
@since 0.7 *)
}
(** Configuration.
To build one, use {!make} below. This might be extended with more
fields in the future. *)
To build one, use {!make} below. This might be extended with more fields in
the future. *)
val make :
?debug:bool ->
@ -52,24 +50,30 @@ val make :
t
(** Make a configuration.
@param url base url used to construct per-signal urls. Per-signal url options take precedence over this base url.
Default is "http://localhost:4318", or "OTEL_EXPORTER_OTLP_ENDPOINT" if set.
@param url
base url used to construct per-signal urls. Per-signal url options take
precedence over this base url. Default is "http://localhost:4318", or
"OTEL_EXPORTER_OTLP_ENDPOINT" if set.
Example of constructed per-signal urls with the base url http://localhost:4318
Example of constructed per-signal urls with the base url
http://localhost:4318
- Traces: http://localhost:4318/v1/traces
- Metrics: http://localhost:4318/v1/metrics
- Logs: http://localhost:4318/v1/logs
Use per-signal url options if different urls are needed for each signal type.
Use per-signal url options if different urls are needed for each signal
type.
@param url_traces url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_traces
url to send traces, or "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_metrics url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set.
The url is used as-is without any modification.
@param url_metrics
url to send metrics, or "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT" if set. The
url is used as-is without any modification.
@param url_logs url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set.
The url is used as-is without any modification.
*)
@param url_logs
url to send logs, or "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" if set. The url is
used as-is without any modification. *)
val pp : Format.formatter -> t -> unit

View file

@ -39,9 +39,9 @@ module Self_trace = struct
)
end
(** capture current GC metrics if {!needs_gc_metrics} is true
or it has been a long time since the last GC metrics collection,
and push them into {!gc_metrics} for later collection *)
(** capture current GC metrics if {!needs_gc_metrics} is true or it has been a
long time since the last GC metrics collection, and push them into
{!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
let now = Mtime_clock.now () in
let alarm = Atomic.exchange needs_gc_metrics false in
@ -102,7 +102,12 @@ let start_bg_thread (f : unit -> unit) : Thread.t =
f ()
in
(* no signals on Windows *)
let run () = if Sys.win32 then f () else unix_run () in
let run () =
if Sys.win32 then
f ()
else
unix_run ()
in
Thread.create run ()
let str_to_hex (s : string) : string =
@ -128,7 +133,7 @@ module Backend_impl : sig
val send_event : t -> Event.t -> unit
val shutdown : t -> unit
val shutdown : t -> on_done:(unit -> unit) -> unit
end = struct
open Opentelemetry.Proto
@ -250,8 +255,8 @@ end = struct
let[@inline] send_event (self : t) ev : unit = B_queue.push self.q ev
(** Thread that, in a loop, reads from [q] to get the
next message to send via http *)
(** Thread that, in a loop, reads from [q] to get the next message to send via
http *)
let bg_thread_loop (self : t) : unit =
Ezcurl.with_client ?set_opts:None @@ fun client ->
let stop = self.stop in
@ -379,7 +384,7 @@ end = struct
self
let shutdown self : unit =
let shutdown self ~on_done : unit =
Atomic.set self.stop true;
if not (Atomic.exchange self.cleaned true) then (
(* empty batches *)
@ -392,7 +397,8 @@ end = struct
(* close send queues, then wait for all threads *)
B_queue.close self.send_q;
Array.iter Thread.join self.send_threads
)
);
on_done ()
end
let create_backend ?(stop = Atomic.make false)
@ -480,7 +486,7 @@ let create_backend ?(stop = Atomic.make false)
Backend_impl.send_event backend Event.E_tick;
List.iter (fun f -> f ()) (AList.get @@ Atomic.get on_tick_cbs_)
let cleanup () = Backend_impl.shutdown backend
let cleanup ~on_done () = Backend_impl.shutdown backend ~on_done
end in
(module M)
@ -498,7 +504,7 @@ let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () =
start_bg_thread tick_loop
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
=
: unit =
let backend = create_backend ~stop ~config () in
Opentelemetry.Collector.set_backend backend;
@ -508,18 +514,18 @@ let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
(* at most a minute *)
let sleep_ms = min 60_000 (max 2 config.ticker_interval_ms) in
ignore (setup_ticker_thread ~stop ~sleep_ms backend () : Thread.t)
);
OT.Collector.remove_backend
)
let remove_backend () : unit =
(* we don't need the callback, this runs in the same thread *)
OT.Collector.remove_backend () ~on_done:ignore
let setup ?stop ?config ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ?config () in
at_exit cleanup
)
if enable then setup_ ?stop ?config ()
let with_setup ?stop ?config ?(enable = true) () f =
if enable then (
let cleanup = setup_ ?stop ?config () in
Fun.protect ~finally:cleanup f
setup_ ?stop ?config ();
Fun.protect ~finally:remove_backend f
) else
f ()

View file

@ -20,13 +20,16 @@ val create_backend :
val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.
@param enable actually setup the backend (default true). This can
be used to enable/disable the setup depending on CLI arguments
or environment.
@param enable
actually setup the backend (default true). This can be used to
enable/disable the setup depending on CLI arguments or environment.
@param config configuration to use
@param stop an atomic boolean. When it becomes true, background threads
will all stop after a little while.
*)
@param stop
an atomic boolean. When it becomes true, background threads will all stop
after a little while. *)
val remove_backend : unit -> unit
(** @since NEXT_RELEASE *)
val with_setup :
?stop:bool Atomic.t ->
@ -35,6 +38,5 @@ val with_setup :
unit ->
(unit -> 'a) ->
'a
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up
after [f()] returns
See {!setup} for more details. *)
(** [with_setup () f] is like [setup(); f()] but takes care of cleaning up after
[f()] returns See {!setup} for more details. *)

View file

@ -1,7 +1,7 @@
val set_mutex : lock:(unit -> unit) -> unlock:(unit -> unit) -> unit
(** Set a pair of lock/unlock functions that are used to
protect access to global state, if needed. By default these do nothing. *)
(** Set a pair of lock/unlock functions that are used to protect access to
global state, if needed. By default these do nothing. *)
val with_lock : (unit -> 'a) -> 'a
(** Call [f()] while holding the mutex defined {!set_mutex}, then
release the mutex. *)
(** Call [f()] while holding the mutex defined {!set_mutex}, then release the
mutex. *)

View file

@ -28,8 +28,8 @@ module Proto = Opentelemetry_proto
(** Unix timestamp.
These timestamps measure time since the Unix epoch (jan 1, 1970) UTC
in nanoseconds. *)
These timestamps measure time since the Unix epoch (jan 1, 1970) UTC in
nanoseconds. *)
module Timestamp_ns = struct
type t = int64
@ -56,18 +56,15 @@ module Collector = struct
open Opentelemetry_proto
type 'msg sender = { send: 'a. 'msg -> ret:(unit -> 'a) -> 'a }
(** Sender interface for a message of type [msg].
Inspired from Logs' reporter
(see {{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc})
but without [over] as it doesn't make much sense in presence
of batching.
(** Sender interface for a message of type [msg]. Inspired from Logs' reporter
(see
{{:https://erratique.ch/software/logs/doc/Logs/index.html#sync} its doc})
but without [over] as it doesn't make much sense in presence of batching.
The [ret] callback is used to return the desired type (unit, or
a Lwt promise, or anything else) once the event has been transferred
to the backend.
It doesn't mean the event has been collected yet, it
could sit in a batch queue for a little while.
*)
The [ret] callback is used to return the desired type (unit, or a Lwt
promise, or anything else) once the event has been transferred to the
backend. It doesn't mean the event has been collected yet, it could sit in
a batch queue for a little while. *)
(** Collector client interface. *)
module type BACKEND = sig
@ -79,20 +76,24 @@ module Collector = struct
val signal_emit_gc_metrics : unit -> unit
(** Signal the backend that it should emit GC metrics when it has the
chance. This should be installed in a GC alarm or another form
of regular trigger. *)
chance. This should be installed in a GC alarm or another form of
regular trigger. *)
val tick : unit -> unit
(** Should be called regularly for background processing,
timeout checks, etc. *)
(** Should be called regularly for background processing, timeout checks,
etc. *)
val set_on_tick_callbacks : (unit -> unit) AList.t -> unit
(** Give the collector the list of callbacks to be executed
when [tick()] is called. Each such callback should be short and
reentrant. Depending on the collector's implementation, it might be
called from a thread that is not the one that called [on_tick]. *)
(** Give the collector the list of callbacks to be executed when [tick()] is
called. Each such callback should be short and reentrant. Depending on
the collector's implementation, it might be called from a thread that is
not the one that called [on_tick]. *)
val cleanup : unit -> unit
val cleanup : on_done:(unit -> unit) -> unit -> unit
(** [cleanup ~on_done ()] is called when the collector is shut down, and is
responsible for sending remaining batches, flushing sockets, etc.
@param on_done
callback invoked after the cleanup is done. since NEXT_RELEASE *)
end
type backend = (module BACKEND)
@ -113,7 +114,9 @@ module Collector = struct
let set_on_tick_callbacks _cbs = ()
let cleanup () = ()
let cleanup ~on_done () =
on_done ();
()
end
module Debug_backend (B : BACKEND) : BACKEND = struct
@ -155,7 +158,7 @@ module Collector = struct
let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs
let cleanup () = B.cleanup ()
let cleanup ~on_done () = B.cleanup ~on_done ()
end
let debug_backend : backend = (module Debug_backend (Noop_backend))
@ -174,13 +177,14 @@ module Collector = struct
Atomic.set backend (Some b)
(** Remove current backend, if any.
@since 0.11 *)
let remove_backend () : unit =
@since 0.11
@param on_done see {!BACKEND.cleanup}, since NEXT_RELEASE *)
let remove_backend ~on_done () : unit =
match Atomic.exchange backend None with
| None -> ()
| Some (module B) ->
B.tick ();
B.cleanup ()
B.cleanup ~on_done ()
(** Is there a configured backend? *)
let[@inline] has_backend () : bool = Atomic.get backend != None
@ -209,18 +213,18 @@ module Collector = struct
let[@inline] on_tick f = AList.add on_tick_cbs_ f
(** Do background work. Call this regularly if the collector doesn't
already have a ticker thread or internal timer. *)
(** Do background work. Call this regularly if the collector doesn't already
have a ticker thread or internal timer. *)
let tick () =
match Atomic.get backend with
| None -> ()
| Some (module B) -> B.tick ()
let with_setup_debug_backend b ?(enable = true) () f =
let with_setup_debug_backend ?(on_done = ignore) b ?(enable = true) () f =
let (module B : BACKEND) = b in
if enable then (
set_backend b;
Fun.protect ~finally:B.cleanup f
Fun.protect ~finally:(B.cleanup ~on_done) f
) else
f ()
end
@ -338,8 +342,8 @@ end = struct
let pp fmt t = Format.fprintf fmt "%s" (to_hex t)
end
(** Hmap key to carry around a {!Trace_id.t}, to remember what the current
trace is.
(** Hmap key to carry around a {!Trace_id.t}, to remember what the current trace
is.
@since 0.8 *)
let k_trace_id : Trace_id.t Hmap.key = Hmap.Key.create ()
@ -402,7 +406,8 @@ end
(** Span context. This bundles up a trace ID and parent ID.
{{: https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext} https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext}
{{:https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext}
https://opentelemetry.io/docs/specs/otel/trace/api/#spancontext}
@since 0.7 *)
module Span_ctx : sig
type t
@ -510,7 +515,8 @@ let k_span_ctx : Span_ctx.t Hmap.key = Hmap.Key.create ()
(** Semantic conventions
{{: https://opentelemetry.io/docs/specs/semconv/} https://opentelemetry.io/docs/specs/semconv/} *)
{{:https://opentelemetry.io/docs/specs/semconv/}
https://opentelemetry.io/docs/specs/semconv/} *)
module Conventions = struct
module Attributes = struct
module Process = struct
@ -570,7 +576,8 @@ module Conventions = struct
let url_scheme = "url.scheme"
end
(** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md *)
(** https://github.com/open-telemetry/semantic-conventions/blob/main/docs/resource/host.md
*)
module Host = struct
let id = "host.id"
@ -684,9 +691,9 @@ module Globals = struct
default_instrumentation_scope ~version:"%%VERSION_NUM%%" ~name:"ocaml-otel"
()
(** Global attributes, initially set
via OTEL_RESOURCE_ATTRIBUTES and modifiable
by the user code. They will be attached to each outgoing metrics/traces. *)
(** Global attributes, initially set via OTEL_RESOURCE_ATTRIBUTES and
modifiable by the user code. They will be attached to each outgoing
metrics/traces. *)
let global_attributes : key_value list ref =
let parse_pair s =
match String.split_on_char '=' s with
@ -709,10 +716,10 @@ module Globals = struct
let not_redundant kv = List.for_all (fun kv' -> kv.key <> kv'.key) into in
List.rev_append (List.filter not_redundant !global_attributes) into
(** Default span kind in {!Span.create}.
This will be used in all spans that do not specify [~kind] explicitly;
it is set to "internal", following directions from the [.proto] file.
It can be convenient to set "client" or "server" uniformly in here.
(** Default span kind in {!Span.create}. This will be used in all spans that
do not specify [~kind] explicitly; it is set to "internal", following
directions from the [.proto] file. It can be convenient to set "client" or
"server" uniformly in here.
@since 0.4 *)
let default_span_kind = ref Proto.Trace.Span_kind_internal
@ -746,8 +753,8 @@ end
(** Events.
Events occur at a given time and can carry attributes. They always
belong in a span. *)
Events occur at a given time and can carry attributes. They always belong in
a span. *)
module Event : sig
open Proto.Trace
@ -769,10 +776,9 @@ end
(** Span Link
A pointer from the current span to another span in the same trace or in a
different trace. For example, this can be used in batching operations,
where a single batch handler processes multiple requests from different
traces or when the handler receives a request from a different project.
*)
different trace. For example, this can be used in batching operations, where
a single batch handler processes multiple requests from different traces or
when the handler receives a request from a different project. *)
module Span_link : sig
open Proto.Trace
@ -872,8 +878,7 @@ end
(** Scopes.
A scope is a trace ID and the span ID of the currently active span.
*)
A scope is a trace ID and the span ID of the currently active span. *)
module Scope : sig
type item_list
@ -917,8 +922,8 @@ module Scope : sig
val add_event : t -> (unit -> Event.t) -> unit
(** Add an event to the scope. It will be aggregated into the span.
Note that this takes a function that produces an event, and will only
call it if there is an instrumentation backend. *)
Note that this takes a function that produces an event, and will only call
it if there is an instrumentation backend. *)
val record_exception : t -> exn -> Printexc.raw_backtrace -> unit
@ -931,14 +936,14 @@ module Scope : sig
val add_links : t -> (unit -> Span_link.t list) -> unit
(** Add links to the scope. It will be aggregated into the span.
Note that this takes a function that produces links, and will only
call it if there is an instrumentation backend. *)
Note that this takes a function that produces links, and will only call it
if there is an instrumentation backend. *)
val set_status : t -> Span_status.t -> unit
(** set the span status.
Note that this function will be
called only if there is an instrumentation backend. *)
Note that this function will be called only if there is an instrumentation
backend. *)
val set_kind : t -> Span_kind.t -> unit
(** Set the span's kind.
@ -956,7 +961,8 @@ module Scope : sig
the (thread|continuation)-local scope, then reverts to the previous local
scope, if any.
@see <https://github.com/ELLIOTTCABLE/ocaml-ambient-context> ambient-context docs *)
@see <https://github.com/ELLIOTTCABLE/ocaml-ambient-context>
ambient-context docs *)
end = struct
type item_list =
| Nil
@ -1093,10 +1099,10 @@ end
(** Spans.
A Span is the workhorse of traces, it indicates an operation that
took place over a given span of time (indicated by start_time and end_time)
as part of a hierarchical trace. All spans in a given trace are bound by
the use of the same {!Trace_id.t}. *)
A Span is the workhorse of traces, it indicates an operation that took place
over a given span of time (indicated by start_time and end_time) as part of
a hierarchical trace. All spans in a given trace are bound by the use of the
same {!Trace_id.t}. *)
module Span : sig
open Proto.Trace
@ -1140,8 +1146,9 @@ module Span : sig
(** [create ~trace_id name] creates a new span with its unique ID.
@param trace_id the trace this belongs to
@param parent parent span, if any
@param links list of links to other spans, each with their trace state
(see {{: https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *)
@param links
list of links to other spans, each with their trace state (see
{{:https://www.w3.org/TR/trace-context/#tracestate-header} w3.org}) *)
end = struct
open Proto.Trace
@ -1184,7 +1191,9 @@ end
(** Traces.
See {{: https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal} the spec} *)
See
{{:https://opentelemetry.io/docs/reference/specification/overview/#tracing-signal}
the spec} *)
module Trace = struct
open Proto.Trace
@ -1201,11 +1210,11 @@ module Trace = struct
(** Sync emitter.
This instructs the collector to forward
the spans to some backend at a later point.
This instructs the collector to forward the spans to some backend at a
later point.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
{b NOTE} be careful not to call this inside a Gc alarm, as it can cause
deadlocks. *)
let emit ?service_name ?attrs (spans : span list) : unit =
let rs = make_resource_spans ?service_name ?attrs spans in
Collector.send_trace [ rs ] ~ret:(fun () -> ())
@ -1294,12 +1303,13 @@ module Trace = struct
scope in the ambient context, so that any logically-nested calls to
{!with_} will use this span as their parent.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks.
{b NOTE} be careful not to call this inside a Gc alarm, as it can cause
deadlocks.
@param force_new_trace_id if true (default false), the span will not use a
ambient scope, the [~scope] argument, nor [~trace_id], but will instead
always create fresh identifiers for this span *)
@param force_new_trace_id
if true (default false), the span will not use a ambient scope, the
[~scope] argument, nor [~trace_id], but will instead always create fresh
identifiers for this span *)
let with_ ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind
?trace_id ?parent ?scope ?links name (cb : Scope.t -> 'a) : 'a =
@ -1322,16 +1332,18 @@ end
(** Metrics.
See {{: https://opentelemetry.io/docs/reference/specification/overview/#metric-signal} the spec} *)
See
{{:https://opentelemetry.io/docs/reference/specification/overview/#metric-signal}
the spec} *)
module Metrics = struct
open Proto
open Proto.Metrics
type t = Metrics.metric
(** A single metric, measuring some time-varying quantity or statistical
distribution. It is composed of one or more data points that have
precise values and time stamps. Each distinct metric should have a
distinct name. *)
distribution. It is composed of one or more data points that have precise
values and time stamps. Each distinct metric should have a distinct name.
*)
open struct
let _program_start = Timestamp_ns.now_unix_ns ()
@ -1377,10 +1389,11 @@ module Metrics = struct
(** Histogram data
@param count number of values in population (non negative)
@param sum sum of values in population (0 if count is 0)
@param bucket_counts count value of histogram for each bucket. Sum of
the counts must be equal to [count].
length must be [1+length explicit_bounds]
@param explicit_bounds strictly increasing list of bounds for the buckets *)
@param bucket_counts
count value of histogram for each bucket. Sum of the counts must be
equal to [count]. length must be [1+length explicit_bounds]
@param explicit_bounds strictly increasing list of bounds for the buckets
*)
let histogram_data_point ?(start_time_unix_nano = _program_start)
?(now = Timestamp_ns.now_unix_ns ()) ?(attrs = []) ?(exemplars = [])
?(explicit_bounds = []) ?sum ~bucket_counts ~count () :
@ -1411,20 +1424,18 @@ module Metrics = struct
let resource = Proto.Resource.default_resource ~attributes () in
default_resource_metrics ~scope_metrics:[ lm ] ~resource:(Some resource) ()
(** Emit some metrics to the collector (sync). This blocks until
the backend has pushed the metrics into some internal queue, or
discarded them.
(** Emit some metrics to the collector (sync). This blocks until the backend
has pushed the metrics into some internal queue, or discarded them.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks.
*)
{b NOTE} be careful not to call this inside a Gc alarm, as it can cause
deadlocks. *)
let emit ?attrs (l : t list) : unit =
let rm = make_resource_metrics ?attrs l in
Collector.send_metrics [ rm ] ~ret:ignore
end
(** A set of callbacks that produce metrics when called.
The metrics are automatically called regularly.
(** A set of callbacks that produce metrics when called. The metrics are
automatically called regularly.
This allows applications to register metrics callbacks from various points
in the program (or even in libraries), and not worry about setting
@ -1436,9 +1447,9 @@ module Metrics_callbacks = struct
(** [register f] adds the callback [f] to the list.
[f] will be called at unspecified times and is expected to return
a list of metrics. It might be called regularly by the backend,
in particular (but not only) when {!Collector.tick} is called. *)
[f] will be called at unspecified times and is expected to return a list
of metrics. It might be called regularly by the backend, in particular
(but not only) when {!Collector.tick} is called. *)
let register f : unit =
if !cbs_ = [] then
(* make sure we call [f] (and others) at each tick *)
@ -1452,7 +1463,9 @@ end
(** Logs.
See {{: https://opentelemetry.io/docs/reference/specification/overview/#log-signal} the spec} *)
See
{{:https://opentelemetry.io/docs/reference/specification/overview/#log-signal}
the spec} *)
module Logs = struct
open Opentelemetry_proto
open Logs
@ -1527,9 +1540,8 @@ module Logs = struct
(** Emit logs.
This instructs the collector to send the logs to some backend at
a later date.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
This instructs the collector to send the logs to some backend at a later
date. {b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let emit ?service_name ?attrs (l : t list) : unit =
let attributes = Globals.mk_attributes ?service_name ?attrs () in
@ -1548,12 +1560,10 @@ end
(** Implementation of the W3C Trace Context spec
https://www.w3.org/TR/trace-context/
*)
https://www.w3.org/TR/trace-context/ *)
module Trace_context = struct
(** The traceparent header
https://www.w3.org/TR/trace-context/#traceparent-header
*)
https://www.w3.org/TR/trace-context/#traceparent-header *)
module Traceparent = struct
let name = "traceparent"
@ -1562,15 +1572,16 @@ module Trace_context = struct
The values are of the form:
{[
{version}-{trace_id}-{parent_id}-{flags}
{ version } - { trace_id } - { parent_id } - { flags }
]}
For example:
{[ 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 ]}
{[
00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
]}
[{flags}] are currently ignored.
*)
[{flags}] are currently ignored. *)
let of_value str : (Trace_id.t * Span_id.t, string) result =
match Span_ctx.of_w3c_trace_context (Bytes.unsafe_of_string str) with
| Ok sp -> Ok (Span_ctx.trace_id sp, Span_ctx.parent_id sp)
@ -1588,8 +1599,8 @@ end
These metrics are emitted after each GC collection. *)
module GC_metrics : sig
val basic_setup : unit -> unit
(** Setup a hook that will emit GC statistics on every tick (assuming
a ticker thread) *)
(** Setup a hook that will emit GC statistics on every tick (assuming a ticker
thread) *)
val get_runtime_attributes : unit -> Span.key_value list
(** Get OCaml name and version runtime attributes *)
@ -1597,7 +1608,9 @@ module GC_metrics : sig
val get_metrics : unit -> Metrics.t list
(** Get a few metrics from the current state of the GC *)
end = struct
(** See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes *)
(** See
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/process.md#process-runtimes
*)
let runtime_attributes =
lazy
Conventions.Attributes.

View file

@ -3,12 +3,12 @@
We need random identifiers for trace IDs and span IDs. *)
val rand_bytes_16 : (unit -> bytes) ref
(** Generate 16 bytes of random data.
The implementation can be swapped to use any random generator. *)
(** Generate 16 bytes of random data. The implementation can be swapped to use
any random generator. *)
val rand_bytes_8 : (unit -> bytes) ref
(** Generate 16 bytes of random data.
The implementation can be swapped to use any random generator. *)
(** Generate 16 bytes of random data. The implementation can be swapped to use
any random generator. *)
val default_rand_bytes_8 : unit -> bytes
(** Default implementation using {!Random} *)

View file

@ -19,15 +19,13 @@ module Server : sig
{[
let my_server callback =
let callback_traced =
Opentelemetry_cohttp_lwt.Server.trace
~service_name:"my-service"
Opentelemetry_cohttp_lwt.Server.trace ~service_name:"my-service"
(fun _scope -> callback)
in
Cohttp_lwt_unix.Server.create
~mode:(`TCP (`Port 8080))
(Server.make () ~callback:callback_traced)
]}
*)
]} *)
val with_ :
?trace_state:string ->
@ -43,24 +41,20 @@ module Server : sig
Identical to [Opentelemetry_lwt.Trace.with_], but fetches/stores the trace
scope in the [x-ocaml-otel-traceparent] header in the request for
convenience.
*)
convenience. *)
val get_trace_context :
?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option
(** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header
added by [trace] and [with_].
*)
added by [trace] and [with_]. *)
val set_trace_context : Otel.Scope.t -> Request.t -> Request.t
(** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used
by [trace] and [with_].
*)
by [trace] and [with_]. *)
val remove_trace_context : Request.t -> Request.t
(** Strip the custom [x-ocaml-otel-traceparent] header added by [trace] and
[with_].
*)
[with_]. *)
end = struct
let attrs_of_request (req : Request.t) =
let meth = req |> Request.meth |> Code.string_of_method in

View file

@ -12,8 +12,8 @@ module Metrics_callbacks = Metrics_callbacks
module Trace_context = Trace_context
external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force
to use Lwt's latest version *)
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force to
use Lwt's latest version *)
module Trace = struct
include Trace

View file

@ -144,8 +144,8 @@ module Internal = struct
assert (Bytes.length bs = 8);
Bytes.get_int64_le bs 0
let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
=
let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option)
~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name =
let open Otel in
let otel_id = Span_id.create () in
let otrace_id = otrace_of_otel otel_id in
@ -159,7 +159,8 @@ module Internal = struct
let parent =
match explicit_parent, parent_scope with
| Some p, _ ->
Some (Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ())
Some
(Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ())
| None, Some parent -> Some (Otel.Scope.to_span_ctx parent)
| None, None -> None
in