refactor thoroughly ambient-context

we have a new explicit `Storage.t` interface, that can be used to
get a `Context.t` (a hmap) and to locally swap it; then we have multiple
implementations of the Storage; and then we have a singleton atomic
containing the "main" storage.
This commit is contained in:
Simon Cruanes 2025-12-04 00:23:23 -05:00
parent a33c57a46e
commit e79df14a90
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
23 changed files with 264 additions and 304 deletions

View file

@ -0,0 +1,55 @@
(** Extremely basic storage using a map from thread id to context *)
open Opentelemetry_ambient_context_core
open struct
module Atomic = Opentelemetry_atomic.Atomic
module Int_map = Map.Make (struct
type t = int
let compare : t -> t -> int = Stdlib.compare
end)
type st = { m: Context.t ref Int_map.t Atomic.t } [@@unboxed]
let get (self : st) : Context.t =
let tid = Thread.id @@ Thread.self () in
match Int_map.find tid (Atomic.get self.m) with
| exception Not_found -> Context.empty
| ctx_ref -> !ctx_ref
let with_context (self : st) ctx f =
let tid = Thread.id @@ Thread.self () in
let ctx_ref =
try Int_map.find tid (Atomic.get self.m)
with Not_found ->
let r = ref Context.empty in
while
let m = Atomic.get self.m in
let m' = Int_map.add tid r m in
not (Atomic.compare_and_set self.m m m')
do
()
done;
r
in
let old_ctx = !ctx_ref in
ctx_ref := ctx;
let finally () = ctx_ref := old_ctx in
Fun.protect ~finally f
end
let create_storage () : Storage.t =
let st = { m = Atomic.make Int_map.empty } in
{
name = "basic-map";
get_context = (fun () -> get st);
with_context = (fun ctx f -> with_context st ctx f);
}
(** Default storage *)
let storage : Storage.t = create_storage ()

View file

@ -0,0 +1,7 @@
type t = Hmap.t
type 'a key = 'a Hmap.key
let empty : t = Hmap.empty
let[@inline] new_key () : _ key = Hmap.Key.create ()

View file

@ -0,0 +1,5 @@
(library
(name opentelemetry_ambient_context_core)
(public_name opentelemetry.ambient-context.core)
(synopsis "Core definitions for ambient-context")
(libraries hmap))

View file

@ -0,0 +1,44 @@
(** Storage implementation.
There is a singleton storage for a given program, responsible for providing
ambient context to the rest of the program. *)
type t = {
name: string;
get_context: unit -> Context.t;
with_context: 'a. Context.t -> (unit -> 'a) -> 'a;
}
(** Storage type *)
(** Name of the storage implementation. *)
let[@inline] name self = self.name
(** Get the context from the current storage, or [Hmap.empty] if there is no
ambient context. *)
let[@inline] get_context self = self.get_context ()
(** [with_context storage ctx f] calls [f()] in an ambient context in which
[get_context()] will return [ctx]. Once [f()] returns, the storage is reset
to its previous value. *)
let[@inline] with_context self ctx f = self.with_context ctx f
(** Get the ambient context and then look up [k] in it *)
let[@inline] get self (k : 'a Context.key) : 'a option =
Hmap.find k (get_context self)
(** [with_key_bound_to storage k v f] calls [f()] in a context updated to have
[k] map to [v]. *)
let with_key_bound_to self k v f =
let ctx = get_context self in
let new_ctx = Hmap.add k v ctx in
self.with_context new_ctx f
(** [with_key_unbound storage k f] calls [f()] in a context updated to have [k]
bound to no value. *)
let with_key_unbound self k f =
let ctx = get_context self in
if Hmap.mem k ctx then (
let new_ctx = Hmap.rem k ctx in
self.with_context new_ctx f
) else
f ()

View file

@ -0,0 +1 @@
let storage = Basic_map.storage

View file

@ -0,0 +1,2 @@
val storage : Storage.t
(** Default storage. *)

View file

@ -0,0 +1 @@
let storage = Opentelemetry_ambient_context_tls.storage

View file

@ -0,0 +1,46 @@
(* TODO: conditional compilation, and use Basic_map in each DLS *)
(** Storage using DLS. *)
open Opentelemetry_ambient_context_core
open struct
module DLS = Domain.DLS
module Int_map = Map.Make (struct
type t = int
let compare : t -> t -> int = Stdlib.compare
end)
(* key used to access the context *)
let dls_k_context : Context.t ref Int_map.t DLS.key =
DLS.new_key
~split_from_parent:(fun _ -> Int_map.empty)
(fun _ -> Int_map.empty)
let dls_get () =
let tid = Thread.id @@ Thread.self () in
let map_ref = DLS.get dls_k_context in
try !(Int_map.find tid map_ref) with Not_found -> Hmap.empty
let dls_with ctx f =
let tid = Thread.id @@ Thread.self () in
let map = DLS.get dls_k_context in
let ctx_ref =
try Int_map.find tid map
with Not_found ->
let r = ref Context.empty in
DLS.set dls_k_context (Int_map.add tid r map);
r
in
let old_ctx = !ctx_ref in
ctx_ref := ctx;
let finally () = ctx_ref := old_ctx in
Fun.protect ~finally f
end
let storage : Storage.t =
{ name = "dls-int-map"; get_context = dls_get; with_context = dls_with }

View file

@ -3,13 +3,19 @@
(public_name opentelemetry.ambient-context) (public_name opentelemetry.ambient-context)
(synopsis (synopsis
"Abstraction over thread-local storage and fiber-local storage mechanisms") "Abstraction over thread-local storage and fiber-local storage mechanisms")
(private_modules hmap_key_) (flags
:standard
-open
Opentelemetry_ambient_context_core
-open
Opentelemetry_atomic)
(libraries (libraries
thread-local-storage hmap
threads
atomic atomic
opentelemetry.ambient-context.types opentelemetry.ambient-context.core
opentelemetry.atomic
(select (select
hmap_key_.ml default_.ml
from from
(-> hmap_key_.new.ml)))) (opentelemetry.ambient-context.tls -> default_.tls.ml)
(-> default_.map.ml))))

