diff --git a/src/dune b/src/dune index aef121fa..268cf4cb 100644 --- a/src/dune +++ b/src/dune @@ -2,7 +2,7 @@ (name opentelemetry) (synopsis "API for opentelemetry instrumentation") (flags :standard -warn-error -a+8) - (libraries ptime ptime.clock.os pbrt threads) + (libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic) (public_name opentelemetry)) ; ### protobuf rules ### diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index db33c233..4938cd47 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -45,12 +45,12 @@ module Server : sig *) val get_trace_context : - ?from:[ `Internal | `External ] -> Request.t -> Otel.Trace.scope option + ?from:[ `Internal | `External ] -> Request.t -> Otel.Scope.t option (** Get the tracing scope from the custom [x-ocaml-otel-traceparent] header added by [trace] and [with_]. *) - val set_trace_context : Otel.Trace.scope -> Request.t -> Request.t + val set_trace_context : Otel.Scope.t -> Request.t -> Request.t (** Set the tracing scope in the custom [x-ocaml-otel-traceparent] header used by [trace] and [with_]. *) @@ -88,7 +88,7 @@ end = struct let header_x_ocaml_otel_traceparent = "x-ocaml-otel-traceparent" - let set_trace_context (scope : Otel.Trace.scope) req = + let set_trace_context (scope : Otel.Scope.t) req = let module Traceparent = Otel.Trace_context.Traceparent in let headers = Header.add (Request.headers req) header_x_ocaml_otel_traceparent @@ -146,7 +146,7 @@ end = struct f req) end -let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = +let client ?(scope : Otel.Scope.t option) (module C : Cohttp_lwt.S.Client) = let module Traced = struct open Lwt.Syntax @@ -170,7 +170,7 @@ let client ?(scope : Otel.Trace.scope option) (module C : Cohttp_lwt.S.Client) = let attrs = attrs_for ~uri ~meth () in trace_id, parent, attrs - let add_traceparent (scope : Otel.Trace.scope) headers = + let add_traceparent (scope : Otel.Scope.t) headers = let module Traceparent = Otel.Trace_context.Traceparent in let headers = match headers with diff --git a/src/lwt/opentelemetry_lwt.ml b/src/lwt/opentelemetry_lwt.ml index 37fadef6..0bb93bc6 100644 --- a/src/lwt/opentelemetry_lwt.ml +++ b/src/lwt/opentelemetry_lwt.ml @@ -15,7 +15,7 @@ module Trace = struct (** Sync span guard *) let with_ ?trace_state ?service_name ?(attrs = []) ?kind ?trace_id ?parent - ?scope ?links name (f : Trace.scope -> 'a Lwt.t) : 'a Lwt.t = + ?scope ?links name (f : Scope.t -> 'a Lwt.t) : 'a Lwt.t = let trace_id = match trace_id, scope with | Some trace_id, _ -> trace_id diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index c2fde0d0..40c4fa98 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,5 +1,7 @@ (** Opentelemetry types and instrumentation *) +module Thread_local = Thread_local + module Lock = Lock (** Global lock *) @@ -482,6 +484,53 @@ end = struct default_span_event ~time_unix_nano ~name ~attributes:attrs () end +(** {2 Scopes} *) + +(** Scopes. + + A scope is a trace ID and the span ID of the currently active span. +*) +module Scope = struct + type t = { + trace_id: Trace_id.t; + span_id: Span_id.t; + mutable events: Event.t list; + mutable attrs: key_value list; + } + + (** Add an event to the scope. It will be aggregated into the span. + + Note that this takes a function that produces an event, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_event (scope : t) (ev : unit -> Event.t) : unit = + if Collector.has_backend () then scope.events <- ev () :: scope.events + + (** Add an attr to the scope. It will be aggregated into the span. + + Note that this takes a function that produces attributes, and will only + call it if there is an instrumentation backend. *) + let[@inline] add_attrs (scope : t) (attrs : unit -> key_value list) : unit = + if Collector.has_backend () then + scope.attrs <- List.rev_append (attrs ()) scope.attrs + + (**/**) + + (* define this locally *) + let _global_scope : t Thread_local.t = Thread_local.create () + + (**/**) + + (** Obtain current scope from thread-local storage, if available *) + let get_surrounding ?scope () : t option = + match scope with + | Some _ -> scope + | None -> Thread_local.get _global_scope +end + +open struct + let get_surrounding_scope = Scope.get_surrounding +end + (** Span Link A pointer from the current span to another span in the same trace or in a @@ -648,45 +697,45 @@ module Trace = struct let rs = make_resource_spans ?service_name ?attrs spans in Collector.send_trace [ rs ] ~ret:(fun () -> ()) - type scope = { + type scope = Scope.t = { trace_id: Trace_id.t; span_id: Span_id.t; mutable events: Event.t list; mutable attrs: Span.key_value list; } - (** Scope to be used with {!with_}. *) + [@@deprecated "use Scope.t"] - (** Add an event to the scope. It will be aggregated into the span. + let add_event = Scope.add_event [@@deprecated "use Scope.add_event"] - Note that this takes a function that produces an event, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_event (scope : scope) (ev : unit -> Event.t) : unit = - if Collector.has_backend () then scope.events <- ev () :: scope.events - - (** Add an attr to the scope. It will be aggregated into the span. - - Note that this takes a function that produces attributes, and will only - call it if there is an instrumentation backend. *) - let[@inline] add_attrs (scope : scope) (attrs : unit -> Span.key_value list) : - unit = - if Collector.has_backend () then - scope.attrs <- List.rev_append (attrs ()) scope.attrs + let add_attrs = Scope.add_attrs [@@deprecated "use Scope.add_attrs"] (** Sync span guard. + @param force_new_trace_id if true (default false), the span will not use a + surrounding context, or [scope], or [trace_id], but will always + create a fresh new trace ID. + {b NOTE} be careful not to call this inside a Gc alarm, as it can cause deadlocks. *) - let with_ ?trace_state ?service_name + let with_ ?(force_new_trace_id = false) ?trace_state ?service_name ?(attrs : (string * [< value ]) list = []) ?kind ?trace_id ?parent ?scope - ?links name (f : scope -> 'a) : 'a = + ?links name (f : Scope.t -> 'a) : 'a = + let scope = + if force_new_trace_id then + None + else + get_surrounding_scope ?scope () + in let trace_id = match trace_id, scope with + | _ when force_new_trace_id -> Trace_id.create () | Some trace_id, _ -> trace_id | None, Some scope -> scope.trace_id | None, None -> Trace_id.create () in let parent = match parent, scope with + | _ when force_new_trace_id -> None | Some span_id, _ -> Some span_id | None, Some scope -> Some scope.span_id | None, None -> None @@ -694,7 +743,8 @@ module Trace = struct let start_time = Timestamp_ns.now_unix_ns () in let span_id = Span_id.create () in let scope = { trace_id; span_id; events = []; attrs } in - + (* set global scope in this thread *) + Thread_local.with_ Scope._global_scope scope @@ fun _sc -> (* called once we're done, to emit a span *) let finally res = let status = diff --git a/src/thread_local.ml b/src/thread_local.ml new file mode 100644 index 00000000..06d54bcb --- /dev/null +++ b/src/thread_local.ml @@ -0,0 +1,71 @@ +module A = Opentelemetry_atomic.Atomic + +type key = int + +let get_key_ () : key = Thread.id (Thread.self ()) + +module Key_map_ = Map.Make (struct + type t = key + + let compare : t -> t -> int = compare +end) + +type 'a t = 'a ref Key_map_.t A.t + +let create () : _ t = A.make Key_map_.empty + +let get_exn (self : _ t) = + let m = A.get self in + let key = get_key_ () in + !(Key_map_.find key m) + +let[@inline] get self = try Some (get_exn self) with Not_found -> None + +let[@inline] get_or ~default self = try get_exn self with Not_found -> default + +(* remove reference for the key *) +let[@inline] remove_ref_ self key : unit = + while + let m = A.get self in + let m' = Key_map_.remove key m in + not (A.compare_and_set self m m') + do + () + done + +(* get or associate a reference to [key], and return it. + Also return a function to remove the reference if we just created it. *) +let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option = + try + let r = Key_map_.find key (A.get self) in + let old = !r in + r := v; + r, Some old + with Not_found -> + let r = ref v in + while + let m = A.get self in + let m' = Key_map_.add key r m in + not (A.compare_and_set self m m') + do + () + done; + r, None + +let with_ self v f = + let key = get_key_ () in + let r, old = get_or_create_ref_ self key ~v in + + let restore_ () : unit = + match old with + | None -> remove_ref_ self key + | Some old -> r := old + in + + try + let res = f old in + restore_ (); + res + with e -> + restore_ (); + raise e diff --git a/src/thread_local.mli b/src/thread_local.mli new file mode 100644 index 00000000..ecd162f2 --- /dev/null +++ b/src/thread_local.mli @@ -0,0 +1,21 @@ +(** Thread/Domain local storage + + This allows the creation of global state that is per-domain or per-thread. +*) + +type 'a t + +val create : unit -> 'a t +(** Create new storage *) + +val get : 'a t -> 'a option +(** Get current value *) + +val get_exn : 'a t -> 'a +(** Like {!get} but fails with an exception + @raise Not_found if no value was found *) + +val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b +(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where + [prev] is the value currently in [var] (if any), and + then restores the old value of [var] for this thread. *) diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index d5196413..faaeca12 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -23,16 +23,17 @@ let run_job () = let@ () = Fun.protect ~finally:(fun () -> Atomic.set stop true) in let i = ref 0 in while not @@ Atomic.get stop do - let@ scope = + let@ _scope = Atomic.incr num_tr; T.Trace.with_ ~kind:T.Span.Span_kind_producer "loop.outer" ~attrs:[ "i", `Int !i ] in for j = 0 to 4 do + (* parent scope is found via thread local storage *) let@ scope = Atomic.incr num_tr; - T.Trace.with_ ~kind:T.Span.Span_kind_internal ~scope + T.Trace.with_ ~kind:T.Span.Span_kind_internal ~attrs:[ "j", `Int j ] "loop.inner" in