From a64565f1040d995b8bb7d8eab4dd1d8872393b73 Mon Sep 17 00:00:00 2001 From: Elliott Cable Date: Tue, 1 Aug 2023 21:03:58 +0000 Subject: [PATCH] store: Extract thread_local, abstract for lwt/eio --- dune-project | 2 + opentelemetry-lwt.opam | 1 + opentelemetry.opam | 1 + src/dune | 2 +- src/opentelemetry.ml | 8 +-- src/thread_local.ml | 100 ------------------------------ src/thread_local.mli | 27 --------- src/trace/dune | 2 +- src/trace/opentelemetry_trace.ml | 101 +++++++------------------------ 9 files changed, 30 insertions(+), 214 deletions(-) delete mode 100644 src/thread_local.ml delete mode 100644 src/thread_local.mli diff --git a/dune-project b/dune-project index 6e948d02..14d68d3f 100644 --- a/dune-project +++ b/dune-project @@ -17,6 +17,7 @@ (depends (ocaml (>= "4.08")) ptime + ambient-context (odoc :with-doc) (pbrt (>= 2.3))) (depopts @@ -29,6 +30,7 @@ (synopsis "Lwt-compatible instrumentation for https://opentelemetry.io") (depends (ocaml (>= "4.08")) + ambient-context (opentelemetry (= :version)) (cohttp-lwt-unix :with-test) (odoc :with-doc) diff --git a/opentelemetry-lwt.opam b/opentelemetry-lwt.opam index 9740df2a..dc5f90d5 100644 --- a/opentelemetry-lwt.opam +++ b/opentelemetry-lwt.opam @@ -11,6 +11,7 @@ bug-reports: "https://github.com/imandra-ai/ocaml-opentelemetry/issues" depends: [ "dune" {>= "2.7"} "ocaml" {>= "4.08"} + "ambient-context" "opentelemetry" {= version} "cohttp-lwt-unix" {with-test} "odoc" {with-doc} diff --git a/opentelemetry.opam b/opentelemetry.opam index d7cd5e08..e13b4a10 100644 --- a/opentelemetry.opam +++ b/opentelemetry.opam @@ -12,6 +12,7 @@ depends: [ "dune" {>= "2.7"} "ocaml" {>= "4.08"} "ptime" + "ambient-context" "odoc" {with-doc} "pbrt" {>= "2.3"} ] diff --git a/src/dune b/src/dune index ea4b4070..8faa01ee 100644 --- a/src/dune +++ b/src/dune @@ -2,7 +2,7 @@ (name opentelemetry) (synopsis "API for opentelemetry instrumentation") (flags :standard -warn-error -a+8) - (libraries ptime ptime.clock.os pbrt threads opentelemetry.atomic) + (libraries ambient-context ptime ptime.clock.os pbrt threads opentelemetry.atomic) (public_name opentelemetry)) ; ### protobuf rules ### diff --git a/src/opentelemetry.ml b/src/opentelemetry.ml index 38dc60e4..60d374fa 100644 --- a/src/opentelemetry.ml +++ b/src/opentelemetry.ml @@ -1,7 +1,5 @@ (** Opentelemetry types and instrumentation *) -module Thread_local = Thread_local - module Lock = Lock (** Global lock. *) @@ -522,7 +520,7 @@ module Scope = struct (**/**) - let _global_scope : t Thread_local.t = Thread_local.create () + let _ambient_scope : t Ambient_context.key = Ambient_context.create_key () (**/**) @@ -530,12 +528,12 @@ module Scope = struct let get_surrounding ?scope () : t option = match scope with | Some _ -> scope - | None -> Thread_local.get _global_scope + | None -> Ambient_context.get _ambient_scope (** [with_scope sc f] calls [f()] in a context where [sc] is the (thread)-local scope, then reverts to the previous local scope, if any. *) let[@inline] with_scope (sc : t) (f : unit -> 'a) : 'a = - Thread_local.with_ _global_scope sc (fun _ -> f ()) + Ambient_context.with_binding _ambient_scope sc (fun _ -> f ()) end open struct diff --git a/src/thread_local.ml b/src/thread_local.ml deleted file mode 100644 index 4c4eadfb..00000000 --- a/src/thread_local.ml +++ /dev/null @@ -1,100 +0,0 @@ -module A = Opentelemetry_atomic.Atomic - -type key = int - -let[@inline] get_key_ () : key = Thread.id (Thread.self ()) - -module Key_map_ = Map.Make (struct - type t = key - - let compare : t -> t -> int = compare -end) - -type 'a t = 'a ref Key_map_.t A.t -(** The TLS variable is made of a global atomic reference - (which has very low contention: it's modified only when a - thread is started/stopped). - - Inside that atomic variable, is a map from thread ID to a mutable [ref] - holding the actual data. Because this [ref] is only ever accessed - by the thread with this given ID, it's safe to modify. *) - -let create () : _ t = A.make Key_map_.empty - -let[@inline] get_exn (self : _ t) = - let m = A.get self in - let key = get_key_ () in - !(Key_map_.find key m) - -let[@inline] get self = try Some (get_exn self) with Not_found -> None - -(* remove reference for the key *) -let remove_ref_ self key : unit = - while - let m = A.get self in - let m' = Key_map_.remove key m in - not (A.compare_and_set self m m') - do - Thread.yield () - done - -let set_ref_ self key (r : _ ref) : unit = - while - let m = A.get self in - let m' = Key_map_.add key r m in - not (A.compare_and_set self m m') - do - Thread.yield () - done - -(* get or associate a reference to [key], and return it. - Also return a function to remove the reference if we just created it. *) -let get_or_create_ref_ (self : _ t) key ~v : _ ref * _ option = - try - let r = Key_map_.find key (A.get self) in - let old = !r in - r := v; - r, Some old - with Not_found -> - let r = ref v in - set_ref_ self key r; - r, None - -let set (self : _ t) v : unit = - let key = get_key_ () in - let _, _ = get_or_create_ref_ self key ~v in - () - -let remove (self : _ t) : unit = - let key = get_key_ () in - remove_ref_ self key - -let get_or_create ~create (self : 'a t) : 'a = - let key = get_key_ () in - try - let r = Key_map_.find key (A.get self) in - !r - with Not_found -> - Gc.finalise (fun _ -> remove_ref_ self key) (Thread.self ()); - let v = create () in - let r = ref v in - set_ref_ self key r; - v - -let with_ self v f = - let key = get_key_ () in - let r, old = get_or_create_ref_ self key ~v in - - let restore_ () : unit = - match old with - | None -> remove_ref_ self key - | Some old -> r := old - in - - try - let res = f old in - restore_ (); - res - with e -> - restore_ (); - raise e diff --git a/src/thread_local.mli b/src/thread_local.mli deleted file mode 100644 index 7a33b709..00000000 --- a/src/thread_local.mli +++ /dev/null @@ -1,27 +0,0 @@ -(** Thread/Domain local storage - - This allows the creation of global state that is per-domain or per-thread. -*) - -type 'a t - -val create : unit -> 'a t -(** Create new storage *) - -val get : 'a t -> 'a option -(** Get current value *) - -val get_exn : 'a t -> 'a -(** Like {!get} but fails with an exception - @raise Not_found if no value was found *) - -val set : 'a t -> 'a -> unit - -val remove : _ t -> unit - -val get_or_create : create:(unit -> 'a) -> 'a t -> 'a - -val with_ : 'a t -> 'a -> ('a option -> 'b) -> 'b -(** [with_ var x f] sets [var] to [x] for this thread, calls [f prev] where - [prev] is the value currently in [var] (if any), and - then restores the old value of [var] for this thread. *) diff --git a/src/trace/dune b/src/trace/dune index 8391f103..69d9e44f 100644 --- a/src/trace/dune +++ b/src/trace/dune @@ -4,4 +4,4 @@ (public_name opentelemetry.trace) (synopsis "Use opentelemetry as a collector for trace") (optional) - (libraries trace opentelemetry)) + (libraries ambient-context trace opentelemetry)) diff --git a/src/trace/opentelemetry_trace.ml b/src/trace/opentelemetry_trace.ml index 2670429c..5f0d2c24 100644 --- a/src/trace/opentelemetry_trace.ml +++ b/src/trace/opentelemetry_trace.ml @@ -1,38 +1,4 @@ module Otel = Opentelemetry -module TLS = Otel.Thread_local - -type span = Trace.span - -(** Table indexed by Trace spans *) -module Span_tbl = Hashtbl.Make (struct - include Int64 - - let hash : t -> int = Hashtbl.hash -end) - -(** Per-thread set of active spans. *) -module Active_spans = struct - type span_begin = { - span_id: Otel.Span_id.t; - start_time: int64; - name: string; - data: (string * Trace.user_data) list; - __FILE__: string; - __LINE__: int; - new_scope: Otel.Scope.t; - old_scope: Otel.Scope.t option; - } - (** Information we get at the beginning of the span *) - - type t = { tbl: span_begin Span_tbl.t } [@@unboxed] - (** Storage for active spans *) - - let create () : t = { tbl = Span_tbl.create 8 } - - let tls : t TLS.t = TLS.create () - - let[@inline] get () : t = TLS.get_or_create tls ~create -end let conv_span_to_i64 (id : Otel.Span_id.t) : int64 = let bs = Otel.Span_id.to_bytes id in @@ -47,7 +13,7 @@ let span_of_i64 (id : int64) : Otel.Span_id.t = let collector () : Trace.collector = let module M = struct - let enter_span ?__FUNCTION__:_ ~__FILE__ ~__LINE__ ~data name : span = + let with_span ~__FUNCTION__:_ ~__FILE__ ~__LINE__ ~data name cb = let span_id = Otel.Span_id.create () in let span = conv_span_to_i64 span_id in @@ -63,57 +29,32 @@ let collector () : Trace.collector = let new_scope = { Otel.Scope.span_id; trace_id; events = []; attrs = [] } in - TLS.set Otel.Scope._global_scope new_scope; - let active_spans = Active_spans.get () in - Span_tbl.add active_spans.tbl span - { - span_id; - start_time; - __FILE__; - __LINE__; - old_scope; - new_scope; - name; - data; - }; + Ambient_context.with_binding Otel.Scope._ambient_scope new_scope + @@ fun () -> + let rv = cb span in - span + let end_time = Otel.Timestamp_ns.now_unix_ns () in - let exit_span (span : span) : unit = - let active_spans = Active_spans.get () in - match Span_tbl.find_opt active_spans.tbl span with - | None -> () (* TODO: log warning *) - | Some - { - span_id; - start_time; - name; - __FILE__; - __LINE__; - new_scope; - old_scope; - data; - } -> - let end_time = Otel.Timestamp_ns.now_unix_ns () in - - (* restore previous scope *) - (match old_scope with - | None -> TLS.remove Otel.Scope._global_scope - | Some sc -> TLS.set Otel.Scope._global_scope sc); - - let o_span : Otel.Span.t = - let attrs = - [ "file", `String __FILE__; "line", `Int __LINE__ ] @ data - in - Otel.Span.create ~trace_id:new_scope.trace_id ~id:span_id ~start_time - ~end_time ~attrs name - |> fst + let o_span : Otel.Span.t = + let attrs = + [ "file", `String __FILE__; "line", `Int __LINE__ ] @ data in + Otel.Span.create ~trace_id:new_scope.trace_id ~id:span_id ~start_time + ~end_time ~attrs name + |> fst + in - Otel.Trace.emit [ o_span ]; + Otel.Trace.emit [ o_span ]; - () + rv + + let enter_explicit_span ~surrounding:_ ?__FUNCTION__:_ ~__FILE__:_ + ~__LINE__:_ ~data:_ _name : Trace.explicit_span = + failwith "nyi" + + let exit_explicit_span _sp = + failwith "nyi" let message ?span ~data:_ msg : unit = (* gather information from context *)