View file

@ -4,4 +4,4 @@
(synopsis (synopsis
"Storage backend for ambient-context using Eio's fibre-local storage") "Storage backend for ambient-context using Eio's fibre-local storage")
(optional) ; eio (optional) ; eio
(libraries eio hmap opentelemetry.ambient-context thread-local-storage)) (libraries eio hmap opentelemetry.ambient-context.core))

View file

@ -1,39 +1,15 @@
open Opentelemetry_ambient_context_core
module Fiber = Eio.Fiber module Fiber = Eio.Fiber
open struct open struct
let _internal_key : Hmap.t Fiber.key = Fiber.create_key () let fiber_context_key : Context.t Fiber.key = Fiber.create_key ()
let ( let* ) = Option.bind
end end
module M = struct let storage : Storage.t =
let name = "Storage_eio" {
name = "eio";
let[@inline] get_map () = Fiber.get _internal_key get_context =
(fun () ->
let[@inline] with_map m cb = Fiber.with_binding _internal_key m cb Fiber.get fiber_context_key |> Option.value ~default:Hmap.empty);
with_context = (fun ctx f -> Fiber.with_binding fiber_context_key ctx f);
let create_key = Hmap.Key.create }
let get k =
let* context = get_map () in
Hmap.find k context
let with_binding k v cb =
let new_context =
match get_map () with
| None -> Hmap.singleton k v
| Some old_context -> Hmap.add k v old_context
in
with_map new_context cb
let without_binding k cb =
let new_context =
match get_map () with
| None -> Hmap.empty
| Some old_context -> Hmap.rem k old_context
in
with_map new_context cb
end
let storage () : Opentelemetry_ambient_context.storage = (module M)

View file

