mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-08 03:47:59 -04:00
use thread-local storage to store global scope
This commit is contained in:
parent
b2df8ab31d
commit
35d1782c72
4 changed files with 149 additions and 23 deletions
|
|
@ -45,12 +45,12 @@ module Server : sig
|
|||
*)
|
||||
|
||||
val get_trace_context :
|
||||
?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option
|
||||
?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option
|
||||
(** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header
|
||||
added by [trace] and [with_].
|
||||
*)
|
||||
|
||||
val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t
|
||||
val set_trace_context : Otel.Scope.t -> Request.t -> Request.t
|
||||
(** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used
|
||||
by [trace] and [with_].
|
||||
*)
|
||||
|
|
@ -88,7 +88,7 @@ end = struct
|
|||
|
||||
let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent"
|
||||
|
||||
let set_trace_context (scope : Otel.Trace.scope) req =
|
||||
let set_trace_context (scope : Otel.Scope.t) req =
|
||||
let module Traceparent = Otel.Trace_context.Traceparent in
|
||||
let headers =
|
||||
Header.add (Request.headers req) header_x_ocaml_otel_traceparent
|
||||
|
|
@ -146,7 +146,7 @@ end = struct
|
|||
f req)
|
||||
end
|
||||
|
||||
let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) =
|
||||
let client ?(scope : Otel.Scope.t option) (module C : Cohttp_lwt.S.Client) =
|
||||
let module Traced = struct
|
||||
open Lwt.Syntax
|
||||
|
||||
|
|
@ -170,7 +170,7 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) =
|
|||
let attrs = attrs_for ~uri ~meth () in
|
||||
trace_id, parent, attrs
|
||||
|
||||
let add_traceparent (scope : Otel.Trace.scope) headers =
|
||||
let add_traceparent (scope : Otel.Scope.t) headers =
|
||||
let module Traceparent = Otel.Trace_context.Traceparent in
|
||||
let headers =
|
||||
match headers with
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
(** Opentelemetry types and instrumentation *)
|
||||
|
||||
module Thread_local = Thread_local
|
||||
|
||||
module Lock = Lock
|
||||
(** Global lock *)
|
||||
|
||||
|
|
@ -482,6 +484,48 @@ end = struct
|
|||
default_span_event ~time_unix_nano ~name ~attributes:attrs ()
|
||||
end
|
||||
|
||||
(** {2 Scopes} *)
|
||||
|
||||
(** Scopes.
|
||||
|
||||
A scope is a trace ID and the span ID of the currently active span.
|
||||
*)
|
||||
module Scope = struct
|
||||
type t = {
|
||||
trace_id: Trace_id.t;
|
||||
span_id: Span_id.t;
|
||||
mutable events: Event.t list;
|
||||
mutable attrs: key_value list;
|
||||
}
|
||||
|
||||
(** Add an event to the scope. It will be aggregated into the span.
|
||||
|
||||
Note that this takes a function that produces an event, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit =
|
||||
if Collector.has_backend () then scope.events <- ev () :: scope.events
|
||||
|
||||
(** Add an attr to the scope. It will be aggregated into the span.
|
||||
|
||||
Note that this takes a function that produces attributes, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit =
|
||||
if Collector.has_backend () then
|
||||
scope.attrs <- List.rev_append (attrs ()) scope.attrs
|
||||
end
|
||||
|
||||
(* now. We use thread local storage to store the currently active scope,
|
||||
if any. *)
|
||||
open struct
|
||||
let global_scope : Scope.t Thread_local.t = Thread_local.create ()
|
||||
|
||||
(* access global scope if [scope=None] *)
|
||||
let get_scope ?scope () =
|
||||
match scope with
|
||||
| Some _ -> scope
|
||||
| None -> Thread_local.get global_scope
|
||||
end
|
||||
|
||||
(** Span Link
|
||||
|
||||
A pointer from the current span to another span in the same trace or in a
|
||||
|
|
@ -648,29 +692,17 @@ module Trace = struct
|
|||
let rs = make_resource_spans ?service_name ?attrs spans in
|
||||
Collector.send_trace [ rs ] ~ret:(fun () -> ())
|
||||
|
||||
type scope = {
|
||||
type scope = Scope.t = {
|
||||
trace_id: Trace_id.t;
|
||||
span_id: Span_id.t;
|
||||
mutable events: Event.t list;
|
||||
mutable attrs: Span.key_value list;
|
||||
}
|
||||
(** Scope to be used with {!with_}. *)
|
||||
[@@deprecated "use Scope.t"]
|
||||
|
||||
(** Add an event to the scope. It will be aggregated into the span.
|
||||
let add_event = Scope.add_event [@@deprecated "use Scope.add_event"]
|
||||
|
||||
Note that this takes a function that produces an event, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit =
|
||||
if Collector.has_backend () then scope.events <- ev () :: scope.events
|
||||
|
||||
(** Add an attr to the scope. It will be aggregated into the span.
|
||||
|
||||
Note that this takes a function that produces attributes, and will only
|
||||
call it if there is an instrumentation backend. *)
|
||||
let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) :
|
||||
unit =
|
||||
if Collector.has_backend () then
|
||||
scope.attrs <- List.rev_append (attrs ()) scope.attrs
|
||||
let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"]
|
||||
|
||||
(** Sync span guard.
|
||||
|
||||
|
|
@ -678,7 +710,8 @@ module Trace = struct
|
|||
cause deadlocks. *)
|
||||
let with_ ?trace_state ?service_name
|
||||
?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
|
||||
?links name (f : scope -> 'a) : 'a =
|
||||
?links name (f : Scope.t -> 'a) : 'a =
|
||||
let scope = get_scope ?scope () in
|
||||
let trace_id =
|
||||
match trace_id, scope with
|
||||
| Some trace_id, _ -> trace_id
|
||||
|
|
@ -694,7 +727,8 @@ module Trace = struct
|
|||
let start_time = Timestamp_ns.now_unix_ns () in
|
||||
let span_id = Span_id.create () in
|
||||
let scope = { trace_id; span_id; events = []; attrs } in
|
||||
|
||||
(* set global scope in this thread *)
|
||||
Thread_local.with_ global_scope scope @@ fun _sc ->
|
||||
(* called once we're done, to emit a span *)
|
||||
let finally res =
|
||||
let status =
|
||||
|
|
|
|||
71
src/thread_local.ml
Normal file
71
src/thread_local.ml
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
module A = Atomic
|
||||
|
||||
type key = int
|
||||
|
||||
let get_key_ () : key = Thread.id (Thread.self ())
|
||||
|
||||
module Key_map_ = Map.Make (struct
|
||||
type t = key
|
||||
|
||||
let compare : t -> t -> int = compare
|
||||
end)
|
||||
|
||||
type 'a t = 'a ref Key_map_.t A.t
|
||||
|
||||
let create () : _ t = A.make Key_map_.empty
|
||||
|
||||
let get_exn (self : _ t) =
|
||||
let m = A.get self in
|
||||
let key = get_key_ () in
|
||||
!(Key_map_.find key m)
|
||||
|
||||
let[@inline] get self = try Some (get_exn self) with Not_found -> None
|
||||
|
||||
let[@inline] get_or ~default self = try get_exn self with Not_found -> default
|
||||
|
||||
(* remove reference for the key *)
|
||||
let[@inline] remove_ref_ self key : unit =
|
||||
while
|
||||
let m = A.get self in
|
||||
let m' = Key_map_.remove key m in
|
||||
not (A.compare_and_set self m m')
|
||||
do
|
||||
()
|
||||
done
|
||||
|
||||
(* get or associate a reference to [key], and return it.
|
||||
Also return a function to remove the reference if we just created it. *)
|
||||
let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option =
|
||||
try
|
||||
let r = Key_map_.find key (A.get self) in
|
||||
let old = !r in
|
||||
r := v;
|
||||
r, Some old
|
||||
with Not_found ->
|
||||
let r = ref v in
|
||||
while
|
||||
let m = A.get self in
|
||||
let m' = Key_map_.add key r m in
|
||||
not (A.compare_and_set self m m')
|
||||
do
|
||||
()
|
||||
done;
|
||||
r, None
|
||||
|
||||
let with_ self v f =
|
||||
let key = get_key_ () in
|
||||
let r, old = get_or_create_ref_ self key ~v in
|
||||
|
||||
let restore_ () : unit =
|
||||
match old with
|
||||
| None -> remove_ref_ self key
|
||||
| Some old -> r := old
|
||||
in
|
||||
|
||||
try
|
||||
let res = f old in
|
||||
restore_ ();
|
||||
res
|
||||
with e ->
|
||||
(restore_ [@inlined]) ();
|
||||
raise e
|
||||
21
src/thread_local.mli
Normal file
21
src/thread_local.mli
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
(** Thread/Domain local storage
|
||||
|
||||
This allows the creation of global state that is per-domain or per-thread.
|
||||
*)
|
||||
|
||||
type 'a t
|
||||
|
||||
val create : unit -> 'a t
|
||||
(** Create new storage *)
|
||||
|
||||
val get : 'a t -> 'a option
|
||||
(** Get current value *)
|
||||
|
||||
val get_exn : 'a t -> 'a
|
||||
(** Like {!get} but fails with an exception
|
||||
@raise Not_found if no value was found *)
|
||||
|
||||
val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b
|
||||
(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where
|
||||
[prev] is the value currently in [var] (if any), and
|
||||
then restores the old value of [var] for this thread. *)
|
||||
Loading…
Add table
Reference in a new issue