mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-07 18:37:56 -05:00
feat trace: make it compile again, no TLS, no magic strings, pass exporter
This commit is contained in:
parent
c4e8f8c39b
commit
5804cd299b
6 changed files with 293 additions and 493 deletions
|
|
@ -1,6 +1,5 @@
|
|||
module Otel = Opentelemetry
|
||||
module OTEL = Opentelemetry
|
||||
module Otrace = Trace_core (* ocaml-trace *)
|
||||
module TSub = Trace_subscriber.Subscriber
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
|
||||
|
|
|
|||
|
|
@ -1,40 +1,40 @@
|
|||
open Common_
|
||||
|
||||
let[@inline] trace_id_of_otel (id : Otel.Trace_id.t) : Otrace.trace_id =
|
||||
if id == Otel.Trace_id.dummy then
|
||||
let[@inline] trace_id_of_otel (id : OTEL.Trace_id.t) : Otrace.trace_id =
|
||||
if id == OTEL.Trace_id.dummy then
|
||||
Otrace.Collector.dummy_trace_id
|
||||
else
|
||||
Bytes.unsafe_to_string (Otel.Trace_id.to_bytes id)
|
||||
Bytes.unsafe_to_string (OTEL.Trace_id.to_bytes id)
|
||||
|
||||
let[@inline] trace_id_to_otel (id : Otrace.trace_id) : Otel.Trace_id.t =
|
||||
let[@inline] trace_id_to_otel (id : Otrace.trace_id) : OTEL.Trace_id.t =
|
||||
if id == Otrace.Collector.dummy_trace_id then
|
||||
Otel.Trace_id.dummy
|
||||
OTEL.Trace_id.dummy
|
||||
else
|
||||
Otel.Trace_id.of_bytes @@ Bytes.unsafe_of_string id
|
||||
OTEL.Trace_id.of_bytes @@ Bytes.unsafe_of_string id
|
||||
|
||||
let[@inline] span_id_of_otel (id : Otel.Span_id.t) : Otrace.span =
|
||||
if id == Otel.Span_id.dummy then
|
||||
let[@inline] span_id_of_otel (id : OTEL.Span_id.t) : Otrace.span =
|
||||
if id == OTEL.Span_id.dummy then
|
||||
Otrace.Collector.dummy_span
|
||||
else
|
||||
Bytes.get_int64_le (Otel.Span_id.to_bytes id) 0
|
||||
Bytes.get_int64_le (OTEL.Span_id.to_bytes id) 0
|
||||
|
||||
let[@inline] span_id_to_otel (id : Otrace.span) : Otel.Span_id.t =
|
||||
let[@inline] span_id_to_otel (id : Otrace.span) : OTEL.Span_id.t =
|
||||
if id == Otrace.Collector.dummy_span then
|
||||
Otel.Span_id.dummy
|
||||
OTEL.Span_id.dummy
|
||||
else (
|
||||
let b = Bytes.create 8 in
|
||||
Bytes.set_int64_le b 0 id;
|
||||
Otel.Span_id.of_bytes b
|
||||
OTEL.Span_id.of_bytes b
|
||||
)
|
||||
|
||||
let[@inline] ctx_to_otel (self : Otrace.explicit_span_ctx) : Otel.Span_ctx.t =
|
||||
Otel.Span_ctx.make
|
||||
let[@inline] ctx_to_otel (self : Otrace.explicit_span_ctx) : OTEL.Span_ctx.t =
|
||||
OTEL.Span_ctx.make
|
||||
~trace_id:(trace_id_to_otel self.trace_id)
|
||||
~parent_id:(span_id_to_otel self.span)
|
||||
()
|
||||
|
||||
let[@inline] ctx_of_otel (ctx : Otel.Span_ctx.t) : Otrace.explicit_span_ctx =
|
||||
let[@inline] ctx_of_otel (ctx : OTEL.Span_ctx.t) : Otrace.explicit_span_ctx =
|
||||
{
|
||||
trace_id = trace_id_of_otel (Otel.Span_ctx.trace_id ctx);
|
||||
span = span_id_of_otel (Otel.Span_ctx.parent_id ctx);
|
||||
trace_id = trace_id_of_otel (OTEL.Span_ctx.trace_id ctx);
|
||||
span = span_id_of_otel (OTEL.Span_ctx.parent_id ctx);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,11 @@
|
|||
(public_name opentelemetry.trace)
|
||||
(synopsis "Use opentelemetry as a collector for trace")
|
||||
(optional) ; trace
|
||||
(flags :standard -open Opentelemetry_util)
|
||||
(flags :standard -open Opentelemetry_util -open Opentelemetry_atomic)
|
||||
(libraries
|
||||
opentelemetry.ambient-context
|
||||
opentelemetry.util
|
||||
opentelemetry.core
|
||||
opentelemetry.atomic
|
||||
opentelemetry
|
||||
trace.core
|
||||
trace.subscriber))
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
open Common_
|
||||
module TLS = Thread_local_storage
|
||||
module Conv = Conv
|
||||
open Conv
|
||||
|
||||
let on_internal_error =
|
||||
|
|
@ -7,305 +7,288 @@ let on_internal_error =
|
|||
|
||||
module Extensions = struct
|
||||
type Otrace.extension_event +=
|
||||
| Ev_link_span of Otrace.explicit_span * Otrace.explicit_span
|
||||
| Ev_record_exn of Otrace.explicit_span * exn * Printexc.raw_backtrace
|
||||
| Ev_set_span_kind of Otrace.explicit_span * Otel.Span_kind.t
|
||||
| Ev_link_span of Otrace.explicit_span * OTEL.Span_ctx.t
|
||||
| Ev_record_exn of {
|
||||
sp: Otrace.explicit_span;
|
||||
exn: exn;
|
||||
bt: Printexc.raw_backtrace;
|
||||
}
|
||||
| Ev_set_span_kind of Otrace.explicit_span * OTEL.Span_kind.t
|
||||
end
|
||||
|
||||
open Extensions
|
||||
|
||||
(* use the fast, thread safe span table that relies on picos. *)
|
||||
module Span_tbl = Trace_subscriber.Span_tbl
|
||||
|
||||
(* TODO: subscriber
|
||||
type state = {
|
||||
foo: unit (* TODO: *)
|
||||
}
|
||||
|
||||
module Callbacks
|
||||
*)
|
||||
|
||||
let subscriber_of_exporter _ = assert false
|
||||
|
||||
let collector_of_exporter _ = assert false
|
||||
|
||||
module Internal = struct
|
||||
type span_begin = {
|
||||
start_time: int64;
|
||||
name: string;
|
||||
__FILE__: string;
|
||||
__LINE__: int;
|
||||
__FUNCTION__: string option;
|
||||
scope: Otel.Scope.t;
|
||||
parent: Otel.Span_ctx.t option;
|
||||
}
|
||||
type span_begin = { span: OTEL.Span.t } [@@unboxed]
|
||||
|
||||
module Active_span_tbl = Hashtbl.Make (struct
|
||||
include Int64
|
||||
module Active_span_tbl = Span_tbl
|
||||
|
||||
let hash : t -> int = Hashtbl.hash
|
||||
end)
|
||||
type state = { tbl: span_begin Active_span_tbl.t } [@@unboxed]
|
||||
|
||||
(** key to access a OTEL scope from an explicit span *)
|
||||
let k_explicit_scope : Otel.Scope.t Otrace.Meta_map.key =
|
||||
let create_state () : state = { tbl = Active_span_tbl.create () }
|
||||
|
||||
(** key to access a OTEL span (the current span) from a Trace_core
|
||||
explicit_span *)
|
||||
let k_explicit_span : OTEL.Span.t Otrace.Meta_map.key =
|
||||
Otrace.Meta_map.Key.create ()
|
||||
|
||||
(** Per-thread set of active spans. *)
|
||||
module Active_spans = struct
|
||||
type t = { tbl: span_begin Active_span_tbl.t } [@@unboxed]
|
||||
|
||||
let create () : t = { tbl = Active_span_tbl.create 32 }
|
||||
|
||||
let k_tls : t TLS.t = TLS.create ()
|
||||
|
||||
let[@inline] get () : t =
|
||||
try TLS.get_exn k_tls
|
||||
with TLS.Not_set ->
|
||||
let self = create () in
|
||||
TLS.set k_tls self;
|
||||
self
|
||||
end
|
||||
|
||||
let otrace_of_otel (id : Otel.Span_id.t) : int64 =
|
||||
let bs = Otel.Span_id.to_bytes id in
|
||||
let otrace_of_otel (id : OTEL.Span_id.t) : int64 =
|
||||
let bs = OTEL.Span_id.to_bytes id in
|
||||
(* lucky that it coincides! *)
|
||||
assert (Bytes.length bs = 8);
|
||||
Bytes.get_int64_le bs 0
|
||||
|
||||
let enter_span' ?(explicit_parent : Otrace.explicit_span_ctx option)
|
||||
~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name =
|
||||
let open Otel in
|
||||
let enter_span' (self : state)
|
||||
?(explicit_parent : Otrace.explicit_span_ctx option) ~__FUNCTION__
|
||||
~__FILE__ ~__LINE__ ~data name =
|
||||
let open OTEL in
|
||||
let otel_id = Span_id.create () in
|
||||
let otrace_id = otrace_of_otel otel_id in
|
||||
|
||||
let parent_scope = Scope.get_ambient_scope () in
|
||||
let trace_id =
|
||||
match parent_scope with
|
||||
| Some sc -> sc.trace_id
|
||||
| None -> Trace_id.create ()
|
||||
in
|
||||
let parent =
|
||||
match explicit_parent, parent_scope with
|
||||
let implicit_parent = OTEL.Ambient_span.get () in
|
||||
|
||||
let trace_id, parent_id =
|
||||
match explicit_parent, implicit_parent with
|
||||
| Some p, _ ->
|
||||
Some
|
||||
(Otel.Span_ctx.make ~trace_id ~parent_id:(span_id_to_otel p.span) ())
|
||||
| None, Some parent -> Some (Otel.Scope.to_span_ctx parent)
|
||||
| None, None -> None
|
||||
in
|
||||
|
||||
let new_scope = Otel.Scope.make ~trace_id ~span_id:otel_id ~attrs:data () in
|
||||
|
||||
let start_time = Timestamp_ns.now_unix_ns () in
|
||||
let sb =
|
||||
{
|
||||
start_time;
|
||||
name;
|
||||
__FILE__;
|
||||
__LINE__;
|
||||
__FUNCTION__;
|
||||
scope = new_scope;
|
||||
parent;
|
||||
}
|
||||
in
|
||||
|
||||
let active_spans = Active_spans.get () in
|
||||
Active_span_tbl.add active_spans.tbl otrace_id sb;
|
||||
|
||||
otrace_id, sb
|
||||
|
||||
let exit_span_
|
||||
{ start_time; name; __FILE__; __LINE__; __FUNCTION__; scope; parent } =
|
||||
let open Otel in
|
||||
let end_time = Timestamp_ns.now_unix_ns () in
|
||||
let kind = Scope.kind scope in
|
||||
let attrs = Scope.attrs scope in
|
||||
|
||||
let status : Span_status.t =
|
||||
match List.assoc_opt Well_known.status_error_key attrs with
|
||||
| Some (`String message) ->
|
||||
Span_status.make ~message ~code:Status_code_error
|
||||
| _ -> Span_status.make ~message:"" ~code:Status_code_ok
|
||||
let trace_id = p.trace_id |> Conv.trace_id_to_otel in
|
||||
let parent_id =
|
||||
try
|
||||
let sb = Active_span_tbl.find_exn self.tbl p.span in
|
||||
Some (OTEL.Span.id sb.span)
|
||||
with Not_found -> None
|
||||
in
|
||||
trace_id, parent_id
|
||||
| None, Some p -> Span.trace_id p, Some (Span.id p)
|
||||
| None, None -> Trace_id.create (), None
|
||||
in
|
||||
|
||||
let attrs =
|
||||
match __FUNCTION__ with
|
||||
| None ->
|
||||
[ "code.filepath", `String __FILE__; "code.lineno", `Int __LINE__ ]
|
||||
@ attrs
|
||||
| Some __FUNCTION__ ->
|
||||
let last_dot = String.rindex __FUNCTION__ '.' in
|
||||
let module_path = String.sub __FUNCTION__ 0 last_dot in
|
||||
let function_name =
|
||||
String.sub __FUNCTION__ (last_dot + 1)
|
||||
(String.length __FUNCTION__ - last_dot - 1)
|
||||
in
|
||||
("code.filepath", `String __FILE__)
|
||||
:: ("code.lineno", `Int __LINE__)
|
||||
:: data
|
||||
in
|
||||
|
||||
let start_time = Timestamp_ns.now_unix_ns () in
|
||||
let span : OTEL.Span.t =
|
||||
OTEL.Span.make ?parent:parent_id ~trace_id ~id:otel_id ~attrs name
|
||||
~start_time ~end_time:start_time
|
||||
in
|
||||
|
||||
let sb = { span } in
|
||||
|
||||
(match __FUNCTION__ with
|
||||
| Some __FUNCTION__ when OTEL.Span.is_not_dummy span ->
|
||||
let last_dot = String.rindex __FUNCTION__ '.' in
|
||||
let module_path = String.sub __FUNCTION__ 0 last_dot in
|
||||
let function_name =
|
||||
String.sub __FUNCTION__ (last_dot + 1)
|
||||
(String.length __FUNCTION__ - last_dot - 1)
|
||||
in
|
||||
Span.add_attrs span
|
||||
[
|
||||
"code.filepath", `String __FILE__;
|
||||
"code.lineno", `Int __LINE__;
|
||||
"code.function", `String function_name;
|
||||
"code.namespace", `String module_path;
|
||||
]
|
||||
@ attrs
|
||||
in
|
||||
| _ -> ());
|
||||
|
||||
let parent_id = Option.map Otel.Span_ctx.parent_id parent in
|
||||
Span.create ~kind ~trace_id:scope.trace_id ?parent:parent_id ~status
|
||||
~id:scope.span_id ~start_time ~end_time ~attrs
|
||||
~events:(Scope.events scope) name
|
||||
|> fst
|
||||
Active_span_tbl.add self.tbl otrace_id sb;
|
||||
|
||||
let exit_span' otrace_id otel_span_begin =
|
||||
let active_spans = Active_spans.get () in
|
||||
Active_span_tbl.remove active_spans.tbl otrace_id;
|
||||
otrace_id, sb
|
||||
|
||||
let exit_span_ { span } : OTEL.Span.t =
|
||||
let open OTEL in
|
||||
let end_time = Timestamp_ns.now_unix_ns () in
|
||||
Proto.Trace.span_set_end_time_unix_nano span end_time;
|
||||
span
|
||||
|
||||
let exit_span' (self : state) otrace_id otel_span_begin =
|
||||
Active_span_tbl.remove self.tbl otrace_id;
|
||||
exit_span_ otel_span_begin
|
||||
|
||||
let exit_span_from_id otrace_id =
|
||||
let active_spans = Active_spans.get () in
|
||||
match Active_span_tbl.find_opt active_spans.tbl otrace_id with
|
||||
| None -> None
|
||||
| Some otel_span_begin ->
|
||||
Active_span_tbl.remove active_spans.tbl otrace_id;
|
||||
(** Find the OTEL span corresponding to this Trace span *)
|
||||
let exit_span_from_id (self : state) otrace_id =
|
||||
match Active_span_tbl.find_exn self.tbl otrace_id with
|
||||
| exception Not_found -> None
|
||||
| otel_span_begin ->
|
||||
Active_span_tbl.remove self.tbl otrace_id;
|
||||
Some (exit_span_ otel_span_begin)
|
||||
|
||||
let[@inline] get_scope (span : Otrace.explicit_span) : Otel.Scope.t option =
|
||||
Otrace.Meta_map.find k_explicit_scope span.meta
|
||||
|
||||
module M = struct
|
||||
let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name cb =
|
||||
let otrace_id, sb =
|
||||
enter_span' ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
in
|
||||
|
||||
Otel.Scope.with_ambient_scope sb.scope @@ fun () ->
|
||||
match cb otrace_id with
|
||||
| res ->
|
||||
let otel_span = exit_span' otrace_id sb in
|
||||
Otel.Trace.emit [ otel_span ];
|
||||
res
|
||||
| exception e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
|
||||
Otel.Scope.record_exception sb.scope e bt;
|
||||
let otel_span = exit_span' otrace_id sb in
|
||||
Otel.Trace.emit [ otel_span ];
|
||||
|
||||
Printexc.raise_with_backtrace e bt
|
||||
|
||||
let enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name :
|
||||
Trace_core.span =
|
||||
let otrace_id, _sb =
|
||||
enter_span' ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
in
|
||||
(* NOTE: we cannot enter ambient scope in a disjoint way
|
||||
with the exit, because we only have [Ambient_context.with_binding],
|
||||
no [set_binding] *)
|
||||
otrace_id
|
||||
|
||||
let exit_span otrace_id =
|
||||
match exit_span_from_id otrace_id with
|
||||
| None -> ()
|
||||
| Some otel_span -> Otel.Trace.emit [ otel_span ]
|
||||
|
||||
let enter_manual_span ~(parent : Otrace.explicit_span_ctx option) ~flavor:_
|
||||
~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : Otrace.explicit_span =
|
||||
let otrace_id, sb =
|
||||
match parent with
|
||||
| None -> enter_span' ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
| Some parent ->
|
||||
enter_span' ~explicit_parent:parent ~__FUNCTION__ ~__FILE__ ~__LINE__
|
||||
~data name
|
||||
in
|
||||
|
||||
let active_spans = Active_spans.get () in
|
||||
Active_span_tbl.add active_spans.tbl otrace_id sb;
|
||||
|
||||
Otrace.
|
||||
{
|
||||
span = otrace_id;
|
||||
trace_id = trace_id_of_otel sb.scope.trace_id;
|
||||
meta = Meta_map.(empty |> add k_explicit_scope sb.scope);
|
||||
}
|
||||
|
||||
let exit_manual_span Otrace.{ span = otrace_id; _ } =
|
||||
let active_spans = Active_spans.get () in
|
||||
match Active_span_tbl.find_opt active_spans.tbl otrace_id with
|
||||
| None -> !on_internal_error (spf "no active span with ID %Ld" otrace_id)
|
||||
| Some sb ->
|
||||
let otel_span = exit_span' otrace_id sb in
|
||||
Otel.Trace.emit [ otel_span ]
|
||||
|
||||
let add_data_to_span otrace_id data =
|
||||
let active_spans = Active_spans.get () in
|
||||
match Active_span_tbl.find_opt active_spans.tbl otrace_id with
|
||||
| None -> !on_internal_error (spf "no active span with ID %Ld" otrace_id)
|
||||
| Some sb -> Otel.Scope.add_attrs sb.scope (fun () -> data)
|
||||
|
||||
let add_data_to_manual_span (span : Otrace.explicit_span) data : unit =
|
||||
match get_scope span with
|
||||
| None ->
|
||||
!on_internal_error (spf "manual span does not a contain an OTEL scope")
|
||||
| Some scope -> Otel.Scope.add_attrs scope (fun () -> data)
|
||||
|
||||
let message ?span ~data:_ msg : unit =
|
||||
(* gather information from context *)
|
||||
let old_scope = Otel.Scope.get_ambient_scope () in
|
||||
let trace_id = Option.map (fun sc -> sc.Otel.Scope.trace_id) old_scope in
|
||||
|
||||
let span_id =
|
||||
match span with
|
||||
| Some id -> Some (span_id_to_otel id)
|
||||
| None -> Option.map (fun sc -> sc.Otel.Scope.span_id) old_scope
|
||||
in
|
||||
|
||||
let log = Otel.Logs.make_str ?trace_id ?span_id msg in
|
||||
Otel.Logs.emit [ log ]
|
||||
|
||||
let shutdown () = ()
|
||||
|
||||
let name_process _name = ()
|
||||
|
||||
let name_thread _name = ()
|
||||
|
||||
let counter_int ~data name cur_val : unit =
|
||||
let _kind, attrs = otel_attrs_of_otrace_data data in
|
||||
let m = Otel.Metrics.(gauge ~name [ int ~attrs cur_val ]) in
|
||||
Otel.Metrics.emit [ m ]
|
||||
|
||||
let counter_float ~data name cur_val : unit =
|
||||
let _kind, attrs = otel_attrs_of_otrace_data data in
|
||||
let m = Otel.Metrics.(gauge ~name [ float ~attrs cur_val ]) in
|
||||
Otel.Metrics.emit [ m ]
|
||||
|
||||
let extension_event = function
|
||||
| Ev_link_span (sp1, sp2) ->
|
||||
(match get_scope sp1, get_scope sp2 with
|
||||
| Some sc1, Some sc2 ->
|
||||
Otel.Scope.add_links sc1 (fun () -> [ Otel.Scope.to_span_link sc2 ])
|
||||
| _ -> !on_internal_error "could not find scope for OTEL span")
|
||||
| Ev_set_span_kind (sp, k) ->
|
||||
(match get_scope sp with
|
||||
| None -> !on_internal_error "could not find scope for OTEL span"
|
||||
| Some sc -> Otel.Scope.set_kind sc k)
|
||||
| Ev_record_exn (sp, exn, bt) ->
|
||||
(match get_scope sp with
|
||||
| None -> !on_internal_error "could not find scope for OTEL span"
|
||||
| Some sc -> Otel.Scope.record_exception sc exn bt)
|
||||
| _ -> ()
|
||||
end
|
||||
let[@inline] get_span_ (span : Otrace.explicit_span) : OTEL.Span.t option =
|
||||
Otrace.Meta_map.find k_explicit_span span.meta
|
||||
end
|
||||
|
||||
module type COLLECTOR_ARG = sig
|
||||
val exporter : OTEL.Exporter.t
|
||||
end
|
||||
|
||||
module Make_collector (A : COLLECTOR_ARG) = struct
|
||||
open Internal
|
||||
|
||||
let exporter = A.exporter
|
||||
|
||||
let state = create_state ()
|
||||
|
||||
let with_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name cb =
|
||||
let otrace_id, sb =
|
||||
enter_span' state ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
in
|
||||
|
||||
match
|
||||
let@ () = OTEL.Ambient_span.with_ambient sb.span in
|
||||
cb otrace_id
|
||||
with
|
||||
| res ->
|
||||
let otel_span = exit_span' state otrace_id sb in
|
||||
OTEL.Exporter.send_trace exporter [ otel_span ];
|
||||
res
|
||||
| exception e ->
|
||||
let bt = Printexc.get_raw_backtrace () in
|
||||
|
||||
OTEL.Span.record_exception sb.span e bt;
|
||||
let otel_span = exit_span' state otrace_id sb in
|
||||
OTEL.Exporter.send_trace exporter [ otel_span ];
|
||||
|
||||
Printexc.raise_with_backtrace e bt
|
||||
|
||||
let enter_span ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : Trace_core.span
|
||||
=
|
||||
let otrace_id, _sb =
|
||||
enter_span' state ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
in
|
||||
(* NOTE: we cannot enter ambient scope in a disjoint way
|
||||
with the exit, because we only have [Ambient_context.with_binding],
|
||||
no [set_binding] *)
|
||||
otrace_id
|
||||
|
||||
let exit_span otrace_id =
|
||||
match exit_span_from_id state otrace_id with
|
||||
| None -> ()
|
||||
| Some otel_span -> OTEL.Exporter.send_trace exporter [ otel_span ]
|
||||
|
||||
let enter_manual_span ~(parent : Otrace.explicit_span_ctx option) ~flavor:_
|
||||
~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name : Otrace.explicit_span =
|
||||
let otrace_id, sb =
|
||||
match parent with
|
||||
| None -> enter_span' state ~__FUNCTION__ ~__FILE__ ~__LINE__ ~data name
|
||||
| Some parent ->
|
||||
enter_span' state ~explicit_parent:parent ~__FUNCTION__ ~__FILE__
|
||||
~__LINE__ ~data name
|
||||
in
|
||||
|
||||
Active_span_tbl.add state.tbl otrace_id sb;
|
||||
|
||||
{
|
||||
Otrace.span = otrace_id;
|
||||
trace_id = trace_id_of_otel (OTEL.Span.trace_id sb.span);
|
||||
meta = Otrace.Meta_map.(empty |> add k_explicit_span sb.span);
|
||||
}
|
||||
|
||||
let exit_manual_span { Otrace.span = otrace_id; _ } =
|
||||
match Active_span_tbl.find_exn state.tbl otrace_id with
|
||||
| exception Not_found ->
|
||||
!on_internal_error (spf "no active span with ID %Ld" otrace_id)
|
||||
| sb ->
|
||||
let otel_span = exit_span' state otrace_id sb in
|
||||
OTEL.Exporter.send_trace exporter [ otel_span ]
|
||||
|
||||
let add_data_to_span otrace_id data =
|
||||
match Active_span_tbl.find_exn state.tbl otrace_id with
|
||||
| exception Not_found ->
|
||||
!on_internal_error (spf "no active span with ID %Ld" otrace_id)
|
||||
| sb -> OTEL.Span.add_attrs sb.span data
|
||||
|
||||
let add_data_to_manual_span (span : Otrace.explicit_span) data : unit =
|
||||
match get_span_ span with
|
||||
| None ->
|
||||
!on_internal_error (spf "manual span does not a contain an OTEL scope")
|
||||
| Some span -> OTEL.Span.add_attrs span data
|
||||
|
||||
let message ?span ~data:_ msg : unit =
|
||||
(* gather information from context *)
|
||||
let old_span = OTEL.Ambient_span.get () in
|
||||
let trace_id = Option.map OTEL.Span.trace_id old_span in
|
||||
|
||||
let span_id =
|
||||
match span with
|
||||
| Some id -> Some (span_id_to_otel id)
|
||||
| None -> Option.map OTEL.Span.id old_span
|
||||
in
|
||||
|
||||
let log = OTEL.Log_record.make_str ?trace_id ?span_id msg in
|
||||
OTEL.Exporter.send_logs exporter [ log ]
|
||||
|
||||
let shutdown () = ()
|
||||
|
||||
let name_process _name = ()
|
||||
|
||||
let name_thread _name = ()
|
||||
|
||||
let counter_int ~data:attrs name cur_val : unit =
|
||||
let m = OTEL.Metrics.(gauge ~name [ int ~attrs cur_val ]) in
|
||||
OTEL.Exporter.send_metrics exporter [ m ]
|
||||
|
||||
let counter_float ~data:attrs name cur_val : unit =
|
||||
let m = OTEL.Metrics.(gauge ~name [ float ~attrs cur_val ]) in
|
||||
OTEL.Exporter.send_metrics exporter [ m ]
|
||||
|
||||
let extension_event = function
|
||||
| Ev_link_span (sp1, sc2) ->
|
||||
(match get_span_ sp1 with
|
||||
| Some sc1 -> OTEL.Span.add_links sc1 [ OTEL.Span_link.of_span_ctx sc2 ]
|
||||
| _ -> !on_internal_error "could not find scope for OTEL span")
|
||||
| Ev_set_span_kind (sp, k) ->
|
||||
(match get_span_ sp with
|
||||
| None -> !on_internal_error "could not find scope for OTEL span"
|
||||
| Some sc -> OTEL.Span.set_kind sc k)
|
||||
| Ev_record_exn { sp; exn; bt } ->
|
||||
(match get_span_ sp with
|
||||
| None -> !on_internal_error "could not find scope for OTEL span"
|
||||
| Some sc -> OTEL.Span.record_exception sc exn bt)
|
||||
| _ -> ()
|
||||
end
|
||||
|
||||
let collector_of_exporter (exp : OTEL.Exporter.t) : Trace_core.collector =
|
||||
let module M = Make_collector (struct
|
||||
let exporter = exp
|
||||
end) in
|
||||
(module M : Trace_core.Collector.S)
|
||||
|
||||
let link_span_to_otel_ctx (sp1 : Otrace.explicit_span) (sp2 : OTEL.Span_ctx.t) :
|
||||
unit =
|
||||
if Otrace.enabled () then Otrace.extension_event @@ Ev_link_span (sp1, sp2)
|
||||
|
||||
(*
|
||||
let link_spans (sp1 : Otrace.explicit_span) (sp2 : Otrace.explicit_span) : unit
|
||||
=
|
||||
if Otrace.enabled () then Otrace.extension_event @@ Ev_link_span (sp1, sp2)
|
||||
*)
|
||||
|
||||
let set_span_kind sp k : unit =
|
||||
if Otrace.enabled () then Otrace.extension_event @@ Ev_set_span_kind (sp, k)
|
||||
|
||||
let record_exception sp exn bt : unit =
|
||||
if Otrace.enabled () then Otrace.extension_event @@ Ev_record_exn (sp, exn, bt)
|
||||
if Otrace.enabled () then
|
||||
Otrace.extension_event @@ Ev_record_exn { sp; exn; bt }
|
||||
|
||||
let collector () : Otrace.collector = (module Internal.M)
|
||||
(** Collector that forwards to the {b currently installed} OTEL exporter. *)
|
||||
let collector_main_otel_exporter () : Otrace.collector =
|
||||
collector_of_exporter OTEL.Main_exporter.dynamic_forward_to_main_exporter
|
||||
|
||||
let setup () = Otrace.setup_collector @@ collector ()
|
||||
let (collector
|
||||
[@deprecated "use collector_of_exporter or collector_main_otel_exporter"])
|
||||
=
|
||||
collector_main_otel_exporter
|
||||
|
||||
let setup_with_otel_backend b : unit =
|
||||
Otel.Collector.set_backend b;
|
||||
setup ()
|
||||
let setup () = Otrace.setup_collector @@ collector_main_otel_exporter ()
|
||||
|
||||
let setup_with_otel_exporter exp : unit =
|
||||
let coll = collector_of_exporter exp in
|
||||
OTEL.Main_exporter.set exp;
|
||||
Otrace.setup_collector coll
|
||||
|
||||
let setup_with_otel_backend = setup_with_otel_exporter
|
||||
|
||||
module Well_known = struct end
|
||||
|
|
|
|||
|
|
@ -18,39 +18,37 @@
|
|||
(* ... *)
|
||||
]} *)
|
||||
|
||||
module Otel := Opentelemetry
|
||||
module OTEL := Opentelemetry_core
|
||||
module Otrace := Trace_core
|
||||
module TLS := Thread_local_storage
|
||||
|
||||
(** Conversions between [Opentelemetry] and [Trace_core] types *)
|
||||
module Conv : sig
|
||||
val trace_id_of_otel : Otel.Trace_id.t -> string
|
||||
val trace_id_of_otel : OTEL.Trace_id.t -> string
|
||||
|
||||
val trace_id_to_otel : string -> Otel.Trace_id.t
|
||||
val trace_id_to_otel : string -> OTEL.Trace_id.t
|
||||
|
||||
val span_id_of_otel : Otel.Span_id.t -> int64
|
||||
val span_id_of_otel : OTEL.Span_id.t -> int64
|
||||
|
||||
val span_id_to_otel : int64 -> Otel.Span_id.t
|
||||
val span_id_to_otel : int64 -> OTEL.Span_id.t
|
||||
|
||||
val ctx_to_otel : Otrace.explicit_span_ctx -> Otel.Span_ctx.t
|
||||
val ctx_to_otel : Otrace.explicit_span_ctx -> OTEL.Span_ctx.t
|
||||
|
||||
val ctx_of_otel : Otel.Span_ctx.t -> Otrace.explicit_span_ctx
|
||||
val ctx_of_otel : OTEL.Span_ctx.t -> Otrace.explicit_span_ctx
|
||||
end
|
||||
|
||||
(** The extension events for {!Trace_core}. *)
|
||||
module Extensions : sig
|
||||
type Otrace.extension_event +=
|
||||
| Ev_link_span of Otrace.explicit_span * Otrace.explicit_span_ctx
|
||||
| Ev_link_span of Otrace.explicit_span * OTEL.Span_ctx.t
|
||||
(** Link the given span to the given context. The context isn't the
|
||||
parent, but the link can be used to correlate both spans. *)
|
||||
| Ev_record_exn of {
|
||||
sp: Otrace.span;
|
||||
sp: Otrace.explicit_span;
|
||||
exn: exn;
|
||||
bt: Printexc.raw_backtrace;
|
||||
error: bool; (** Is this an actual internal error? *)
|
||||
}
|
||||
(** Record exception and potentially turn span to an error *)
|
||||
| Ev_set_span_kind of Otrace.span * Otel.Span_kind.t
|
||||
| Ev_set_span_kind of Otrace.explicit_span * OTEL.Span_kind.t
|
||||
end
|
||||
|
||||
val on_internal_error : (string -> unit) ref
|
||||
|
|
@ -59,25 +57,38 @@ val on_internal_error : (string -> unit) ref
|
|||
val setup : unit -> unit
|
||||
(** Install the OTEL backend as a Trace collector *)
|
||||
|
||||
val setup_with_otel_exporter : #Opentelemetry.Exporter.t -> unit
|
||||
val setup_with_otel_exporter : OTEL.Exporter.t -> unit
|
||||
(** Same as {!setup}, but using the given exporter *)
|
||||
|
||||
val setup_with_otel_backend : #Opentelemetry.Exporter.t -> unit
|
||||
val setup_with_otel_backend : OTEL.Exporter.t -> unit
|
||||
[@@deprecated "use setup_with_otel_exporter"]
|
||||
|
||||
val subscriber_of_exporter : #Otel.Exporter.t -> Trace_subscriber.t
|
||||
(* TODO: subscriber, with the next gen of Trace_subscriber
|
||||
that allows us to provide [new_trace_id] so we can produce 16B trace IDs.
|
||||
val subscriber_of_exporter : OTEL.Exporter.t -> Trace_subscriber.t
|
||||
*)
|
||||
|
||||
val collector_of_exporter : #Otel.Exporter.t -> Trace_core.collector
|
||||
val collector_of_exporter : OTEL.Exporter.t -> Trace_core.collector
|
||||
|
||||
val collector : unit -> Trace_core.collector
|
||||
[@@deprecated "use collector_of_exporter, avoid global state"]
|
||||
(** Make a Trace collector that uses the OTEL backend to send spans and logs *)
|
||||
|
||||
(* NOTE: we cannot be sure that [sc2] is still alive and findable
|
||||
in the active spans table. We could provide this operation under
|
||||
the explicit precondition that it is?
|
||||
|
||||
val link_spans : Otrace.explicit_span -> Otrace.explicit_span -> unit
|
||||
(** [link_spans sp1 sp2] modifies [sp1] by adding a span link to [sp2].
|
||||
@since 0.11 *)
|
||||
*)
|
||||
|
||||
val set_span_kind : Otrace.explicit_span -> Otel.Span.kind -> unit
|
||||
val link_span_to_otel_ctx : Otrace.explicit_span -> OTEL.Span_ctx.t -> unit
|
||||
(** [link_spans sp1 sp_ctx2] modifies [sp1] by adding a span link to [sp_ctx2].
|
||||
It must be the case that [sp1] is a currently active span.
|
||||
@since NEXT_RELEASE *)
|
||||
|
||||
val set_span_kind : Otrace.explicit_span -> OTEL.Span.kind -> unit
|
||||
(** [set_span_kind sp k] sets the span's kind.
|
||||
@since 0.11 *)
|
||||
|
||||
|
|
@ -86,24 +97,6 @@ val record_exception :
|
|||
(** Record exception in the current span.
|
||||
@since 0.11 *)
|
||||
|
||||
(** Static references for well-known identifiers; see {!label-wellknown}. *)
|
||||
module Well_known : sig
|
||||
val spankind_key : string
|
||||
|
||||
val internal : Otrace.user_data
|
||||
|
||||
val server : Otrace.user_data
|
||||
|
||||
val client : Otrace.user_data
|
||||
|
||||
val producer : Otrace.user_data
|
||||
|
||||
val consumer : Otrace.user_data
|
||||
|
||||
val spankind_of_string : string -> Otel.Span.kind
|
||||
|
||||
val otel_attrs_of_otrace_data :
|
||||
(string * Otrace.user_data) list ->
|
||||
Otel.Span.kind * Otel.Span.key_value list
|
||||
end
|
||||
[@@deprecated "use the regular functions for this"]
|
||||
module Well_known : sig end
|
||||
[@@deprecated
|
||||
"use the regular functions such as `link_spans` or `set_span_kind` for this"]
|
||||
|
|
|
|||
|
|
@ -1,176 +0,0 @@
|
|||
open Common_
|
||||
open Trace_core
|
||||
module Span_tbl = Trace_subscriber.Span_tbl
|
||||
|
||||
module Buf_pool = struct
|
||||
type t = Buffer.t Rpool.t
|
||||
|
||||
let create ?(max_size = 32) ?(buf_size = 256) () : t =
|
||||
Rpool.create ~max_size ~clear:Buffer.reset
|
||||
~create:(fun () -> Buffer.create buf_size)
|
||||
()
|
||||
end
|
||||
|
||||
open struct
|
||||
let[@inline] time_us_of_time_ns (t : int64) : float =
|
||||
Int64.div t 1_000L |> Int64.to_float
|
||||
|
||||
let[@inline] int64_of_trace_id_ (id : Trace_core.trace_id) : int64 =
|
||||
if id == Trace_core.Collector.dummy_trace_id then
|
||||
0L
|
||||
else
|
||||
Bytes.get_int64_le (Bytes.unsafe_of_string id) 0
|
||||
end
|
||||
|
||||
let on_tracing_error = ref (fun s -> Printf.eprintf "%s\n%!" s)
|
||||
|
||||
type span_info = {
|
||||
tid: int;
|
||||
name: string;
|
||||
start_us: float;
|
||||
mutable data: (string * Sub.user_data) list;
|
||||
(* NOTE: thread safety: this is supposed to only be modified by the thread
|
||||
that's running this (synchronous, stack-abiding) span. *)
|
||||
}
|
||||
(** Information we store about a span begin event, to emit a complete event when
|
||||
we meet the corresponding span end event *)
|
||||
|
||||
type t = {
|
||||
active: bool A.t;
|
||||
pid: int;
|
||||
spans: span_info Span_tbl.t;
|
||||
buf_pool: Buf_pool.t;
|
||||
exporter: Exporter.t;
|
||||
}
|
||||
(** Subscriber state *)
|
||||
|
||||
open struct
|
||||
let print_non_closed_spans_warning spans =
|
||||
let module Str_set = Set.Make (String) in
|
||||
let spans = Span_tbl.to_list spans in
|
||||
if spans <> [] then (
|
||||
!on_tracing_error
|
||||
@@ Printf.sprintf "trace-tef: warning: %d spans were not closed"
|
||||
(List.length spans);
|
||||
let names =
|
||||
List.fold_left
|
||||
(fun set (_, span) -> Str_set.add span.name set)
|
||||
Str_set.empty spans
|
||||
in
|
||||
Str_set.iter
|
||||
(fun name ->
|
||||
!on_tracing_error @@ Printf.sprintf " span %S was not closed" name)
|
||||
names;
|
||||
flush stderr
|
||||
)
|
||||
end
|
||||
|
||||
let close (self : t) : unit =
|
||||
if A.exchange self.active false then (
|
||||
print_non_closed_spans_warning self.spans;
|
||||
self.exporter.close ()
|
||||
)
|
||||
|
||||
let[@inline] active self = A.get self.active
|
||||
|
||||
let[@inline] flush (self : t) : unit = self.exporter.flush ()
|
||||
|
||||
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
|
||||
{ active = A.make true; exporter; buf_pool; pid; spans = Span_tbl.create () }
|
||||
|
||||
module Callbacks = struct
|
||||
type st = t
|
||||
|
||||
let on_init _ ~time_ns:_ = ()
|
||||
|
||||
let on_shutdown (self : st) ~time_ns:_ = close self
|
||||
|
||||
let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_name_process ~pid:self.pid ~name buf;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
|
||||
self.exporter.on_json buf
|
||||
|
||||
(* add function name, if provided, to the metadata *)
|
||||
let add_fun_name_ fun_name data : _ list =
|
||||
match fun_name with
|
||||
| None -> data
|
||||
| Some f -> ("function", Sub.U_string f) :: data
|
||||
|
||||
let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let info = { tid; name; start_us = time_us; data } in
|
||||
(* save the span so we find it at exit *)
|
||||
Span_tbl.add self.spans span info
|
||||
|
||||
let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
match Span_tbl.find_exn self.spans span with
|
||||
| exception Not_found ->
|
||||
!on_tracing_error
|
||||
(Printf.sprintf "trace-tef: error: cannot find span %Ld" span)
|
||||
| { tid; name; start_us; data } ->
|
||||
Span_tbl.remove self.spans span;
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_duration_event buf ~pid:self.pid ~tid ~name ~start:start_us
|
||||
~end_:time_us ~args:data;
|
||||
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_add_data (self : st) ~data span =
|
||||
if data <> [] then (
|
||||
try
|
||||
let info = Span_tbl.find_exn self.spans span in
|
||||
info.data <- List.rev_append data info.data
|
||||
with Not_found ->
|
||||
!on_tracing_error
|
||||
(Printf.sprintf "trace-tef: error: cannot find span %Ld" span)
|
||||
)
|
||||
|
||||
let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
|
||||
~args:data;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_counter (self : st) ~time_ns ~tid ~data:_ ~name n : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_enter_manual_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
|
||||
~__LINE__:_ ~time_ns ~tid ~parent:_ ~data ~name ~flavor ~trace_id _span :
|
||||
unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
let data = add_fun_name_ fun_name data in
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_manual_begin buf ~pid:self.pid ~tid ~name
|
||||
~id:(int64_of_trace_id_ trace_id)
|
||||
~ts:time_us ~args:data ~flavor;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor
|
||||
~trace_id (_ : span) : unit =
|
||||
let time_us = time_us_of_time_ns @@ time_ns in
|
||||
|
||||
let@ buf = Rpool.with_ self.buf_pool in
|
||||
Writer.emit_manual_end buf ~pid:self.pid ~tid ~name
|
||||
~id:(int64_of_trace_id_ trace_id)
|
||||
~ts:time_us ~flavor ~args:data;
|
||||
self.exporter.on_json buf
|
||||
|
||||
let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
|
||||
end
|
||||
|
||||
let subscriber (self : t) : Sub.t =
|
||||
Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }
|
||||
Loading…
Add table
Reference in a new issue