Merge pull request #27 from imandra-ai/wip-thread-local

implicit scope using TLS
This commit is contained in:
Simon Cruanes 2022-09-22 10:02:06 -04:00 committed by GitHub
commit b08a89da49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 28 deletions

View file

@ -2,7 +2,7 @@
(name opentelemetry)
(synopsis "API for opentelemetry instrumentation")
(flags :standard -warn-error -a+8)
(libraries ptime ptime.clock.os pbrt threads)
(libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic)
(public_name opentelemetry))
; ### protobuf rules ###

View file

@ -45,12 +45,12 @@ module Server : sig
*)
val get_trace_context :
?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option
?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option
(** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header
added by [trace] and [with_].
*)
val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t
val set_trace_context : Otel.Scope.t -> Request.t -> Request.t
(** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used
by [trace] and [with_].
*)
@ -88,7 +88,7 @@ end = struct
let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent"
let set_trace_context (scope : Otel.Trace.scope) req =
let set_trace_context (scope : Otel.Scope.t) req =
let module Traceparent = Otel.Trace_context.Traceparent in
let headers =
Header.add (Request.headers req) header_x_ocaml_otel_traceparent
@ -146,7 +146,7 @@ end = struct
f req)
end
let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) =
let client ?(scope : Otel.Scope.t option) (module C : Cohttp_lwt.S.Client) =
let module Traced = struct
open Lwt.Syntax
@ -170,7 +170,7 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) =
let attrs = attrs_for ~uri ~meth () in
trace_id, parent, attrs
let add_traceparent (scope : Otel.Trace.scope) headers =
let add_traceparent (scope : Otel.Scope.t) headers =
let module Traceparent = Otel.Trace_context.Traceparent in
let headers =
match headers with

View file

