diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index db33c233..4938cd47 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -45,12 +45,12 @@ module Server : sig *) val get_trace_context : - ?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option + ?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header added by [trace] and [with_]. *) - val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t + val set_trace_context : Otel.Scope.t -> Request.t -> Request.t (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used by [trace] and [with_]. *) @@ -88,7 +88,7 @@ end = struct let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent" - let set_trace_context (scope : Otel.Trace.scope) req = + let set_trace_context (scope : Otel.Scope.t) req = let module Traceparent = Otel.Trace_context.Traceparent in let headers = Header.add (Request.headers req) header_x_ocaml_otel_traceparent @@ -146,7 +146,7 @@ end = struct f req) end -let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = +let client ?(scope : Otel.Scope.t option) (module C : Cohttp_lwt.S.Client) = let module Traced = struct open Lwt.Syntax @@ -170,7 +170,7 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = let attrs = attrs_for ~uri ~meth () in trace_id, parent, attrs - let add_traceparent (scope : Otel.Trace.scope) headers = + let add_traceparent (scope : Otel.Scope.t) headers = let module Traceparent = Otel.Trace_context.Traceparent in let headers = match headers with diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index c2fde0d0..f79afc2d 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,5 +1,7 @@ (** Opentelemetry types and instrumentation *) +module Thread_local = Thread_local + module Lock = Lock (** Global lock *) @@ -482,6 +484,48 @@ end = struct default_span_event ~time_unix_nano ~name ~attributes:attrs () end +(** {2 Scopes} *) + +(** Scopes. + + A scope is a trace ID and the span ID of the currently active span. +*) +module Scope = struct + type t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable events: Event.t list; + mutable attrs: key_value list; + } + + (** Add an event to the scope. It will be aggregated into the span. + + Note that this takes a function that produces an event, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit = + if Collector.has_backend () then scope.events <- ev () :: scope.events + + (** Add an attr to the scope. It will be aggregated into the span. + + Note that this takes a function that produces attributes, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit = + if Collector.has_backend () then + scope.attrs <- List.rev_append (attrs ()) scope.attrs +end + +(* now. We use thread local storage to store the currently active scope, + if any. *) +open struct + let global_scope : Scope.t Thread_local.t = Thread_local.create () + + (* access global scope if [scope=None] *) + let get_scope ?scope () = + match scope with + | Some _ -> scope + | None -> Thread_local.get global_scope +end + (** Span Link A pointer from the current span to another span in the same trace or in a @@ -648,29 +692,17 @@ module Trace = struct let rs = make_resource_spans ?service_name ?attrs spans in Collector.send_trace [ rs ] ~ret:(fun () -> ()) - type scope = { + type scope = Scope.t = { trace_id: Trace_id.t; span_id: Span_id.t; mutable events: Event.t list; mutable attrs: Span.key_value list; } - (** Scope to be used with {!with_}. *) + [@@deprecated "use Scope.t"] - (** Add an event to the scope. It will be aggregated into the span. + let add_event = Scope.add_event [@@deprecated "use Scope.add_event"] - Note that this takes a function that produces an event, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = - if Collector.has_backend () then scope.events <- ev () :: scope.events - - (** Add an attr to the scope. It will be aggregated into the span. - - Note that this takes a function that produces attributes, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) : - unit = - if Collector.has_backend () then - scope.attrs <- List.rev_append (attrs ()) scope.attrs + let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"] (** Sync span guard. @@ -678,7 +710,8 @@ module Trace = struct cause deadlocks. *) let with_ ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope - ?links name (f : scope -> 'a) : 'a = + ?links name (f : Scope.t -> 'a) : 'a = + let scope = get_scope ?scope () in let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id @@ -694,7 +727,8 @@ module Trace = struct let start_time = Timestamp_ns.now_unix_ns () in let span_id = Span_id.create () in let scope = { trace_id; span_id; events = []; attrs } in - + (* set global scope in this thread *) + Thread_local.with_ global_scope scope @@ fun _sc -> (* called once we're done, to emit a span *) let finally res = let status = diff --git a/src/thread_local.ml b/src/thread_local.ml new file mode 100644 index 00000000..d7b0c296 --- /dev/null +++ b/src/thread_local.ml @@ -0,0 +1,71 @@ +module A = Atomic + +type key = int + +let get_key_ () : key = Thread.id (Thread.self ()) + +module Key_map_ = Map.Make (struct + type t = key + + let compare : t -> t -> int = compare +end) + +type 'a t = 'a ref Key_map_.t A.t + +let create () : _ t = A.make Key_map_.empty + +let get_exn (self : _ t) = + let m = A.get self in + let key = get_key_ () in + !(Key_map_.find key m) + +let[@inline] get self = try Some (get_exn self) with Not_found -> None + +let[@inline] get_or ~default self = try get_exn self with Not_found -> default + +(* remove reference for the key *) +let[@inline] remove_ref_ self key : unit = + while + let m = A.get self in + let m' = Key_map_.remove key m in + not (A.compare_and_set self m m') + do + () + done + +(* get or associate a reference to [key], and return it. + Also return a function to remove the reference if we just created it. *) +let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option = + try + let r = Key_map_.find key (A.get self) in + let old = !r in + r := v; + r, Some old + with Not_found -> + let r = ref v in + while + let m = A.get self in + let m' = Key_map_.add key r m in + not (A.compare_and_set self m m') + do + () + done; + r, None + +let with_ self v f = + let key = get_key_ () in + let r, old = get_or_create_ref_ self key ~v in + + let restore_ () : unit = + match old with + | None -> remove_ref_ self key + | Some old -> r := old + in + + try + let res = f old in + restore_ (); + res + with e -> + (restore_ [@inlined]) (); + raise e diff --git a/src/thread_local.mli b/src/thread_local.mli new file mode 100644 index 00000000..ecd162f2 --- /dev/null +++ b/src/thread_local.mli @@ -0,0 +1,21 @@ +(** Thread/Domain local storage + + This allows the creation of global state that is per-domain or per-thread. +*) + +type 'a t + +val create : unit -> 'a t +(** Create new storage *) + +val get : 'a t -> 'a option +(** Get current value *) + +val get_exn : 'a t -> 'a +(** Like {!get} but fails with an exception + @raise Not_found if no value was found *) + +val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b +(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where + [prev] is the value currently in [var] (if any), and + then restores the old value of [var] for this thread. *)