@ -1,2 +0,0 @@
val storage : unit -> Opentelemetry_ambient_context.storage
(** Storage using Eio's fibers local storage *)

View file

@ -1 +0,0 @@
let key : Hmap.t Thread_local_storage.t = Thread_local_storage.create ()

View file

@ -4,4 +4,4 @@
(optional) ; lwt (optional) ; lwt
(synopsis (synopsis
"Storage backend for ambient-context using Lwt's sequence-associated storage") "Storage backend for ambient-context using Lwt's sequence-associated storage")
(libraries lwt opentelemetry.ambient-context thread-local-storage)) (libraries lwt opentelemetry.ambient-context.core))

View file

@ -1,37 +1,15 @@
(** Storage using Lwt keys *)
open Opentelemetry_ambient_context_core
open struct open struct
let _internal_key : Hmap.t Lwt.key = Lwt.new_key () let lwt_context_key : Context.t Lwt.key = Lwt.new_key ()
let ( let* ) = Option.bind
end end
module M = struct let storage : Storage.t =
let name = "Storage_lwt" {
name = "lwt";
let[@inline] get_map () = Lwt.get _internal_key get_context =
(fun () -> Lwt.get lwt_context_key |> Option.value ~default:Hmap.empty);
let[@inline] with_map m cb = Lwt.with_value _internal_key (Some m) cb with_context = (fun ctx f -> Lwt.with_value lwt_context_key (Some ctx) f);
}
let create_key = Hmap.Key.create
let get k =
let* context = get_map () in
Hmap.find k context
let with_binding k v cb =
let new_context =
match get_map () with
| None -> Hmap.singleton k v
| Some old_context -> Hmap.add k v old_context
in
with_map new_context cb
let without_binding k cb =
let new_context =
match get_map () with
| None -> Hmap.empty
| Some old_context -> Hmap.rem k old_context
in
with_map new_context cb
end
let storage () : Opentelemetry_ambient_context.storage = (module M)

View file

@ -1,2 +0,0 @@
val storage : unit -> Opentelemetry_ambient_context.storage
(** Storage using Lwt keys *)

View file

