Merge pull request #34 from ELLIOTTCABLE/configurable-scope-storage

This commit is contained in:
ELLIOTTCABLE 2023-09-22 12:45:50 -05:00 committed by GitHub
commit d578de0ceb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 788 additions and 366 deletions

View file

@ -1,9 +1,13 @@
## next version
- replace `Thread_local` with `ocaml-ambient-context`, allowing for implicit scope in Lwt/Eio contexts (#34)
- update `ocaml-trace` interface to use the new `trace.0.3`-style API (breaking, see #34)
## 0.5
- new implementation for ocurl backend, using ezcurl and queues
- refactor lwt: Use `try%lwt` over `Lwt.catch`
- add `opentelemetry.trace` (optional, depends on `trace`)
- add `opentelemetry.trace` (optional, depends on `ocaml-trace`)
## 0.4

View file

@ -26,10 +26,11 @@ MIT
* [x] batching, perf, etc.
- [ ] async collector relying on ocurl-multi
- [ ] interface with `logs` (carry context around)
- [x] implicit scope (via [ambient-context][])
## Use
For now, instrument manually:
For now, instrument traces/spans, logs, and metrics manually:
```ocaml
module Otel = Opentelemetry
@ -45,16 +46,34 @@ let foo () =
]);
do_more_work();
()
```
### Setup
If you're writing a top-level application, you need to perform some initial configuration.
1. Set the [`service_name`][];
2. configure our [ambient-context][] dependency with the appropriate storage for your environment — TLS, Lwt, Eio ... (see [their docs][install-ambient-storage] for more details);
3. and install a [`Collector`][] (usually by calling your collector's `with_setup` function.)
For example, if your application is using Lwt, and you're using `ocurl` as your collector, you might do something like this:
```ocaml
let main () =
Otel.Globals.service_name := "my_service";
Otel.GC_metrics.basic_setup();
Ambient_context.with_storage_provider (Ambient_context_lwt.storage ()) @@ fun () ->
Opentelemetry_client_ocurl.with_setup () @@ fun () ->
(* … *)
foo ();
(* … *)
```
```
[`service_name`]: <https://v3.ocaml.org/p/opentelemetry/0.5/doc/Opentelemetry/Globals/index.html#val-service_name>
[`Collector`]: <https://v3.ocaml.org/p/opentelemetry/0.5/doc/Opentelemetry/Collector/index.html>
[ambient-context]: <https://v3.ocaml.org/p/ambient-context>
[install-ambient-storage]: <https://github.com/ELLIOTTCABLE/ocaml-ambient-context#-as-a-top-level-application>
## Configuration

3
dune
View file

@ -1,3 +1,4 @@
(env
(_
(flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-48-70 -strict-sequence)))
(flags :standard -warn-error -a+8 -w +a-4-30-40-41-42-44-48-70
-strict-sequence)))

View file

@ -1,26 +1,46 @@
(lang dune 2.7)
(name opentelemetry)
(generate_opam_files true)
(source
(github imandra-ai/ocaml-opentelemetry))
(version 0.5)
(authors "the Imandra team and contributors")
(maintainers "the Imandra team and contributors")
(license MIT)
;(documentation https://url/to/documentation)
(package
(name opentelemetry)
(synopsis "Instrumentation for https://opentelemetry.io")
(depends
(ocaml (>= "4.08"))
ptime
(odoc :with-doc)
(pbrt (>= 2.3)))
(depopts
(trace (>= 0.1)))
(ocaml
(>= "4.08"))
ptime
ambient-context
(odoc :with-doc)
(alcotest :with-test)
(pbrt
(>= 2.3))
(ocaml-lsp-server :with-dev-setup)
(ocamlformat
(and
:with-dev-setup
(>= 0.24)
(< 0.25))))
(depopts trace)
(conflicts
(trace
(or
(< 0.4)
(>= 0.5))))
(tags
(instrumentation tracing opentelemetry datadog jaeger)))
@ -28,49 +48,72 @@
(name opentelemetry-lwt)
(synopsis "Lwt-compatible instrumentation for https://opentelemetry.io")
(depends
(ocaml (>= "4.08"))
(opentelemetry (= :version))
(cohttp-lwt-unix :with-test)
(odoc :with-doc)
(lwt (>= "5.3"))
(lwt_ppx (>= "2.0")))
(ocaml
(>= "4.08"))
ambient-context
(opentelemetry
(= :version))
(cohttp-lwt-unix :with-test)
(odoc :with-doc)
(lwt
(>= "5.3"))
(lwt_ppx
(>= "2.0")))
(tags
(instrumentation tracing opentelemetry datadog lwt)))
(package
(name opentelemetry-client-ocurl)
(depends
(ocaml (>= "4.08"))
(mtime (>= "1.4")) ; for spans
; atomic ; vendored
(opentelemetry (= :version))
(pbrt (>= 2.3))
(odoc :with-doc)
(ezcurl (>= 0.2.3))
ocurl)
(synopsis "Collector client for opentelemetry, using http + ezcurl"))
(name opentelemetry-client-ocurl)
(depends
(ocaml
(>= "4.08"))
(mtime
(>= "1.4"))
; for spans
; atomic ; vendored
(opentelemetry
(= :version))
(pbrt
(>= 2.3))
(odoc :with-doc)
(ezcurl
(>= 0.2.3))
ocurl)
(synopsis "Collector client for opentelemetry, using http + ezcurl"))
(package
(name opentelemetry-cohttp-lwt)
(depends
(ocaml (>= "4.08"))
(opentelemetry (= :version))
(opentelemetry-lwt (= :version))
(ocaml
(>= "4.08"))
(opentelemetry
(= :version))
(opentelemetry-lwt
(= :version))
(odoc :with-doc)
(lwt (>= "5.3"))
(cohttp-lwt (>= "4.0.0")))
(lwt
(>= "5.3"))
(cohttp-lwt
(>= "4.0.0")))
(synopsis "Opentelemetry tracing for Cohttp HTTP servers"))
(package
(name opentelemetry-client-cohttp-lwt)
(depends
(ocaml (>= "4.08"))
(mtime (>= "1.4")) ; for spans
(opentelemetry (= :version))
(pbrt (>= 2.2))
(odoc :with-doc)
(lwt (>= "5.3"))
(lwt_ppx (>= "2.0"))
cohttp-lwt
cohttp-lwt-unix)
(synopsis "Collector client for opentelemetry, using cohttp + lwt"))
(name opentelemetry-client-cohttp-lwt)
(depends
(ocaml
(>= "4.08"))
(mtime
(>= "1.4"))
; for spans
(opentelemetry
(= :version))
(pbrt
(>= 2.2))
(odoc :with-doc)
(lwt
(>= "5.3"))
(lwt_ppx
(>= "2.0"))
cohttp-lwt
cohttp-lwt-unix)
(synopsis "Collector client for opentelemetry, using cohttp + lwt"))

View file

@ -11,6 +11,7 @@ bug-reports: "https://github.com/imandra-ai/ocaml-opentelemetry/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "4.08"}
"ambient-context"
"opentelemetry" {= version}
"cohttp-lwt-unix" {with-test}
"odoc" {with-doc}

View file

@ -12,11 +12,16 @@ depends: [
"dune" {>= "2.7"}
"ocaml" {>= "4.08"}
"ptime"
"ambient-context"
"odoc" {with-doc}
"alcotest" {with-test}
"pbrt" {>= "2.3"}
"ocaml-lsp-server" {with-dev-setup}
"ocamlformat" {with-dev-setup & >= "0.24" & < "0.25"}
]
depopts: [
"trace" {>= "0.1"}
depopts: ["trace"]
conflicts: [
"trace" {< "0.4" | >= "0.5"}
]
build: [
["dune" "subst"] {dev}

View file

@ -2,6 +2,7 @@
(name opentelemetry_client_cohttp_lwt)
(public_name opentelemetry-client-cohttp-lwt)
(synopsis "Opentelemetry collector using cohttp+lwt+unix")
(preprocess (pps lwt_ppx))
(preprocess
(pps lwt_ppx))
(libraries opentelemetry lwt cohttp-lwt cohttp-lwt-unix pbrt mtime
mtime.clock.os))

View file

@ -557,7 +557,7 @@ end)
}
end
let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
debug_ := config.debug;
if config.url <> get_url () then set_url config.url;
@ -571,12 +571,17 @@ let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
end)
()
in
Opentelemetry.Collector.set_backend (module B);
(module B : OT.Collector.BACKEND)
let setup_ ?stop ?config () =
let backend = create_backend ?stop ?config () in
let (module B : OT.Collector.BACKEND) = backend in
OT.Collector.set_backend backend;
B.cleanup
let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
let setup ?stop ?config ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ~config () in
let cleanup = setup_ ?stop ?config () in
at_exit cleanup
)

View file

@ -18,6 +18,12 @@ val set_headers : (string * string) list -> unit
module Config = Config
val create_backend :
?stop:bool Atomic.t ->
?config:Config.t ->
unit ->
(module Opentelemetry.Collector.BACKEND)
(** Create a collector backend as a first-class module.
    @param stop defaults to a fresh [Atomic.make false]
    @param config defaults to [Config.make ()]

    NOTE(review): this presumably only builds the backend and leaves
    installing it (via [Opentelemetry.Collector.set_backend]) to the
    caller -- confirm against the implementation. *)

val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.

View file

@ -341,7 +341,8 @@ end = struct
)
end
let mk_backend ~stop ~config () : (module Collector.BACKEND) =
let create_backend ?(stop = Atomic.make false)
?(config : Config.t = Config.make ()) () : (module Collector.BACKEND) =
let module M = struct
open Opentelemetry.Proto
open Opentelemetry.Collector
@ -444,8 +445,9 @@ let setup_ticker_thread ~stop ~sleep_ms (module B : Collector.BACKEND) () =
in
start_bg_thread tick_loop
let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
let ((module B) as backend) = mk_backend ~stop ~config () in
let setup_ ?(stop = Atomic.make false) ?(config : Config.t = Config.make ()) ()
=
let ((module B) as backend) = create_backend ~stop ~config () in
Opentelemetry.Collector.set_backend backend;
if config.url <> get_url () then set_url config.url;
@ -457,15 +459,15 @@ let setup_ ?(stop = Atomic.make false) ~(config : Config.t) () =
B.cleanup
let setup ?stop ?(config = Config.make ()) ?(enable = true) () =
let setup ?stop ?config ?(enable = true) () =
if enable then (
let cleanup = setup_ ?stop ~config () in
let cleanup = setup_ ?stop ?config () in
at_exit cleanup
)
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f =
let with_setup ?stop ?config ?(enable = true) () f =
if enable then (
let cleanup = setup_ ?stop ~config () in
let cleanup = setup_ ?stop ?config () in
Fun.protect ~finally:cleanup f
) else
f ()

View file

@ -17,6 +17,12 @@ val set_headers : (string * string) list -> unit
module Atomic = Opentelemetry_atomic.Atomic
module Config = Config
val create_backend :
?stop:bool Atomic.t ->
?config:Config.t ->
unit ->
(module Opentelemetry.Collector.BACKEND)
(** Create a collector backend as a first-class module.
    @param stop defaults to a fresh [Atomic.make false]
    @param config defaults to [Config.make ()]

    NOTE(review): this presumably only builds the backend and leaves
    installing it (via [Opentelemetry.Collector.set_backend]) to the
    caller -- confirm against the implementation. *)

val setup :
?stop:bool Atomic.t -> ?config:Config.t -> ?enable:bool -> unit -> unit
(** Setup endpoint. This modifies {!Opentelemetry.Collector.backend}.

View file

@ -2,7 +2,8 @@
(name opentelemetry)
(synopsis "API for opentelemetry instrumentation")
(flags :standard -warn-error -a+8)
(libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic)
(libraries ambient-context ptime ptime.clock.os pbrt threads
opentelemetry.atomic)
(public_name opentelemetry))
; ### protobuf rules ###

View file

@ -12,45 +12,21 @@ module Metrics_callbacks = Metrics_callbacks
module Trace_context = Trace_context
module Trace = struct
open Proto.Trace
include Trace
(** Sync span guard *)
let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent
?scope ?links name (f : Scope.t -> 'a Lwt.t) : 'a Lwt.t =
let trace_id =
match trace_id, scope with
| Some trace_id, _ -> trace_id
| None, Some scope -> scope.trace_id
| None, None -> Trace_id.create ()
in
let parent =
match parent, scope with
| Some span_id, _ -> Some span_id
| None, Some scope -> Some scope.span_id
| None, None -> None
in
let start_time = Timestamp_ns.now_unix_ns () in
let span_id = Span_id.create () in
let scope = { trace_id; span_id; events = []; attrs } in
let finally ok =
let status =
match ok with
| Ok () -> default_status ~code:Status_code_ok ()
| Error e -> default_status ~code:Status_code_error ~message:e ()
in
let span, _ =
Span.create ?kind ~trace_id ?parent ?links ~id:span_id ?trace_state
~attrs:scope.attrs ~events:scope.events ~start_time
~end_time:(Timestamp_ns.now_unix_ns ())
~status name
in
emit ?service_name [ span ]
let with_ ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind
?trace_id ?parent ?scope ?links name (cb : Scope.t -> 'a Lwt.t) : 'a Lwt.t
=
let thunk, finally =
with_' ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind
?trace_id ?parent ?scope ?links name cb
in
try%lwt
let* x = f scope in
let* rv = thunk () in
let () = finally (Ok ()) in
Lwt.return x
Lwt.return rv
with e ->
let () = finally (Error (Printexc.to_string e)) in
Lwt.fail e

View file

@ -1,7 +1,5 @@
(** Opentelemetry types and instrumentation *)
module Thread_local = Thread_local
module Lock = Lock
(** Global lock. *)
@ -149,6 +147,69 @@ module Collector = struct
type backend = (module BACKEND)
(** Backend that discards everything it is given: each sender only runs its
    [ret] continuation, and every other operation does nothing. *)
module Noop_backend : BACKEND = struct
(* Shared sender body: ignore the payload and call the continuation. *)
let noop_sender _ ~ret = ret ()
let send_trace : Trace.resource_spans list sender = { send = noop_sender }
let send_metrics : Metrics.resource_metrics list sender =
{ send = noop_sender }
let send_logs : Logs.resource_logs list sender = { send = noop_sender }
(* Nothing to signal, tick, register, or release. *)
let signal_emit_gc_metrics () = ()
let tick () = ()
let set_on_tick_callbacks _cbs = ()
let cleanup () = ()
end
(** [Debug_backend (B)] wraps backend [B]: every batch of spans, metrics or
    logs is first pretty-printed on stderr, then forwarded unchanged to the
    corresponding sender of [B]. All other operations are delegated to [B]
    as-is. *)
module Debug_backend (B : BACKEND) : BACKEND = struct
open Proto
(* Print the spans with a "SPANS: " prefix, then hand them to [B]. *)
let send_trace : Trace.resource_spans list sender =
{
send =
(fun l ~ret ->
Format.eprintf "SPANS: %a@."
(Format.pp_print_list Trace.pp_resource_spans)
l;
B.send_trace.send l ~ret);
}
(* Print the metrics with a "METRICS: " prefix, then hand them to [B]. *)
let send_metrics : Metrics.resource_metrics list sender =
{
send =
(fun l ~ret ->
Format.eprintf "METRICS: %a@."
(Format.pp_print_list Metrics.pp_resource_metrics)
l;
B.send_metrics.send l ~ret);
}
(* Print the logs with a "LOGS: " prefix, then hand them to [B]. *)
let send_logs : Logs.resource_logs list sender =
{
send =
(fun l ~ret ->
Format.eprintf "LOGS: %a@."
(Format.pp_print_list Logs.pp_resource_logs)
l;
B.send_logs.send l ~ret);
}
(* Plain delegation to the wrapped backend. *)
let signal_emit_gc_metrics () = B.signal_emit_gc_metrics ()
let tick () = B.tick ()
let set_on_tick_callbacks cbs = B.set_on_tick_callbacks cbs
let cleanup () = B.cleanup ()
end
let debug_backend : backend = (module Debug_backend (Noop_backend))
(* hidden *)
open struct
let on_tick_cbs_ = ref []
@ -195,6 +256,14 @@ module Collector = struct
match !backend with
| None -> ()
| Some (module B) -> B.tick ()
(** [with_setup_debug_backend b ~enable () f] runs [f ()].

    When [enable] is true (the default), [b] is first installed with
    [set_backend], and [b]'s [cleanup] is run once [f] returns or raises.
    When [enable] is false, [f ()] is simply called and [b] is left
    untouched. *)
let with_setup_debug_backend b ?(enable = true) () f =
  if not enable then
    f ()
  else (
    let (module Backend : BACKEND) = b in
    set_backend b;
    Fun.protect ~finally:Backend.cleanup f
  )
end
module Util_ = struct
@ -366,6 +435,7 @@ type value =
[ `Int of int
| `String of string
| `Bool of bool
| `Float of float
| `None
]
@ -379,6 +449,7 @@ let _conv_value =
| `Int i -> Some (Int_value (Int64.of_int i))
| `String s -> Some (String_value s)
| `Bool b -> Some (Bool_value b)
| `Float f -> Some (Double_value f)
| `None -> None
(**/**)
@ -521,26 +592,23 @@ module Scope = struct
if Collector.has_backend () then
scope.attrs <- List.rev_append (attrs ()) scope.attrs
(**/**)
(** The opaque key necessary to access/set the ambient scope with
{!Ambient_context}. *)
let ambient_scope_key : t Ambient_context.key = Ambient_context.create_key ()
let _global_scope : t Thread_local.t = Thread_local.create ()
(**/**)
(** Obtain current scope from thread-local storage, if available *)
let get_surrounding ?scope () : t option =
(** Obtain current scope from {!Ambient_context}, if available. *)
let get_ambient_scope ?scope () : t option =
match scope with
| Some _ -> scope
| None -> Thread_local.get _global_scope
| None -> Ambient_context.get ambient_scope_key
(** [with_scope sc f] calls [f()] in a context where [sc] is the
(thread)-local scope, then reverts to the previous local scope, if any. *)
let[@inline] with_scope (sc : t) (f : unit -> 'a) : 'a =
Thread_local.with_ _global_scope sc (fun _ -> f ())
end
(** [with_ambient_scope sc thunk] calls [thunk()] in a context where [sc] is
the (thread|continuation)-local scope, then reverts to the previous local
scope, if any.
open struct
let get_surrounding_scope = Scope.get_surrounding
@see <https://github.com/ELLIOTTCABLE/ocaml-ambient-context> ambient-context docs *)
let[@inline] with_ambient_scope (sc : t) (f : unit -> 'a) : 'a =
Ambient_context.with_binding ambient_scope_key sc (fun _ -> f ())
end
(** Span Link
@ -614,7 +682,13 @@ module Span : sig
val id : t -> Span_id.t
type key_value =
string * [ `Int of int | `String of string | `Bool of bool | `None ]
string
* [ `Int of int
| `String of string
| `Bool of bool
| `Float of float
| `None
]
val create :
?kind:kind ->
@ -651,7 +725,13 @@ end = struct
| Span_kind_consumer
type key_value =
string * [ `Int of int | `String of string | `Bool of bool | `None ]
string
* [ `Int of int
| `String of string
| `Bool of bool
| `Float of float
| `None
]
type nonrec status_code = status_status_code =
| Status_code_unset
@ -719,22 +799,14 @@ module Trace = struct
let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"]
(** Sync span guard.
@param force_new_trace_id if true (default false), the span will not use a
surrounding context, or [scope], or [trace_id], but will always
create a fresh new trace ID.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let with_ ?(force_new_trace_id = false) ?trace_state ?service_name
let with_' ?(force_new_trace_id = false) ?trace_state ?service_name
?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
?links name (f : Scope.t -> 'a) : 'a =
?links name cb =
let scope =
if force_new_trace_id then
None
else
get_surrounding_scope ?scope ()
Scope.get_ambient_scope ?scope ()
in
let trace_id =
match trace_id, scope with
@ -753,8 +825,6 @@ module Trace = struct
let start_time = Timestamp_ns.now_unix_ns () in
let span_id = Span_id.create () in
let scope = { trace_id; span_id; events = []; attrs } in
(* set global scope in this thread *)
Scope.with_scope scope @@ fun () ->
(* called once we're done, to emit a span *)
let finally res =
let status =
@ -773,10 +843,39 @@ module Trace = struct
in
emit ?service_name [ span ]
in
let thunk () =
(* install [scope] as the ambient scope while [cb] runs *)
Scope.with_ambient_scope scope @@ fun () -> cb scope
in
thunk, finally
(** Sync span guard.
Notably, this includes {e implicit} scope-tracking: if called without a
[~scope] argument (or [~parent]/[~trace_id]), it will check in the
{!Ambient_context} for a surrounding environment, and use that as the
scope. Similarly, it uses {!Scope.with_ambient_scope} to {e set} a new
scope in the ambient context, so that any logically-nested calls to
{!with_} will use this span as their parent.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks.
@param force_new_trace_id if true (default false), the span will not use an
ambient scope, the [~scope] argument, nor [~trace_id], but will instead
always create fresh identifiers for this span *)
let with_ ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind
?trace_id ?parent ?scope ?links name (cb : Scope.t -> 'a) : 'a =
let thunk, finally =
with_' ?force_new_trace_id ?trace_state ?service_name ?attrs ?kind
?trace_id ?parent ?scope ?links name cb
in
try
let x = f scope in
let rv = thunk () in
finally (Ok ());
x
rv
with e ->
finally (Error (Printexc.to_string e));
raise e

View file

@ -1,100 +0,0 @@
module A = Opentelemetry_atomic.Atomic
type key = int
let[@inline] get_key_ () : key = Thread.id (Thread.self ())
module Key_map_ = Map.Make (struct
type t = key
let compare : t -> t -> int = compare
end)
type 'a t = 'a ref Key_map_.t A.t
(** The TLS variable is made of a global atomic reference
(which has very low contention: it's modified only when a
thread is started/stopped).
Inside that atomic variable, is a map from thread ID to a mutable [ref]
holding the actual data. Because this [ref] is only ever accessed
by the thread with this given ID, it's safe to modify. *)
let create () : _ t = A.make Key_map_.empty
(* Value currently bound to the calling thread.
   Raises [Not_found] when the calling thread has no binding. *)
let[@inline] get_exn (self : _ t) =
  let bindings = A.get self in
  let tid = get_key_ () in
  let cell = Key_map_.find tid bindings in
  !cell
(* Option-returning variant of [get_exn]: [None] when the calling thread
   has no binding. *)
let[@inline] get self =
  match get_exn self with
  | v -> Some v
  | exception Not_found -> None
(* Drop the binding for [key] from the shared map. The update is a
   compare-and-set; on failure we yield and retry on a fresh snapshot. *)
let remove_ref_ self key : unit =
  let rec retry () =
    let before = A.get self in
    let after = Key_map_.remove key before in
    if not (A.compare_and_set self before after) then (
      Thread.yield ();
      retry ()
    )
  in
  retry ()
(* Bind [key] to the cell [r] in the shared map. The update is a
   compare-and-set; on failure we yield and retry on a fresh snapshot. *)
let set_ref_ self key (r : _ ref) : unit =
  let rec retry () =
    let before = A.get self in
    let after = Key_map_.add key r before in
    if not (A.compare_and_set self before after) then (
      Thread.yield ();
      retry ()
    )
  in
  retry ()
(* Store [v] in the cell associated with [key], creating and registering the
   cell if [key] has none yet. Returns the cell together with the value it
   held before the call ([None] when the cell was just created). *)
let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option =
  match Key_map_.find_opt key (A.get self) with
  | Some cell ->
    let previous = !cell in
    cell := v;
    cell, Some previous
  | None ->
    let cell = ref v in
    set_ref_ self key cell;
    cell, None
(* Bind the calling thread to [v], overwriting any previous value. *)
let set (self : _ t) v : unit =
  ignore (get_or_create_ref_ self (get_key_ ()) ~v : _ ref * _ option)
(* Forget the calling thread's binding, if any. *)
let remove (self : _ t) : unit = remove_ref_ self (get_key_ ())
(* Value bound to the calling thread. When there is none, a finaliser is
   attached to the thread handle so the binding gets removed later, then
   [create ()] is called and its result is stored and returned. *)
let get_or_create ~create (self : 'a t) : 'a =
  let tid = get_key_ () in
  match Key_map_.find_opt tid (A.get self) with
  | Some cell -> !cell
  | None ->
    Gc.finalise (fun _ -> remove_ref_ self tid) (Thread.self ());
    let fresh = create () in
    set_ref_ self tid (ref fresh);
    fresh
(* [with_ self v f] binds the calling thread to [v], calls [f prev] where
   [prev] is the value that was bound before (if any), and restores the
   previous state afterwards: the old value is put back, or the binding is
   removed when there was none.

   The restore step runs exactly once, whether [f] returns or raises;
   [Fun.protect] re-raises [f]'s exception with its original backtrace. *)
let with_ self v f =
  let key = get_key_ () in
  let r, old = get_or_create_ref_ self key ~v in
  let restore_ () : unit =
    match old with
    | None -> remove_ref_ self key
    | Some old -> r := old
  in
  Fun.protect ~finally:restore_ (fun () -> f old)

View file

@ -1,27 +0,0 @@
(** Thread/Domain local storage
This allows the creation of global state that is per-domain or per-thread.
*)
type 'a t
val create : unit -> 'a t
(** Create new storage *)
val get : 'a t -> 'a option
(** Get current value *)
val get_exn : 'a t -> 'a
(** Like {!get} but fails with an exception
@raise Not_found if no value was found *)
val set : 'a t -> 'a -> unit
(** [set var x] binds [var] to [x] for the calling thread, replacing any
    value previously bound for that thread. *)

val remove : _ t -> unit
(** [remove var] removes the calling thread's binding in [var], if any. *)

val get_or_create : create:(unit -> 'a) -> 'a t -> 'a
(** [get_or_create ~create var] returns the value bound to the calling thread
    in [var]. If there is none, [create ()] is called and its result is
    bound for this thread and returned. *)

val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b
(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where
[prev] is the value currently in [var] (if any), and
then restores the old value of [var] for this thread. *)

View file

@ -1,7 +1,6 @@
(library
(name opentelemetry_trace)
(public_name opentelemetry.trace)
(synopsis "Use opentelemetry as a collector for trace")
(optional)
(libraries trace opentelemetry))
(name opentelemetry_trace)
(public_name opentelemetry.trace)
(synopsis "Use opentelemetry as a collector for trace")
(optional)
(libraries ambient-context ambient-context.tls trace opentelemetry))

View file

@ -1,128 +1,241 @@
module Otel = Opentelemetry
module TLS = Otel.Thread_local
module Otrace = Trace (* ocaml-trace *)
module TLS = Ambient_context_tls.TLS
type span = Trace.span
module Well_known = struct
let spankind_key = "otrace.spankind"
(** Table indexed by Trace spans *)
module Span_tbl = Hashtbl.Make (struct
include Int64
let internal = `String "INTERNAL"
let hash : t -> int = Hashtbl.hash
end)
let server = `String "SERVER"
(** Per-thread set of active spans. *)
module Active_spans = struct
type span_begin = {
span_id: Otel.Span_id.t;
start_time: int64;
name: string;
data: (string * Trace.user_data) list;
__FILE__: string;
__LINE__: int;
new_scope: Otel.Scope.t;
old_scope: Otel.Scope.t option;
}
(** Information we get at the beginning of the span *)
let client = `String "CLIENT"
type t = { tbl: span_begin Span_tbl.t } [@@unboxed]
(** Storage for active spans *)
let producer = `String "PRODUCER"
let create () : t = { tbl = Span_tbl.create 8 }
let consumer = `String "CONSUMER"
let tls : t TLS.t = TLS.create ()
let spankind_of_string =
let open Otel.Span in
function
| "INTERNAL" -> Span_kind_internal
| "SERVER" -> Span_kind_server
| "CLIENT" -> Span_kind_client
| "PRODUCER" -> Span_kind_producer
| "CONSUMER" -> Span_kind_consumer
| _ -> Span_kind_unspecified
let[@inline] get () : t = TLS.get_or_create tls ~create
(** [otel_attrs_of_otrace_data data] splits ocaml-trace user data into a
    span kind and the remaining attributes.

    Every [`String] entry whose key is {!spankind_key} is removed from the
    attributes and converted with {!spankind_of_string}; when several are
    present the last one wins. Without such an entry the kind is
    [Span_kind_unspecified]. All other entries are returned unchanged, in
    their original order. *)
let otel_attrs_of_otrace_data data =
  let kind, rev_attrs =
    List.fold_left
      (fun (kind, acc) entry ->
        match entry with
        | key, `String v when String.equal key spankind_key ->
          (* use the shared constant rather than repeating the literal *)
          spankind_of_string v, acc
        | _ -> kind, entry :: acc)
      (Otel.Span.Span_kind_unspecified, [])
      data
  in
  kind, List.rev rev_attrs
end
let conv_span_to_i64 (id : Otel.Span_id.t) : int64 =
let bs = Otel.Span_id.to_bytes id in
(* lucky that it coincides! *)
assert (Bytes.length bs = 8);
Bytes.get_int64_le bs 0
open Well_known
let span_of_i64 (id : int64) : Otel.Span_id.t =
let bs = Bytes.create 8 in
Bytes.set_int64_le bs 0 id;
Otel.Span_id.of_bytes bs
module Internal = struct
type span_begin = {
id: Otel.Span_id.t;
start_time: int64;
name: string;
data: (string * Otrace.user_data) list;
__FILE__: string;
__LINE__: int;
__FUNCTION__: string option;
trace_id: Otel.Trace_id.t;
scope: Otel.Scope.t;
parent_id: Otel.Span_id.t option;
parent_scope: Otel.Scope.t option;
}
let collector () : Trace.collector =
let module M = struct
let enter_span ?__FUNCTION__:_ ~__FILE__ ~__LINE__ ~data name : span =
let span_id = Otel.Span_id.create () in
let span = conv_span_to_i64 span_id in
module Active_span_tbl = Hashtbl.Make (struct
include Int64
let start_time = Otel.Timestamp_ns.now_unix_ns () in
let hash : t -> int = Hashtbl.hash
end)
let old_scope = Otel.Scope.get_surrounding () in
let trace_id =
match old_scope with
| None -> Otel.Trace_id.create ()
| Some sc -> sc.trace_id
in
(** Per-thread set of active spans. *)
module Active_spans = struct
type t = { tbl: span_begin Active_span_tbl.t } [@@unboxed]
let new_scope =
{ Otel.Scope.span_id; trace_id; events = []; attrs = [] }
in
TLS.set Otel.Scope._global_scope new_scope;
let create () : t = { tbl = Active_span_tbl.create 32 }
let active_spans = Active_spans.get () in
Span_tbl.add active_spans.tbl span
{
span_id;
start_time;
__FILE__;
__LINE__;
old_scope;
new_scope;
name;
data;
};
let tls : t TLS.t = TLS.create ()
span
let[@inline] get () : t = TLS.get_or_create tls ~create
end
let exit_span (span : span) : unit =
let active_spans = Active_spans.get () in
match Span_tbl.find_opt active_spans.tbl span with
| None -> () (* TODO: log warning *)
| Some
{
span_id;
start_time;
name;
__FILE__;
__LINE__;
new_scope;
old_scope;
data;
} ->
let end_time = Otel.Timestamp_ns.now_unix_ns () in
let otrace_of_otel (id : Otel.Span_id.t) : int64 =
let bs = Otel.Span_id.to_bytes id in
(* lucky that it coincides! *)
assert (Bytes.length bs = 8);
Bytes.get_int64_le bs 0
(* restore previous scope *)
(match old_scope with
| None -> TLS.remove Otel.Scope._global_scope
| Some sc -> TLS.set Otel.Scope._global_scope sc);
let otel_of_otrace (id : int64) : Otel.Span_id.t =
let bs = Bytes.create 8 in
Bytes.set_int64_le bs 0 id;
Otel.Span_id.of_bytes bs
let o_span : Otel.Span.t =
let attrs =
[ "file", `String __FILE__; "line", `Int __LINE__ ] @ data
in
Otel.Span.create ~trace_id:new_scope.trace_id ~id:span_id ~start_time
~end_time ~attrs name
|> fst
(* [enter_span' ?explicit_parent ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name]
   starts a new span: it allocates a fresh OTEL span id, records everything
   needed to emit the span later in a [span_begin], registers that record in
   the current [Active_spans] table under the ocaml-trace id, and returns
   [(otrace_id, span_begin)]. It does not install the new scope as the
   ambient scope. *)
let enter_span' ?explicit_parent ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
=
let open Otel in
let otel_id = Span_id.create () in
let otrace_id = otrace_of_otel otel_id in
let parent_scope = Scope.get_ambient_scope () in
(* Reuse the ambient trace if there is one, otherwise start a new trace.
   NOTE(review): the trace id is taken from the ambient scope even when
   [explicit_parent] is given -- confirm the explicit parent always belongs
   to that same trace. *)
let trace_id =
match parent_scope with
| Some sc -> sc.trace_id
| None -> Trace_id.create ()
in
(* An explicit parent takes precedence over the ambient scope's span. *)
let parent_id =
match explicit_parent, parent_scope with
| Some p, _ -> Some (otel_of_otrace p)
| None, Some parent -> Some parent.span_id
| None, None -> None
in
let new_scope =
{ Scope.span_id = otel_id; trace_id; events = []; attrs = [] }
in
let start_time = Timestamp_ns.now_unix_ns () in
let sb =
{
id = otel_id;
start_time;
name;
data;
__FILE__;
__LINE__;
__FUNCTION__;
trace_id;
scope = new_scope;
parent_id;
parent_scope;
}
in
(* Register the span so it can be looked up by its ocaml-trace id. *)
let active_spans = Active_spans.get () in
Active_span_tbl.add active_spans.tbl otrace_id sb;
otrace_id, sb
(* [exit_span' otrace_id sb] finishes the span described by [sb]: it removes
   [otrace_id] from the current [Active_spans] table, takes the end
   timestamp, and returns the OTEL span built from [sb]. The caller is
   responsible for emitting it.

   Source-location attributes are always attached ("code.filepath",
   "code.lineno"). When [__FUNCTION__] is known it is split at its last '.'
   into "code.namespace" and "code.function"; a name containing no '.' is
   reported as "code.function" only, instead of raising [Not_found]. *)
let exit_span' otrace_id
    {
      id = otel_id;
      start_time;
      name;
      data;
      __FILE__;
      __LINE__;
      __FUNCTION__;
      trace_id;
      scope = _;
      parent_id;
      parent_scope = _;
    } =
  let open Otel in
  let active_spans = Active_spans.get () in
  Active_span_tbl.remove active_spans.tbl otrace_id;
  let end_time = Timestamp_ns.now_unix_ns () in
  let kind, attrs = otel_attrs_of_otrace_data data in
  let location_attrs =
    [ "code.filepath", `String __FILE__; "code.lineno", `Int __LINE__ ]
  in
  let function_attrs =
    match __FUNCTION__ with
    | None -> []
    | Some fn ->
      (match String.rindex_opt fn '.' with
      | None ->
        (* unqualified name: there is no module path to report *)
        [ "code.function", `String fn ]
      | Some last_dot ->
        let module_path = String.sub fn 0 last_dot in
        let function_name =
          String.sub fn (last_dot + 1) (String.length fn - last_dot - 1)
        in
        [
          "code.function", `String function_name;
          "code.namespace", `String module_path;
        ])
  in
  let attrs = location_attrs @ function_attrs @ attrs in
  Span.create ~kind ~trace_id ?parent:parent_id ~id:otel_id ~start_time
    ~end_time ~attrs name
  |> fst
Otel.Trace.emit [ o_span ];
module M = struct
(* [with_span ... name cb] enters a span, runs [cb] with the new span
   installed as the ambient scope, then closes and emits the span.

   The span is closed and emitted whether [cb] returns or raises, so a
   failing callback neither loses its span nor leaves a stale entry in the
   active-span table. The entry is re-read from the table at exit so that
   data attached through [add_data_to_span] while [cb] ran is included. *)
let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name cb =
  let otrace_id, sb =
    enter_span' ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
  in
  let finish () =
    let active_spans = Active_spans.get () in
    let current =
      (* fall back to the record from entry if the span is no longer
         registered *)
      Option.value ~default:sb
        (Active_span_tbl.find_opt active_spans.tbl otrace_id)
    in
    Otel.Trace.emit [ exit_span' otrace_id current ]
  in
  Otel.Scope.with_ambient_scope sb.scope @@ fun () ->
  Fun.protect ~finally:finish (fun () -> cb otrace_id)
(* [enter_manual_span ~parent ...] starts a span whose parent is given
   explicitly (or taken from the ambient scope when [parent] is [None]) and
   returns the handle to pass to [exit_manual_span] later. The [flavor]
   argument is ignored.

   [enter_span'] already registers the new span in the active-span table.
   Registering it a second time here with [Active_span_tbl.add] would leave
   a shadowed duplicate behind after [exit_span'] removes one binding, so
   the span is deliberately not added again. *)
let enter_manual_span ~(parent : Otrace.explicit_span option) ~flavor:_
    ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : Otrace.explicit_span =
  let otrace_id, _sb =
    match parent with
    | None -> enter_span' ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
    | Some { span; _ } ->
      enter_span' ~explicit_parent:span ~__FUNCTION__ ~__FILE__ ~__LINE__
        ~data name
  in
  Otrace.{ span = otrace_id; meta = Meta_map.empty }
(* Close the manual span identified by the given handle and emit it. A
   handle with no registered span is silently ignored. *)
let exit_manual_span Otrace.{ span = otrace_id; _ } =
  let { Active_spans.tbl } = Active_spans.get () in
  match Active_span_tbl.find_opt tbl otrace_id with
  | Some sb -> Otel.Trace.emit [ exit_span' otrace_id sb ]
  | None ->
    (* FIXME: some kind of error/debug logging *)
    ()
(* Append [data] to the user data recorded for the active span [otrace_id].
   A span that is not registered is silently ignored
   (FIXME: some kind of error/debug logging). *)
let add_data_to_span otrace_id data =
  let { Active_spans.tbl } = Active_spans.get () in
  Active_span_tbl.find_opt tbl otrace_id
  |> Option.iter (fun sb ->
         Active_span_tbl.replace tbl otrace_id
           { sb with data = sb.data @ data })
(* Same as [add_data_to_span], for a span identified by its manual-span
   handle: only the handle's [span] id is used. *)
let add_data_to_manual_span Otrace.{ span = otrace_id; _ } data =
add_data_to_span otrace_id data
let message ?span ~data:_ msg : unit =
(* gather information from context *)
let old_scope = Otel.Scope.get_surrounding () in
let old_scope = Otel.Scope.get_ambient_scope () in
let trace_id = Option.map (fun sc -> sc.Otel.Scope.trace_id) old_scope in
let span_id =
match span with
| Some id -> Some (span_of_i64 id)
| Some id -> Some (otel_of_otrace id)
| None -> Option.map (fun sc -> sc.Otel.Scope.span_id) old_scope
in
@ -135,15 +248,19 @@ let collector () : Trace.collector =
let name_thread _name = ()
let counter_int name cur_val : unit =
let m = Otel.Metrics.(gauge ~name [ int cur_val ]) in
let counter_int ~data name cur_val : unit =
let _kind, attrs = otel_attrs_of_otrace_data data in
let m = Otel.Metrics.(gauge ~name [ int ~attrs cur_val ]) in
Otel.Metrics.emit [ m ]
let counter_float name cur_val : unit =
let m = Otel.Metrics.(gauge ~name [ float cur_val ]) in
let counter_float ~data name cur_val : unit =
let _kind, attrs = otel_attrs_of_otrace_data data in
let m = Otel.Metrics.(gauge ~name [ float ~attrs cur_val ]) in
Otel.Metrics.emit [ m ]
end in
(module M)
end
end
let collector () : Trace.collector = (module Internal.M)
let setup () = Trace.setup_collector @@ collector ()

View file

@ -1,8 +1,195 @@
val collector : unit -> Trace.collector
(** Make a Trace collector that uses the OTEL backend to send spans and logs *)
module Otel := Opentelemetry
module Otrace := Trace
module TLS := Ambient_context_tls.TLS
(** [opentelemetry.trace] implements a {!Trace_core.Collector} for
{{:https://v3.ocaml.org/p/trace} ocaml-trace}.
After installing this collector with {!setup}, you can consume libraries
that use [ocaml-trace], and they will automatically emit OpenTelemetry spans
and logs.
Both explicit scope (in the [_manual] functions such as [enter_manual_span])
and implicit scope (in {!Internal.M.with_span}, via {!Ambient_context}) are
supported; see the detailed notes on {!Internal.M.enter_manual_span}.
{1:wellknown Well-known identifiers}
Because [ocaml-trace]'s API is a subset of OpenTelemetry functionality, this
interface allows for a few 'well-known' identifiers to be used in
[Trace]-instrumented libraries that wish to further support OpenTelemetry
usage.
(These strings will not change in subsequent versions of this library, so
you do not need to depend on [opentelemetry.trace] to use them.)
- If a key of exactly ["otrace.spankind"] is included in the
{!Trace.user_data} passed to [with_span] et al., it will be used as the
{!Opentelemetry.Span.kind} of the emitted span. (See
{!Well_known.spankind_of_string} for the list of supported values.)
{[
let describe () = [ Opentelemetry_trace.Well_known.(spankind_key, client) ] in
Trace.with_span ~__FILE__ ~__LINE__ ~data:describe "my-span" @@ fun _ ->
(* ... *)
]}
*)
val setup : unit -> unit
(** Install the OTEL backend as a Trace collector *)
val setup_with_otel_backend : Opentelemetry.Collector.backend -> unit
(** Same as {!setup}, but also install the given backend as OTEL backend *)
val collector : unit -> Trace.collector
(** Make a Trace collector that uses the OTEL backend to send spans and logs *)
(** Static references for well-known identifiers; see {!label-wellknown}. *)
module Well_known : sig
val spankind_key : string
(** Key under which the span kind is looked up in a span's user data.
NOTE(review): the module documentation gives the key as ["otrace.spankind"];
confirm that this value is exactly that string. *)
val internal : Otrace.user_data
(** User-data value to pair with {!spankind_key}.
NOTE(review): presumably selects the [internal] span kind; confirm. *)
val server : Otrace.user_data
(** User-data value to pair with {!spankind_key}.
NOTE(review): presumably selects the [server] span kind; confirm. *)
val client : Otrace.user_data
(** User-data value to pair with {!spankind_key}, as in the example in the
module documentation.
NOTE(review): presumably selects the [client] span kind; confirm. *)
val producer : Otrace.user_data
(** User-data value to pair with {!spankind_key}.
NOTE(review): presumably selects the [producer] span kind; confirm. *)
val consumer : Otrace.user_data
(** User-data value to pair with {!spankind_key}.
NOTE(review): presumably selects the [consumer] span kind; confirm. *)
val spankind_of_string : string -> Otel.Span.kind
(** Converts a string to an {!Opentelemetry.Span.kind}.
NOTE(review): the set of accepted strings and the result for an unrecognised
string are not visible from this interface; check the implementation. *)
val otel_attrs_of_otrace_data :
(string * Otrace.user_data) list ->
Otel.Span.kind * Otel.Span.key_value list
(** Converts a list of [Trace] key/value data into a span kind together with a
list of OpenTelemetry attributes.
NOTE(review): presumably the kind comes from the {!spankind_key} entry and the
remaining entries become attributes; confirm, including the kind used when no
such entry is present. *)
end
(** Internal implementation details; do not consider these stable. *)
module Internal : sig
(** The collector implementation itself. [with_span], [enter_manual_span] and
[exit_manual_span] implement the same-named members of
{!Trace_core.Collector.S}.
NOTE(review): the remaining members carry the names of other
{!Trace_core.Collector.S} members and presumably implement them as well;
confirm against the [trace] version in use. *)
module M : sig
val with_span :
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
data:(string * Otrace.user_data) list ->
string (* span name *) ->
(Otrace.span -> 'a) ->
'a
(** Implements {!Trace_core.Collector.S.with_span}, with the OpenTelemetry
collector as the backend. Invoked via {!Trace.with_span}.
Notably, this has the same implicit-scope semantics as
{!Opentelemetry.Trace.with_}, and requires configuration of
{!Ambient_context}.
@see <https://github.com/ELLIOTTCABLE/ocaml-ambient-context> ambient-context docs *)
val enter_manual_span :
parent:Otrace.explicit_span option ->
flavor:'a ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
data:(string * Otrace.user_data) list ->
string (* span name *) ->
Otrace.explicit_span
(** Implements {!Trace_core.Collector.S.enter_manual_span}, with the OpenTelemetry
collector as the backend. Invoked at {!Trace.enter_manual_toplevel_span}
and {!Trace.enter_manual_sub_span}; requires an eventual call to
{!Trace.exit_manual_span}.
These 'manual span' functions {e do not} implement the same
implicit-scope semantics as {!with_span}, and thus don't need to wrap a
single stack-frame / callback; you can freely enter a span at any point,
store the returned {!Trace.explicit_span}, and exit it at any later point
with {!Trace.exit_manual_span}.
However, for that same reason, they also cannot update the
{!Ambient_context}: that is, when you invoke the various [manual]
functions, if you then invoke other functions that use
{!Trace.with_span}, those callees {e will not} see the span you entered
manually as their [parent].
Generally, the best practice is to only use these [manual] functions at
the 'leaves' of your callstack: that is, don't invoke user callbacks
from within them; or if you do, make sure to pass the [explicit_span]
you receive from this function onwards to the user callback, so they can
create further child-spans. *)
val exit_manual_span : Otrace.explicit_span -> unit
(** Implements {!Trace_core.Collector.S.exit_manual_span}, with the
OpenTelemetry collector as the backend. Invoked at
{!Trace.exit_manual_span}. Expects the [explicit_span] returned from an
earlier call to {!Trace.enter_manual_toplevel_span} or
{!Trace.enter_manual_sub_span}.
(See the notes at {!enter_manual_span} about {!Ambient_context}.) *)
val add_data_to_span :
Otrace.span -> (string * Otrace.user_data) list -> unit
(** Takes a {!Trace.span} and a list of key/value data.
NOTE(review): presumably attaches the data to that span if it is still
active; confirm, including what happens for an unknown span. *)
val add_data_to_manual_span :
Otrace.explicit_span -> (string * Otrace.user_data) list -> unit
(** Same shape as {!add_data_to_span}, but for a span obtained from
{!enter_manual_span}.
NOTE(review): presumably attaches the data to that manual span; confirm. *)
val message :
?span:Otrace.span ->
data:(string * Otrace.user_data) list ->
string ->
unit
(** Takes a message string with key/value data and an optional span.
NOTE(review): presumably emitted as an OpenTelemetry log record associated
with [span] when given; confirm. *)
val shutdown : unit -> unit
(** NOTE(review): collector shutdown hook; what it flushes or tears down is not
visible from this interface. *)
val name_process : string -> unit
(** NOTE(review): receives a process name from [Trace]; whether it is recorded
or ignored is not visible from this interface. *)
val name_thread : string -> unit
(** NOTE(review): receives a thread name from [Trace]; whether it is recorded
or ignored is not visible from this interface. *)
val counter_int :
data:(string * Otrace.user_data) list -> string -> int -> unit
(** Takes key/value data, a counter name and an integer value.
NOTE(review): presumably emitted as an OpenTelemetry metric; confirm. *)
val counter_float :
data:(string * Otrace.user_data) list -> string -> float -> unit
(** Takes key/value data, a counter name and a float value.
NOTE(review): presumably emitted as an OpenTelemetry metric; confirm. *)
end
(** Record produced by {!enter_span'} and later handed to {!exit_span'}, which
turns it into an {!Opentelemetry.Span.t}. It carries the arguments given when
the span was entered ([name], [data], [__FILE__], [__LINE__], [__FUNCTION__])
together with its OpenTelemetry identifiers and scope. *)
type span_begin = {
id: Otel.Span_id.t;
start_time: int64;  (** NOTE(review): unit/epoch not visible here; confirm. *)
name: string;
data: (string * Otrace.user_data) list;
__FILE__: string;
__LINE__: int;
__FUNCTION__: string option;
trace_id: Otel.Trace_id.t;
scope: Otel.Scope.t;
parent_id: Otel.Span_id.t option;  (** [None] when there is no parent span id. *)
parent_scope: Otel.Scope.t option;  (** [None] when there is no parent scope. *)
}
module Active_span_tbl : Hashtbl.S with type key = Otrace.span
(** Table indexed by ocaml-trace spans. *)
(** Storage of {!span_begin} records, keyed by ocaml-trace span. The record type
is [private], so values can only be built through [create]. *)
module Active_spans : sig
type t = private { tbl: span_begin Active_span_tbl.t } [@@unboxed]
val create : unit -> t
(** Builds a new value of type [t]. NOTE(review): presumably with an empty
table; confirm. *)
val tls : t TLS.t
(** Storage slot of type [t] in {!Ambient_context_tls.TLS}. *)
val get : unit -> t
(** NOTE(review): presumably returns the [t] held in {!tls} for the current
thread, creating it on first use; confirm. *)
end
val otrace_of_otel : Otel.Span_id.t -> Otrace.span
(** Converts an OpenTelemetry span id into an ocaml-trace span. *)
val otel_of_otrace : Otrace.span -> Otel.Span_id.t
(** Converts an ocaml-trace span into an OpenTelemetry span id. *)
val enter_span' :
?explicit_parent:Otrace.span ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
data:(string * Otrace.user_data) list ->
string ->
Otrace.span * span_begin
(** Starts a span with the given name and data, returning its ocaml-trace span
together with the {!span_begin} record describing it.
NOTE(review): when [explicit_parent] is omitted the parent presumably comes
from the ambient scope; confirm. *)
val exit_span' : Otrace.span -> span_begin -> Otel.Span.t
(** Builds the finished {!Opentelemetry.Span.t} for a span from the
{!span_begin} record returned by {!enter_span'}.
NOTE(review): whether this also emits the span or only constructs it is not
visible from this interface. *)
end

View file

@ -6,8 +6,10 @@
; Executable emit1_cohttp: built from the emit1_cohttp module, preprocessed
; with lwt_ppx, and linked against the cohttp-lwt OpenTelemetry client.
(executable
(name emit1_cohttp)
(modules emit1_cohttp)
(preprocess (pps lwt_ppx))
(libraries unix opentelemetry opentelemetry-lwt opentelemetry-client-cohttp-lwt lwt.unix))
(preprocess
(pps lwt_ppx))
(libraries unix opentelemetry opentelemetry-lwt
opentelemetry-client-cohttp-lwt lwt.unix))
(executable
(name cohttp_client)

View file

@ -1,6 +1,4 @@
; Tests test_trace_context and test_get_url, linked against the core library
; and both the ocurl and cohttp-lwt clients.
(tests
(names test_trace_context test_get_url)
(libraries
opentelemetry
opentelemetry-client-ocurl
opentelemetry-client-cohttp-lwt))
(libraries opentelemetry opentelemetry-client-ocurl
opentelemetry-client-cohttp-lwt))

View file

@ -0,0 +1,3 @@
; Alcotest-based test of implicit scope in synchronous code.
(tests
(names test_implicit_scope_sync)
(libraries alcotest opentelemetry opentelemetry-client-cohttp-lwt))

View file

@ -0,0 +1,74 @@
open Alcotest
module Otel = Opentelemetry
(* Every batch of resource spans handed to the test backend so far. *)
let spans_emitted = ref ([] : Otel.Proto.Trace.resource_spans list)
(* A no-op backend whose trace sender records what it is given in
   [spans_emitted] instead of sending it anywhere. *)
module Test_backend = struct
  open Otel.Collector
  open Otel.Proto
  include Noop_backend

  (* Prepend the batch to the recorded spans, then continue via [ret]. *)
  let record_emitted_spans (batch : Trace.resource_spans list) ~ret =
    let () = spans_emitted := List.append batch !spans_emitted in
    ret ()

  (* Shadows the sender inherited from [Noop_backend]. *)
  let send_trace : Trace.resource_spans list sender =
    { send = record_emitted_spans }
end
(* Run [f] with [Test_backend] installed as the collector backend. *)
let with_test_backend f =
  (* For eprintf debugging, wrap the backend before passing it on:
       let module Debug_and_test_backend =
         Otel.Collector.Debug_backend (Test_backend) in
     and pass
       (module Debug_and_test_backend : Otel.Collector.BACKEND)
     below instead of [Test_backend]. *)
  Otel.Collector.with_setup_debug_backend
    (module Test_backend : Otel.Collector.BACKEND)
    () f
(* Local shorthand for the library's hex encoder, used when printing and
   comparing trace/span ids below. *)
let bytes_to_hex = Otel.Util_.bytes_to_hex
(* Open three nested spans without passing any scope explicitly, then check
   that three spans were emitted and that, walking [spans_emitted] in order,
   each span's parent id equals the id of the span before it (the first one
   having an empty parent id). *)
let test_stack_based_implicit_scope () =
  let emit_nested_spans () =
    Otel.Trace.with_ "first trace" (fun _scope ->
        Thread.delay 0.2;
        Otel.Trace.with_ "second trace" (fun _scope ->
            Thread.delay 0.2;
            Otel.Trace.with_ "third trace" (fun _scope -> Thread.delay 0.2)))
  in
  with_test_backend (fun () ->
      (* start *)
      emit_nested_spans ();
      let emitted_count = List.length !spans_emitted in
      check' int ~msg:"count of spans emitted" ~actual:emitted_count
        ~expected:3;
      let open Otel.Proto.Trace in
      (* Validates one emitted batch against the id of the preceding span and
         returns this batch's span id for the next step of the fold. *)
      let check_batch prev_span_id { scope_spans; _ } =
        Format.printf "\n%a@\n"
          (Format.pp_print_list pp_scope_spans)
          scope_spans;
        check' int ~msg:"count of scope_spans in emitted span"
          ~actual:(List.length scope_spans) ~expected:1;
        let { scope; spans; _ } = List.hd scope_spans in
        check' bool ~msg:"scope exists in emitted span"
          ~actual:(Option.is_some scope) ~expected:true;
        check' int ~msg:"count of spans in scope_span"
          ~actual:(List.length spans) ~expected:1;
        let { name; trace_id; span_id; parent_span_id; _ } = List.hd spans in
        let parent_hex = bytes_to_hex parent_span_id in
        let prev_hex = bytes_to_hex prev_span_id in
        Printf.printf
          "name='%s' trace_id='%s' span_id='%s' parent_span_id='%s' \
           prev_span_id='%s'\n"
          name (bytes_to_hex trace_id) (bytes_to_hex span_id) parent_hex
          prev_hex;
        check' string ~msg:"previous span is parent" ~actual:parent_hex
          ~expected:prev_hex;
        span_id
      in
      ignore (List.fold_left check_batch Bytes.empty !spans_emitted : bytes))
(* Test cases for synchronous (stack-based) implicit scope. *)
let suite =
  let stack_based_case =
    test_case "stack-based implicit scope" `Quick
      test_stack_based_implicit_scope
  in
  [ stack_based_case ]
(* Test entry point: a single "sync" group under the "implicit scope" runner. *)
let () =
  let groups = [ ("sync", suite) ] in
  Alcotest.run "implicit scope" groups