@ -15,7 +15,7 @@ module Trace = struct
(** Sync span guard *)
let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent
?scope ?links name (f : Trace.scope -> 'a Lwt.t) : 'a Lwt.t =
?scope ?links name (f : Scope.t -> 'a Lwt.t) : 'a Lwt.t =
let trace_id =
match trace_id, scope with
| Some trace_id, _ -> trace_id

View file

@ -1,5 +1,7 @@
(** Opentelemetry types and instrumentation *)
module Thread_local = Thread_local
module Lock = Lock
(** Global lock *)
@ -482,6 +484,53 @@ end = struct
default_span_event ~time_unix_nano ~name ~attributes:attrs ()
end
(** {2 Scopes} *)
(** Scopes.
A scope is a trace ID and the span ID of the currently active span.
*)
module Scope = struct
type t = {
trace_id: Trace_id.t;
span_id: Span_id.t;
mutable events: Event.t list;
mutable attrs: key_value list;
}
(** Add an event to the scope. It will be aggregated into the span.
Note that this takes a function that produces an event, and will only
call it if there is an instrumentation backend. *)
let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit =
if Collector.has_backend () then scope.events <- ev () :: scope.events
(** Add an attr to the scope. It will be aggregated into the span.
Note that this takes a function that produces attributes, and will only
call it if there is an instrumentation backend. *)
let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit =
if Collector.has_backend () then
scope.attrs <- List.rev_append (attrs ()) scope.attrs
(**/**)
(* define this locally *)
let _global_scope : t Thread_local.t = Thread_local.create ()
(**/**)
(** Obtain current scope from thread-local storage, if available *)
let get_surrounding ?scope () : t option =
match scope with
| Some _ -> scope
| None -> Thread_local.get _global_scope
end
open struct
let get_surrounding_scope = Scope.get_surrounding
end
(** Span Link
A pointer from the current span to another span in the same trace or in a
@ -648,45 +697,45 @@ module Trace = struct
let rs = make_resource_spans ?service_name ?attrs spans in
Collector.send_trace [ rs ] ~ret:(fun () -> ())
type scope = {
type scope = Scope.t = {
trace_id: Trace_id.t;
span_id: Span_id.t;
mutable events: Event.t list;
mutable attrs: Span.key_value list;
}
(** Scope to be used with {!with_}. *)
[@@deprecated "use Scope.t"]
(** Add an event to the scope. It will be aggregated into the span.
let add_event = Scope.add_event [@@deprecated "use Scope.add_event"]
Note that this takes a function that produces an event, and will only
call it if there is an instrumentation backend. *)
let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit =
if Collector.has_backend () then scope.events <- ev () :: scope.events
(** Add an attr to the scope. It will be aggregated into the span.
Note that this takes a function that produces attributes, and will only
call it if there is an instrumentation backend. *)
let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) :
unit =
if Collector.has_backend () then
scope.attrs <- List.rev_append (attrs ()) scope.attrs
let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"]
(** Sync span guard.
@param force_new_trace_id if true (default false), the span will not use a
surrounding context, or [scope], or [trace_id], but will always
create a fresh new trace ID.
{b NOTE} be careful not to call this inside a Gc alarm, as it can
cause deadlocks. *)
let with_ ?trace_state ?service_name
let with_ ?(force_new_trace_id = false) ?trace_state ?service_name
?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope
?links name (f : scope -> 'a) : 'a =
?links name (f : Scope.t -> 'a) : 'a =
let scope =
if force_new_trace_id then
None
else
get_surrounding_scope ?scope ()
in
let trace_id =
match trace_id, scope with
| _ when force_new_trace_id -> Trace_id.create ()
| Some trace_id, _ -> trace_id
| None, Some scope -> scope.trace_id
| None, None -> Trace_id.create ()
in
let parent =
match parent, scope with
| _ when force_new_trace_id -> None
| Some span_id, _ -> Some span_id
| None, Some scope -> Some scope.span_id
| None, None -> None
@ -694,7 +743,8 @@ module Trace = struct
let start_time = Timestamp_ns.now_unix_ns () in
let span_id = Span_id.create () in
let scope = { trace_id; span_id; events = []; attrs } in
(* set global scope in this thread *)
Thread_local.with_ Scope._global_scope scope @@ fun _sc ->
(* called once we're done, to emit a span *)
let finally res =
let status =

71
src/thread_local.ml Normal file
View file

@ -0,0 +1,71 @@
module A = Opentelemetry_atomic.Atomic
type key = int
let get_key_ () : key = Thread.id (Thread.self ())
module Key_map_ = Map.Make (struct
type t = key
let compare : t -> t -> int = compare
end)
type 'a t = 'a ref Key_map_.t A.t
let create () : _ t = A.make Key_map_.empty
let get_exn (self : _ t) =
let m = A.get self in
let key = get_key_ () in
!(Key_map_.find key m)
let[@inline] get self = try Some (get_exn self) with Not_found -> None
let[@inline] get_or ~default self = try get_exn self with Not_found -> default
(* remove reference for the key *)
let[@inline] remove_ref_ self key : unit =
while
let m = A.get self in
let m' = Key_map_.remove key m in
not (A.compare_and_set self m m')
do
()
done
(* get or associate a reference to [key], and return it.
Also return a function to remove the reference if we just created it. *)
let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option =
try
let r = Key_map_.find key (A.get self) in
let old = !r in
r := v;
r, Some old
with Not_found ->
let r = ref v in
while
let m = A.get self in
let m' = Key_map_.add key r m in
not (A.compare_and_set self m m')
do
()
done;
r, None
let with_ self v f =
let key = get_key_ () in
let r, old = get_or_create_ref_ self key ~v in
let restore_ () : unit =
match old with
| None -> remove_ref_ self key
| Some old -> r := old
in
try
let res = f old in
restore_ ();
res
with e ->
restore_ ();
raise e

21
src/thread_local.mli Normal file
View file

@ -0,0 +1,21 @@
(** Thread/Domain local storage
This allows the creation of global state that is per-domain or per-thread.
*)
type 'a t
val create : unit -> 'a t
(** Create new storage *)
val get : 'a t -> 'a option
(** Get current value *)
val get_exn : 'a t -> 'a
(** Like {!get} but fails with an exception
@raise Not_found if no value was found *)
val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b
(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where
[prev] is the value currently in [var] (if any), and
then restores the old value of [var] for this thread. *)

View file

@ -23,16 +23,17 @@ let run_job () =
let@ () = Fun.protect ~finally:(fun () -> Atomic.set stop true) in
let i = ref 0 in
while not @@ Atomic.get stop do
let@ scope =
let@ _scope =
Atomic.incr num_tr;
T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer"
~attrs:[ "i", `Int !i ]
in
for j = 0 to 4 do
(* parent scope is found via thread local storage *)
let@ scope =
Atomic.incr num_tr;
T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope
T.Trace.with_ ~kind:T.Span.Span_kind_internal
~attrs:[ "j", `Int j ]
"loop.inner"
in