@ -1,124 +1,49 @@
module TLS = Thread_local_storage include Opentelemetry_ambient_context_core
include Opentelemetry_ambient_context_types
type 'a key = int * 'a Hmap.key let default_storage = Default_.storage
let debug = open struct
match Sys.getenv_opt "OCAML_AMBIENT_CONTEXT_DEBUG" with (** The current ambient-context storage. *)
| Some ("1" | "true") -> true let cur_storage : Storage.t Atomic.t = Atomic.make Default_.storage
| _ -> false
let _debug_id_ = Atomic.make 0
let[@inline] generate_debug_id () = Atomic.fetch_and_add _debug_id_ 1
let compare_key : int -> int -> int = Stdlib.compare
module Storage_tls_hmap = struct
let[@inline] ( let* ) o f =
match o with
| None -> None
| Some x -> f x
let key : Hmap.t TLS.t = Hmap_key_.key
let name = "Storage_tls"
let[@inline] get_map () = TLS.get_opt key
let[@inline] with_map m cb =
let old = TLS.get_opt key |> Option.value ~default:Hmap.empty in
TLS.set key m;
Fun.protect ~finally:(fun () -> TLS.set key old) cb
let create_key = Hmap.Key.create
let get k =
let* context = get_map () in
Hmap.find k context
let with_binding k v cb =
let new_context =
match get_map () with
| None -> Hmap.singleton k v
| Some old_context -> Hmap.add k v old_context
in
with_map new_context @@ fun _context -> cb ()
let without_binding k cb =
match get_map () with
| None -> cb ()
| Some old_context ->
let new_context = Hmap.rem k old_context in
with_map new_context @@ fun _context -> cb ()
end end
let default_storage : storage = (module Storage_tls_hmap) let[@inline] get_current_storage () = Atomic.get cur_storage
let k_current_storage : storage TLS.t = TLS.create () (* NOTE: we can't really "map" each local context from the old
to the new. Maybe the old storage is TLS based and the new one
is per-lwt-task. *)
let set_current_storage (storage : Storage.t) = Atomic.set cur_storage storage
let get_current_storage () = (** {2 Functions operating with the current storage} *)
match TLS.get_exn k_current_storage with
| v -> v
| exception TLS.Not_set ->
let v = default_storage in
TLS.set k_current_storage v;
v
let create_key () = (** Get the context from the current storage, or [Hmap.empty] if there is no
let (module Store : STORAGE) = get_current_storage () in ambient context. *)
if not debug then let[@inline] get_context () = Storage.get_context (Atomic.get cur_storage)
0, Store.create_key ()
else (
let id = generate_debug_id () in
Printf.printf "%s: create_key %i\n%!" Store.name id;
id, Store.create_key ()
)
let get (id, k) = (** [with_context ctx f] calls [f()] in an ambient context in which
let (module Store : STORAGE) = get_current_storage () in [get_context()] will return [ctx]. Once [f()] returns, the storage is reset
if not debug then to its previous value. *)
Store.get k let[@inline] with_context ctx f =
else ( Storage.with_context (Atomic.get cur_storage) ctx f
let rv = Store.get k in
(match rv with
| Some _ -> Printf.printf "%s: get %i -> Some\n%!" Store.name id
| None -> Printf.printf "%s: get %i -> None\n%!" Store.name id);
rv
)
let with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r = (** Get the ambient context and then look up [k] in it *)
fun (id, k) v cb -> let[@inline] get (k : 'a Context.key) : 'a option = Hmap.find k (get_context ())
let (module Store : STORAGE) = get_current_storage () in
if not debug then
Store.with_binding k v cb
else (
Printf.printf "%s: with_binding %i enter\n%!" Store.name id;
let rv = Store.with_binding k v cb in
Printf.printf "%s: with_binding %i exit\n%!" Store.name id;
rv
)
let without_binding (id, k) cb = (** [with_key_bound_to storage k v f] calls [f()] in a context updated to have
let (module Store : STORAGE) = get_current_storage () in [k] map to [v]. *)
if not debug then let with_key_bound_to k v f =
Store.without_binding k cb let storage = get_current_storage () in
else ( let ctx = Storage.get_context storage in
Printf.printf "%s: without_binding %i enter\n%!" Store.name id; let new_ctx = Hmap.add k v ctx in
let rv = Store.without_binding k cb in Storage.with_context storage new_ctx f
Printf.printf "%s: without_binding %i exit\n%!" Store.name id;
rv
)
let set_storage_provider store_new = (** [with_key_unbound k f] calls [f()] in a context updated to have [k] bound to
let store_before = get_current_storage () in no value. *)
if store_new == store_before then let with_key_unbound k f =
() let storage = Atomic.get cur_storage in
else let ctx = Storage.get_context storage in
TLS.set k_current_storage store_new; if Hmap.mem k ctx then (
if debug then ( let new_ctx = Hmap.rem k ctx in
let (module Store_before : STORAGE) = store_before in Storage.with_context storage new_ctx f
let (module Store_new : STORAGE) = store_new in ) else
Printf.printf "set_storage_provider %s (previously %s)\n%!" Store_new.name f ()
Store_before.name
)

View file

@ -1,55 +0,0 @@
(** Ambient context.
The ambient context, like the Matrix, is everywhere around you.
It is responsible for keeping track of that context in a manner that's
consistent with the program's choice of control flow paradigm:
- for synchronous/threaded/direct style code, {b TLS} ("thread local
storage") keeps track of a global variable per thread. Each thread has its
own copy of the variable and updates it independently of other threads.
- for Lwt, any ['a Lwt.t] created inside the [with_binding k v (fun _ -> )]
will inherit the [k := v] assignment.
- for Eio, fibers created inside [with_binding k v (fun () -> )] will
inherit the [k := v] assignment. This is consistent with the structured
concurrency approach of Eio.
The only data stored by this storage is a {!Hmap.t}, ie a heterogeneous map.
Various users (libraries, user code, etc.) can create their own {!key} to
store what they are interested in, without affecting other parts of the
storage. *)
module Types := Opentelemetry_ambient_context_types
module type STORAGE = Types.STORAGE
type storage = (module STORAGE)
val default_storage : storage
val get_current_storage : unit -> storage
val set_storage_provider : storage -> unit
type 'a key
(** A key that can be mapped to values of type ['a] in the ambient context. *)
val compare_key : int -> int -> int
(** Total order on keys *)
val create_key : unit -> 'a key
(** Create a new fresh key, distinct from any previously created key. *)
val get : 'a key -> 'a option
(** Get the current value for a given key, or [None] if no value was associated
with the key in the ambient context. *)
val with_binding : 'a key -> 'a -> (unit -> 'r) -> 'r
(** [with_binding k v cb] calls [cb()] in a context in which [k] is bound to
[v]. This does not affect storage outside of [cb()]. *)
val without_binding : 'a key -> (unit -> 'b) -> 'b
(** [without_binding k cb] calls [cb()] in a context where [k] has no binding
(possibly shadowing the current ambient binding of [k] if it exists). *)

View file

@ -0,0 +1,6 @@
(library
(name opentelemetry_ambient_context_tls)
(public_name opentelemetry.ambient-context.tls)
(synopsis "Implementation of ambient-context from thread-local-storage")
(optional) ; TLS
(libraries opentelemetry.ambient-context.core thread-local-storage))

View file

@ -0,0 +1,23 @@
open Opentelemetry_ambient_context_core
open struct
module TLS = Thread_local_storage
(* key used to access the context *)
let tls_k_context : Context.t TLS.t = TLS.create ()
end
let storage : Storage.t =
{
name = "tls";
get_context =
(fun () -> try TLS.get_exn tls_k_context with TLS.Not_set -> Hmap.empty);
with_context =
(fun ctx f ->
let old =
try TLS.get_exn tls_k_context with TLS.Not_set -> Hmap.empty
in
let finally () = TLS.set tls_k_context old in
TLS.set tls_k_context ctx;
Fun.protect ~finally f);
}

View file

@ -1,4 +0,0 @@
(library
(name opentelemetry_ambient_context_types)
(public_name opentelemetry.ambient-context.types)
(libraries hmap thread-local-storage))

View file

@ -1,19 +0,0 @@
type 'a key = 'a Hmap.key
module type STORAGE = sig
val name : string
val get_map : unit -> Hmap.t option
val with_map : Hmap.t -> (unit -> 'b) -> 'b
val create_key : unit -> 'a key
val get : 'a key -> 'a option
val with_binding : 'a key -> 'a -> (unit -> 'b) -> 'b
val without_binding : 'a key -> (unit -> 'b) -> 'b
end
type storage = (module STORAGE)

View file

@ -1,32 +0,0 @@
(** Storage implementation.
There is a singleton storage for a given program, responsible for providing
ambient context to the rest of the program. *)
type 'a key = 'a Hmap.key
module type STORAGE = sig
val name : string
(** Name of the storage implementation. *)
val get_map : unit -> Hmap.t option
(** Get the hmap from the current ambient context, or [None] if there is no
ambient context. *)
val with_map : Hmap.t -> (unit -> 'b) -> 'b
(** [with_hmap h cb] calls [cb()] in an ambient context in which [get_map()]
will return [h]. Once [cb()] returns, the storage is reset to its previous
value. *)
val create_key : unit -> 'a key
(** Create a new storage key, guaranteed to be distinct from any previously
created key. *)
val get : 'a key -> 'a option
val with_binding : 'a key -> 'a -> (unit -> 'b) -> 'b
val without_binding : 'a key -> (unit -> 'b) -> 'b
end
type storage = (module STORAGE)