From 35d1782c7299522c162e3c128803522e31f4678d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Sep 2022 16:58:15 -0400 Subject: [PATCH 1/7] use thread-local storage to store global scope --- .../cohttp/opentelemetry_cohttp_lwt.ml | 10 +-- src/opentelemetry.ml | 70 +++++++++++++----- src/thread_local.ml | 71 +++++++++++++++++++ src/thread_local.mli | 21 ++++++ 4 files changed, 149 insertions(+), 23 deletions(-) create mode 100644 src/thread_local.ml create mode 100644 src/thread_local.mli diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index db33c233..4938cd47 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -45,12 +45,12 @@ module Server : sig *) val get_trace_context : - ?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option + ?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header added by [trace] and [with_]. *) - val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t + val set_trace_context : Otel.Scope.t -> Request.t -> Request.t (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used by [trace] and [with_]. *) @@ -88,7 +88,7 @@ end = struct let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent" - let set_trace_context (scope : Otel.Trace.scope) req = + let set_trace_context (scope : Otel.Scope.t) req = let module Traceparent = Otel.Trace_context.Traceparent in let headers = Header.add (Request.headers req) header_x_ocaml_otel_traceparent @@ -146,7 +146,7 @@ end = struct f req) end -let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = +let client ?(scope : Otel.Scope.t option) (module C : Cohttp_lwt.S.Client) = let module Traced = struct open Lwt.Syntax @@ -170,7 +170,7 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = let attrs = attrs_for ~uri ~meth () in trace_id, parent, attrs - let add_traceparent (scope : Otel.Trace.scope) headers = + let add_traceparent (scope : Otel.Scope.t) headers = let module Traceparent = Otel.Trace_context.Traceparent in let headers = match headers with diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index c2fde0d0..f79afc2d 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,5 +1,7 @@ (** Opentelemetry types and instrumentation *) +module Thread_local = Thread_local + module Lock = Lock (** Global lock *) @@ -482,6 +484,48 @@ end = struct default_span_event ~time_unix_nano ~name ~attributes:attrs () end +(** {2 Scopes} *) + +(** Scopes. + + A scope is a trace ID and the span ID of the currently active span. +*) +module Scope = struct + type t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable events: Event.t list; + mutable attrs: key_value list; + } + + (** Add an event to the scope. It will be aggregated into the span. + + Note that this takes a function that produces an event, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit = + if Collector.has_backend () then scope.events <- ev () :: scope.events + + (** Add an attr to the scope. It will be aggregated into the span. + + Note that this takes a function that produces attributes, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit = + if Collector.has_backend () then + scope.attrs <- List.rev_append (attrs ()) scope.attrs +end + +(* now. We use thread local storage to store the currently active scope, + if any. *) +open struct + let global_scope : Scope.t Thread_local.t = Thread_local.create () + + (* access global scope if [scope=None] *) + let get_scope ?scope () = + match scope with + | Some _ -> scope + | None -> Thread_local.get global_scope +end + (** Span Link A pointer from the current span to another span in the same trace or in a @@ -648,29 +692,17 @@ module Trace = struct let rs = make_resource_spans ?service_name ?attrs spans in Collector.send_trace [ rs ] ~ret:(fun () -> ()) - type scope = { + type scope = Scope.t = { trace_id: Trace_id.t; span_id: Span_id.t; mutable events: Event.t list; mutable attrs: Span.key_value list; } - (** Scope to be used with {!with_}. *) + [@@deprecated "use Scope.t"] - (** Add an event to the scope. It will be aggregated into the span. + let add_event = Scope.add_event [@@deprecated "use Scope.add_event"] - Note that this takes a function that produces an event, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = - if Collector.has_backend () then scope.events <- ev () :: scope.events - - (** Add an attr to the scope. It will be aggregated into the span. - - Note that this takes a function that produces attributes, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) : - unit = - if Collector.has_backend () then - scope.attrs <- List.rev_append (attrs ()) scope.attrs + let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"] (** Sync span guard. @@ -678,7 +710,8 @@ module Trace = struct cause deadlocks. *) let with_ ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope - ?links name (f : scope -> 'a) : 'a = + ?links name (f : Scope.t -> 'a) : 'a = + let scope = get_scope ?scope () in let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id @@ -694,7 +727,8 @@ module Trace = struct let start_time = Timestamp_ns.now_unix_ns () in let span_id = Span_id.create () in let scope = { trace_id; span_id; events = []; attrs } in - + (* set global scope in this thread *) + Thread_local.with_ global_scope scope @@ fun _sc -> (* called once we're done, to emit a span *) let finally res = let status = diff --git a/src/thread_local.ml b/src/thread_local.ml new file mode 100644 index 00000000..d7b0c296 --- /dev/null +++ b/src/thread_local.ml @@ -0,0 +1,71 @@ +module A = Atomic + +type key = int + +let get_key_ () : key = Thread.id (Thread.self ()) + +module Key_map_ = Map.Make (struct + type t = key + + let compare : t -> t -> int = compare +end) + +type 'a t = 'a ref Key_map_.t A.t + +let create () : _ t = A.make Key_map_.empty + +let get_exn (self : _ t) = + let m = A.get self in + let key = get_key_ () in + !(Key_map_.find key m) + +let[@inline] get self = try Some (get_exn self) with Not_found -> None + +let[@inline] get_or ~default self = try get_exn self with Not_found -> default + +(* remove reference for the key *) +let[@inline] remove_ref_ self key : unit = + while + let m = A.get self in + let m' = Key_map_.remove key m in + not (A.compare_and_set self m m') + do + () + done + +(* get or associate a reference to [key], and return it. + Also return a function to remove the reference if we just created it. *) +let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option = + try + let r = Key_map_.find key (A.get self) in + let old = !r in + r := v; + r, Some old + with Not_found -> + let r = ref v in + while + let m = A.get self in + let m' = Key_map_.add key r m in + not (A.compare_and_set self m m') + do + () + done; + r, None + +let with_ self v f = + let key = get_key_ () in + let r, old = get_or_create_ref_ self key ~v in + + let restore_ () : unit = + match old with + | None -> remove_ref_ self key + | Some old -> r := old + in + + try + let res = f old in + restore_ (); + res + with e -> + (restore_ [@inlined]) (); + raise e diff --git a/src/thread_local.mli b/src/thread_local.mli new file mode 100644 index 00000000..ecd162f2 --- /dev/null +++ b/src/thread_local.mli @@ -0,0 +1,21 @@ +(** Thread/Domain local storage + + This allows the creation of global state that is per-domain or per-thread. +*) + +type 'a t + +val create : unit -> 'a t +(** Create new storage *) + +val get : 'a t -> 'a option +(** Get current value *) + +val get_exn : 'a t -> 'a +(** Like {!get} but fails with an exception + @raise Not_found if no value was found *) + +val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b +(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where + [prev] is the value currently in [var] (if any), and + then restores the old value of [var] for this thread. *) From 6bdf35131304e7fd8fa6caa3e80849bdf0b8d2bd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Sep 2022 17:00:46 -0400 Subject: [PATCH 2/7] fix warning --- src/thread_local.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/thread_local.ml b/src/thread_local.ml index d7b0c296..21db1626 100644 --- a/src/thread_local.ml +++ b/src/thread_local.ml @@ -67,5 +67,5 @@ let with_ self v f = restore_ (); res with e -> - (restore_ [@inlined]) (); + restore_ (); raise e From 555a69a0de4845ae2c3e36d96c7fe06bc175f77a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Sep 2022 17:00:51 -0400 Subject: [PATCH 3/7] test: in emit1, use implicit local storage --- tests/bin/emit1.ml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index d5196413..faaeca12 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -23,16 +23,17 @@ let run_job () = let@ () = Fun.protect ~finally:(fun () -> Atomic.set stop true) in let i = ref 0 in while not @@ Atomic.get stop do - let@ scope = + let@ _scope = Atomic.incr num_tr; T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" ~attrs:[ "i", `Int !i ] in for j = 0 to 4 do + (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~attrs:[ "j", `Int j ] "loop.inner" in From 19c0fac8fdda8027c43f907fd251b6c65f0f7ebd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Fri, 16 Sep 2022 18:54:45 -0400 Subject: [PATCH 4/7] fix compilation on < 4.12 --- src/dune | 2 +- src/thread_local.ml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dune b/src/dune index aef121fa..268cf4cb 100644 --- a/src/dune +++ b/src/dune @@ -2,7 +2,7 @@ (name opentelemetry) (synopsis "API for opentelemetry instrumentation") (flags :standard -warn-error -a+8) - (libraries ptime ptime.clock.os pbrt threads) + (libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic) (public_name opentelemetry)) ; ### protobuf rules ### diff --git a/src/thread_local.ml b/src/thread_local.ml index 21db1626..06d54bcb 100644 --- a/src/thread_local.ml +++ b/src/thread_local.ml @@ -1,4 +1,4 @@ -module A = Atomic +module A = Opentelemetry_atomic.Atomic type key = int From 8d361744ecd95149c4d5ab000fbb9510ddc2f965 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Sep 2022 15:01:13 -0400 Subject: [PATCH 5/7] fix warning --- src/lwt/opentelemetry_lwt.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index 37fadef6..0bb93bc6 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -15,7 +15,7 @@ module Trace = struct (** Sync span guard *) let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent - ?scope ?links name (f : Trace.scope -> 'a Lwt.t) : 'a Lwt.t = + ?scope ?links name (f : Scope.t -> 'a Lwt.t) : 'a Lwt.t = let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id From a2e273282c088e2606b32b8fdae131b07488865a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Sep 2022 15:01:16 -0400 Subject: [PATCH 6/7] feat(opentelemetry): Trace.with_ now has `force_new_trace_id` param this parameter can be used to force the creation of a new context, independent of surrounding context. --- src/opentelemetry.ml | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index f79afc2d..d1ec0b64 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -706,20 +706,31 @@ module Trace = struct (** Sync span guard. + @param force_new_trace_id if true (default false), the span will not use a + surrounding context, or [scope], or [trace_id], but will always + create a fresh new trace ID. + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. *) - let with_ ?trace_state ?service_name + let with_ ?(force_new_trace_id = false) ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope ?links name (f : Scope.t -> 'a) : 'a = - let scope = get_scope ?scope () in + let scope = + if force_new_trace_id then + None + else + get_scope ?scope () + in let trace_id = match trace_id, scope with + | _ when force_new_trace_id -> Trace_id.create () | Some trace_id, _ -> trace_id | None, Some scope -> scope.trace_id | None, None -> Trace_id.create () in let parent = match parent, scope with + | _ when force_new_trace_id -> None | Some span_id, _ -> Some span_id | None, Some scope -> Some scope.span_id | None, None -> None From e60e7754b30c806621ec130c1da6e88247ba8da6 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 19 Sep 2022 16:04:37 -0400 Subject: [PATCH 7/7] expose `Scope.get_surrounding` --- src/opentelemetry.ml | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index d1ec0b64..40c4fa98 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -512,18 +512,23 @@ module Scope = struct let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit = if Collector.has_backend () then scope.attrs <- List.rev_append (attrs ()) scope.attrs -end -(* now. We use thread local storage to store the currently active scope, - if any. *) -open struct - let global_scope : Scope.t Thread_local.t = Thread_local.create () + (**/**) - (* access global scope if [scope=None] *) - let get_scope ?scope () = + (* define this locally *) + let _global_scope : t Thread_local.t = Thread_local.create () + + (**/**) + + (** Obtain current scope from thread-local storage, if available *) + let get_surrounding ?scope () : t option = match scope with | Some _ -> scope - | None -> Thread_local.get global_scope + | None -> Thread_local.get _global_scope +end + +open struct + let get_surrounding_scope = Scope.get_surrounding end (** Span Link @@ -719,7 +724,7 @@ module Trace = struct if force_new_trace_id then None else - get_scope ?scope () + get_surrounding_scope ?scope () in let trace_id = match trace_id, scope with @@ -739,7 +744,7 @@ module Trace = struct let span_id = Span_id.create () in let scope = { trace_id; span_id; events = []; attrs } in (* set global scope in this thread *) - Thread_local.with_ global_scope scope @@ fun _sc -> + Thread_local.with_ Scope._global_scope scope @@ fun _sc -> (* called once we're done, to emit a span *) let finally res = let status =