mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-07 18:37:56 -05:00
store: Extract thread_local, abstract for lwt/eio
This commit is contained in:
parent
83b9837778
commit
a64565f104
9 changed files with 30 additions and 214 deletions
|
|
@ -17,6 +17,7 @@
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= "4.08"))
|
(ocaml (>= "4.08"))
|
||||||
ptime
|
ptime
|
||||||
|
ambient-context
|
||||||
(odoc :with-doc)
|
(odoc :with-doc)
|
||||||
(pbrt (>= 2.3)))
|
(pbrt (>= 2.3)))
|
||||||
(depopts
|
(depopts
|
||||||
|
|
@ -29,6 +30,7 @@
|
||||||
(synopsis "Lwt-compatible instrumentation for https://opentelemetry.io")
|
(synopsis "Lwt-compatible instrumentation for https://opentelemetry.io")
|
||||||
(depends
|
(depends
|
||||||
(ocaml (>= "4.08"))
|
(ocaml (>= "4.08"))
|
||||||
|
ambient-context
|
||||||
(opentelemetry (= :version))
|
(opentelemetry (= :version))
|
||||||
(cohttp-lwt-unix :with-test)
|
(cohttp-lwt-unix :with-test)
|
||||||
(odoc :with-doc)
|
(odoc :with-doc)
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ bug-reports: "https://github.com/imandra-ai/ocaml-opentelemetry/issues"
|
||||||
depends: [
|
depends: [
|
||||||
"dune" {>= "2.7"}
|
"dune" {>= "2.7"}
|
||||||
"ocaml" {>= "4.08"}
|
"ocaml" {>= "4.08"}
|
||||||
|
"ambient-context"
|
||||||
"opentelemetry" {= version}
|
"opentelemetry" {= version}
|
||||||
"cohttp-lwt-unix" {with-test}
|
"cohttp-lwt-unix" {with-test}
|
||||||
"odoc" {with-doc}
|
"odoc" {with-doc}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ depends: [
|
||||||
"dune" {>= "2.7"}
|
"dune" {>= "2.7"}
|
||||||
"ocaml" {>= "4.08"}
|
"ocaml" {>= "4.08"}
|
||||||
"ptime"
|
"ptime"
|
||||||
|
"ambient-context"
|
||||||
"odoc" {with-doc}
|
"odoc" {with-doc}
|
||||||
"pbrt" {>= "2.3"}
|
"pbrt" {>= "2.3"}
|
||||||
]
|
]
|
||||||
|
|
|
||||||
2
src/dune
2
src/dune
|
|
@ -2,7 +2,7 @@
|
||||||
(name opentelemetry)
|
(name opentelemetry)
|
||||||
(synopsis "API for opentelemetry instrumentation")
|
(synopsis "API for opentelemetry instrumentation")
|
||||||
(flags :standard -warn-error -a+8)
|
(flags :standard -warn-error -a+8)
|
||||||
(libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic)
|
(libraries ambient-context ptime ptime.clock.os pbrt threads opentelemetry.atomic)
|
||||||
(public_name opentelemetry))
|
(public_name opentelemetry))
|
||||||
|
|
||||||
; ### protobuf rules ###
|
; ### protobuf rules ###
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,5 @@
|
||||||
(** Opentelemetry types and instrumentation *)
|
(** Opentelemetry types and instrumentation *)
|
||||||
|
|
||||||
module Thread_local = Thread_local
|
|
||||||
|
|
||||||
module Lock = Lock
|
module Lock = Lock
|
||||||
(** Global lock. *)
|
(** Global lock. *)
|
||||||
|
|
||||||
|
|
@ -522,7 +520,7 @@ module Scope = struct
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
let _global_scope : t Thread_local.t = Thread_local.create ()
|
let _ambient_scope : t Ambient_context.key = Ambient_context.create_key ()
|
||||||
|
|
||||||
(**/**)
|
(**/**)
|
||||||
|
|
||||||
|
|
@ -530,12 +528,12 @@ module Scope = struct
|
||||||
let get_surrounding ?scope () : t option =
|
let get_surrounding ?scope () : t option =
|
||||||
match scope with
|
match scope with
|
||||||
| Some _ -> scope
|
| Some _ -> scope
|
||||||
| None -> Thread_local.get _global_scope
|
| None -> Ambient_context.get _ambient_scope
|
||||||
|
|
||||||
(** [with_scope sc f] calls [f()] in a context where [sc] is the
|
(** [with_scope sc f] calls [f()] in a context where [sc] is the
|
||||||
(thread)-local scope, then reverts to the previous local scope, if any. *)
|
(thread)-local scope, then reverts to the previous local scope, if any. *)
|
||||||
let[@inline] with_scope (sc : t) (f : unit -> 'a) : 'a =
|
let[@inline] with_scope (sc : t) (f : unit -> 'a) : 'a =
|
||||||
Thread_local.with_ _global_scope sc (fun _ -> f ())
|
Ambient_context.with_binding _ambient_scope sc (fun _ -> f ())
|
||||||
end
|
end
|
||||||
|
|
||||||
open struct
|
open struct
|
||||||
|
|
|
||||||
|
|
@ -1,100 +0,0 @@
|
||||||
module A = Opentelemetry_atomic.Atomic
|
|
||||||
|
|
||||||
type key = int
|
|
||||||
|
|
||||||
let[@inline] get_key_ () : key = Thread.id (Thread.self ())
|
|
||||||
|
|
||||||
module Key_map_ = Map.Make (struct
|
|
||||||
type t = key
|
|
||||||
|
|
||||||
let compare : t -> t -> int = compare
|
|
||||||
end)
|
|
||||||
|
|
||||||
type 'a t = 'a ref Key_map_.t A.t
|
|
||||||
(** The TLS variable is made of a global atomic reference
|
|
||||||
(which has very low contention: it's modified only when a
|
|
||||||
thread is started/stopped).
|
|
||||||
|
|
||||||
Inside that atomic variable, is a map from thread ID to a mutable [ref]
|
|
||||||
holding the actual data. Because this [ref] is only ever accessed
|
|
||||||
by the thread with this given ID, it's safe to modify. *)
|
|
||||||
|
|
||||||
let create () : _ t = A.make Key_map_.empty
|
|
||||||
|
|
||||||
let[@inline] get_exn (self : _ t) =
|
|
||||||
let m = A.get self in
|
|
||||||
let key = get_key_ () in
|
|
||||||
!(Key_map_.find key m)
|
|
||||||
|
|
||||||
let[@inline] get self = try Some (get_exn self) with Not_found -> None
|
|
||||||
|
|
||||||
(* remove reference for the key *)
|
|
||||||
let remove_ref_ self key : unit =
|
|
||||||
while
|
|
||||||
let m = A.get self in
|
|
||||||
let m' = Key_map_.remove key m in
|
|
||||||
not (A.compare_and_set self m m')
|
|
||||||
do
|
|
||||||
Thread.yield ()
|
|
||||||
done
|
|
||||||
|
|
||||||
let set_ref_ self key (r : _ ref) : unit =
|
|
||||||
while
|
|
||||||
let m = A.get self in
|
|
||||||
let m' = Key_map_.add key r m in
|
|
||||||
not (A.compare_and_set self m m')
|
|
||||||
do
|
|
||||||
Thread.yield ()
|
|
||||||
done
|
|
||||||
|
|
||||||
(* get or associate a reference to [key], and return it.
|
|
||||||
Also return a function to remove the reference if we just created it. *)
|
|
||||||
let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option =
|
|
||||||
try
|
|
||||||
let r = Key_map_.find key (A.get self) in
|
|
||||||
let old = !r in
|
|
||||||
r := v;
|
|
||||||
r, Some old
|
|
||||||
with Not_found ->
|
|
||||||
let r = ref v in
|
|
||||||
set_ref_ self key r;
|
|
||||||
r, None
|
|
||||||
|
|
||||||
let set (self : _ t) v : unit =
|
|
||||||
let key = get_key_ () in
|
|
||||||
let _, _ = get_or_create_ref_ self key ~v in
|
|
||||||
()
|
|
||||||
|
|
||||||
let remove (self : _ t) : unit =
|
|
||||||
let key = get_key_ () in
|
|
||||||
remove_ref_ self key
|
|
||||||
|
|
||||||
let get_or_create ~create (self : 'a t) : 'a =
|
|
||||||
let key = get_key_ () in
|
|
||||||
try
|
|
||||||
let r = Key_map_.find key (A.get self) in
|
|
||||||
!r
|
|
||||||
with Not_found ->
|
|
||||||
Gc.finalise (fun _ -> remove_ref_ self key) (Thread.self ());
|
|
||||||
let v = create () in
|
|
||||||
let r = ref v in
|
|
||||||
set_ref_ self key r;
|
|
||||||
v
|
|
||||||
|
|
||||||
let with_ self v f =
|
|
||||||
let key = get_key_ () in
|
|
||||||
let r, old = get_or_create_ref_ self key ~v in
|
|
||||||
|
|
||||||
let restore_ () : unit =
|
|
||||||
match old with
|
|
||||||
| None -> remove_ref_ self key
|
|
||||||
| Some old -> r := old
|
|
||||||
in
|
|
||||||
|
|
||||||
try
|
|
||||||
let res = f old in
|
|
||||||
restore_ ();
|
|
||||||
res
|
|
||||||
with e ->
|
|
||||||
restore_ ();
|
|
||||||
raise e
|
|
||||||
|
|
@ -1,27 +0,0 @@
|
||||||
(** Thread/Domain local storage
|
|
||||||
|
|
||||||
This allows the creation of global state that is per-domain or per-thread.
|
|
||||||
*)
|
|
||||||
|
|
||||||
type 'a t
|
|
||||||
|
|
||||||
val create : unit -> 'a t
|
|
||||||
(** Create new storage *)
|
|
||||||
|
|
||||||
val get : 'a t -> 'a option
|
|
||||||
(** Get current value *)
|
|
||||||
|
|
||||||
val get_exn : 'a t -> 'a
|
|
||||||
(** Like {!get} but fails with an exception
|
|
||||||
@raise Not_found if no value was found *)
|
|
||||||
|
|
||||||
val set : 'a t -> 'a -> unit
|
|
||||||
|
|
||||||
val remove : _ t -> unit
|
|
||||||
|
|
||||||
val get_or_create : create:(unit -> 'a) -> 'a t -> 'a
|
|
||||||
|
|
||||||
val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b
|
|
||||||
(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where
|
|
||||||
[prev] is the value currently in [var] (if any), and
|
|
||||||
then restores the old value of [var] for this thread. *)
|
|
||||||
|
|
@ -4,4 +4,4 @@
|
||||||
(public_name opentelemetry.trace)
|
(public_name opentelemetry.trace)
|
||||||
(synopsis "Use opentelemetry as a collector for trace")
|
(synopsis "Use opentelemetry as a collector for trace")
|
||||||
(optional)
|
(optional)
|
||||||
(libraries trace opentelemetry))
|
(libraries ambient-context trace opentelemetry))
|
||||||
|
|
|
||||||
|
|
@ -1,38 +1,4 @@
|
||||||
module Otel = Opentelemetry
|
module Otel = Opentelemetry
|
||||||
module TLS = Otel.Thread_local
|
|
||||||
|
|
||||||
type span = Trace.span
|
|
||||||
|
|
||||||
(** Table indexed by Trace spans *)
|
|
||||||
module Span_tbl = Hashtbl.Make (struct
|
|
||||||
include Int64
|
|
||||||
|
|
||||||
let hash : t -> int = Hashtbl.hash
|
|
||||||
end)
|
|
||||||
|
|
||||||
(** Per-thread set of active spans. *)
|
|
||||||
module Active_spans = struct
|
|
||||||
type span_begin = {
|
|
||||||
span_id: Otel.Span_id.t;
|
|
||||||
start_time: int64;
|
|
||||||
name: string;
|
|
||||||
data: (string * Trace.user_data) list;
|
|
||||||
__FILE__: string;
|
|
||||||
__LINE__: int;
|
|
||||||
new_scope: Otel.Scope.t;
|
|
||||||
old_scope: Otel.Scope.t option;
|
|
||||||
}
|
|
||||||
(** Information we get at the beginning of the span *)
|
|
||||||
|
|
||||||
type t = { tbl: span_begin Span_tbl.t } [@@unboxed]
|
|
||||||
(** Storage for active spans *)
|
|
||||||
|
|
||||||
let create () : t = { tbl = Span_tbl.create 8 }
|
|
||||||
|
|
||||||
let tls : t TLS.t = TLS.create ()
|
|
||||||
|
|
||||||
let[@inline] get () : t = TLS.get_or_create tls ~create
|
|
||||||
end
|
|
||||||
|
|
||||||
let conv_span_to_i64 (id : Otel.Span_id.t) : int64 =
|
let conv_span_to_i64 (id : Otel.Span_id.t) : int64 =
|
||||||
let bs = Otel.Span_id.to_bytes id in
|
let bs = Otel.Span_id.to_bytes id in
|
||||||
|
|
@ -47,7 +13,7 @@ let span_of_i64 (id : int64) : Otel.Span_id.t =
|
||||||
|
|
||||||
let collector () : Trace.collector =
|
let collector () : Trace.collector =
|
||||||
let module M = struct
|
let module M = struct
|
||||||
let enter_span ?__FUNCTION__:_ ~__FILE__ ~__LINE__ ~data name : span =
|
let with_span ~__FUNCTION__:_ ~__FILE__ ~__LINE__ ~data name cb =
|
||||||
let span_id = Otel.Span_id.create () in
|
let span_id = Otel.Span_id.create () in
|
||||||
let span = conv_span_to_i64 span_id in
|
let span = conv_span_to_i64 span_id in
|
||||||
|
|
||||||
|
|
@ -63,57 +29,32 @@ let collector () : Trace.collector =
|
||||||
let new_scope =
|
let new_scope =
|
||||||
{ Otel.Scope.span_id; trace_id; events = []; attrs = [] }
|
{ Otel.Scope.span_id; trace_id; events = []; attrs = [] }
|
||||||
in
|
in
|
||||||
TLS.set Otel.Scope._global_scope new_scope;
|
|
||||||
|
|
||||||
let active_spans = Active_spans.get () in
|
Ambient_context.with_binding Otel.Scope._ambient_scope new_scope
|
||||||
Span_tbl.add active_spans.tbl span
|
@@ fun () ->
|
||||||
{
|
let rv = cb span in
|
||||||
span_id;
|
|
||||||
start_time;
|
|
||||||
__FILE__;
|
|
||||||
__LINE__;
|
|
||||||
old_scope;
|
|
||||||
new_scope;
|
|
||||||
name;
|
|
||||||
data;
|
|
||||||
};
|
|
||||||
|
|
||||||
span
|
let end_time = Otel.Timestamp_ns.now_unix_ns () in
|
||||||
|
|
||||||
let exit_span (span : span) : unit =
|
let o_span : Otel.Span.t =
|
||||||
let active_spans = Active_spans.get () in
|
let attrs =
|
||||||
match Span_tbl.find_opt active_spans.tbl span with
|
[ "file", `String __FILE__; "line", `Int __LINE__ ] @ data
|
||||||
| None -> () (* TODO: log warning *)
|
|
||||||
| Some
|
|
||||||
{
|
|
||||||
span_id;
|
|
||||||
start_time;
|
|
||||||
name;
|
|
||||||
__FILE__;
|
|
||||||
__LINE__;
|
|
||||||
new_scope;
|
|
||||||
old_scope;
|
|
||||||
data;
|
|
||||||
} ->
|
|
||||||
let end_time = Otel.Timestamp_ns.now_unix_ns () in
|
|
||||||
|
|
||||||
(* restore previous scope *)
|
|
||||||
(match old_scope with
|
|
||||||
| None -> TLS.remove Otel.Scope._global_scope
|
|
||||||
| Some sc -> TLS.set Otel.Scope._global_scope sc);
|
|
||||||
|
|
||||||
let o_span : Otel.Span.t =
|
|
||||||
let attrs =
|
|
||||||
[ "file", `String __FILE__; "line", `Int __LINE__ ] @ data
|
|
||||||
in
|
|
||||||
Otel.Span.create ~trace_id:new_scope.trace_id ~id:span_id ~start_time
|
|
||||||
~end_time ~attrs name
|
|
||||||
|> fst
|
|
||||||
in
|
in
|
||||||
|
Otel.Span.create ~trace_id:new_scope.trace_id ~id:span_id ~start_time
|
||||||
|
~end_time ~attrs name
|
||||||
|
|> fst
|
||||||
|
in
|
||||||
|
|
||||||
Otel.Trace.emit [ o_span ];
|
Otel.Trace.emit [ o_span ];
|
||||||
|
|
||||||
()
|
rv
|
||||||
|
|
||||||
|
let enter_explicit_span ~surrounding:_ ?__FUNCTION__:_ ~__FILE__:_
|
||||||
|
~__LINE__:_ ~data:_ _name : Trace.explicit_span =
|
||||||
|
failwith "nyi"
|
||||||
|
|
||||||
|
let exit_explicit_span _sp =
|
||||||
|
failwith "nyi"
|
||||||
|
|
||||||
let message ?span ~data:_ msg : unit =
|
let message ?span ~data:_ msg : unit =
|
||||||
(* gather information from context *)
|
(* gather information from context *)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue