From b76c90b7850a1b570db5cc7c98572e1fd605230d Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 3 Dec 2025 13:59:00 -0500 Subject: [PATCH] feat: opentelemetry.util with various utilities --- src/util/alist.ml | 28 +++++++++++++++++ src/util/alist.mli | 14 +++++++++ src/util/dune | 6 ++++ src/util/interval_limiter.ml | 18 +++++++++++ src/util/interval_limiter.mli | 9 ++++++ src/util/rpool.ml | 59 +++++++++++++++++++++++++++++++++++ src/util/rpool.mli | 27 ++++++++++++++++ src/util/util_bytes_.ml | 49 +++++++++++++++++++++++++++++ src/util/util_mutex.ml | 12 +++++++ src/util/util_mutex.mli | 1 + 10 files changed, 223 insertions(+) create mode 100644 src/util/alist.ml create mode 100644 src/util/alist.mli create mode 100644 src/util/dune create mode 100644 src/util/interval_limiter.ml create mode 100644 src/util/interval_limiter.mli create mode 100644 src/util/rpool.ml create mode 100644 src/util/rpool.mli create mode 100644 src/util/util_bytes_.ml create mode 100644 src/util/util_mutex.ml create mode 100644 src/util/util_mutex.mli diff --git a/src/util/alist.ml b/src/util/alist.ml new file mode 100644 index 00000000..356f2630 --- /dev/null +++ b/src/util/alist.ml @@ -0,0 +1,28 @@ +module Atomic = Opentelemetry_atomic.Atomic + +type 'a t = 'a list Atomic.t + +let make () = Atomic.make [] + +let[@inline] is_empty self : bool = + match Atomic.get self with + | [] -> true + | _ :: _ -> false + +let get = Atomic.get + +let add self x = + while + let old = Atomic.get self in + let l' = x :: old in + not (Atomic.compare_and_set self old l') + do + () + done + +let rec pop_all self = + let l = Atomic.get self in + if Atomic.compare_and_set self l [] then + l + else + pop_all self diff --git a/src/util/alist.mli b/src/util/alist.mli new file mode 100644 index 00000000..832e3c2e --- /dev/null +++ b/src/util/alist.mli @@ -0,0 +1,14 @@ +(** Atomic list *) + +type 'a t + +val get : 'a t -> 'a list +(** Snapshot *) + +val is_empty : _ t -> bool + +val make : unit -> 'a t + +val add : 'a t -> 'a -> unit + +val pop_all : 'a t -> 'a list diff --git a/src/util/dune b/src/util/dune new file mode 100644 index 00000000..1348fe68 --- /dev/null +++ b/src/util/dune @@ -0,0 +1,6 @@ +(library + (name opentelemetry_util) + (public_name opentelemetry.util) + (flags :standard -open Opentelemetry_atomic) + (libraries opentelemetry.atomic mtime mtime.clock.os) + (synopsis "Utilities for opentelemetry")) diff --git a/src/util/interval_limiter.ml b/src/util/interval_limiter.ml new file mode 100644 index 00000000..456de0f6 --- /dev/null +++ b/src/util/interval_limiter.ml @@ -0,0 +1,18 @@ +type t = { + min_interval: Mtime.span; + last: Mtime.t Atomic.t; +} + +let create ~min_interval () : t = + { min_interval; last = Atomic.make Mtime.min_stamp } + +let make_attempt (self : t) : bool = + let now = Mtime_clock.now () in + let last = Atomic.get self.last in + let elapsed = Mtime.span last now in + if Mtime.Span.compare elapsed self.min_interval >= 0 then + (* attempts succeeds, unless another thread updated [self.last] + in the mean time *) + Atomic.compare_and_set self.last last now + else + false diff --git a/src/util/interval_limiter.mli b/src/util/interval_limiter.mli new file mode 100644 index 00000000..b07f7c68 --- /dev/null +++ b/src/util/interval_limiter.mli @@ -0,0 +1,9 @@ +type t + +val create : min_interval:Mtime.span -> unit -> t + +val make_attempt : t -> bool +(** [make_attempt lim] returns [true] if the last successful attempt was more + than [min_interval] ago, as measured by mtime. If so, this counts as the new + latest attempt; otherwise [false] is returned and the state is not updated. +*) diff --git a/src/util/rpool.ml b/src/util/rpool.ml new file mode 100644 index 00000000..833ccaef --- /dev/null +++ b/src/util/rpool.ml @@ -0,0 +1,59 @@ +module A = Atomic + +type 'a list_ = + | Nil + | Cons of int * 'a * 'a list_ + +type 'a t = { + mk_item: unit -> 'a; + clear: 'a -> unit; + max_size: int; (** Max number of items *) + items: 'a list_ A.t; +} + +let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t = + { mk_item; clear; max_size; items = A.make Nil } + +let rec acquire self = + match A.get self.items with + | Nil -> self.mk_item () + | Cons (_, x, tl) as l -> + if A.compare_and_set self.items l tl then + x + else + acquire self + +let[@inline] size_ = function + | Cons (sz, _, _) -> sz + | Nil -> 0 + +let release self x : unit = + let rec loop () = + match A.get self.items with + | Cons (sz, _, _) when sz >= self.max_size -> + (* forget the item *) + () + | l -> + if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then + loop () + in + + self.clear x; + loop () + +let with_resource (self : _ t) f = + let x = acquire self in + try + let res = f x in + release self x; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + release self x; + Printexc.raise_with_backtrace e bt + +module Raw = struct + let release = release + + let acquire = acquire +end diff --git a/src/util/rpool.mli b/src/util/rpool.mli new file mode 100644 index 00000000..4a80e115 --- /dev/null +++ b/src/util/rpool.mli @@ -0,0 +1,27 @@ +(** Simple resource pool. + + This is intended for buffers, protobuf encoders, etc. *) + +type 'a t +(** Pool of values of type ['a] *) + +val create : + ?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t +(** Create a new pool. + @param mk_item produce a new item in case the pool is empty + @param max_size + maximum number of item in the pool before we start dropping resources on + the floor. This controls resource consumption. + @param clear a function called on items before recycling them. *) + +val with_resource : 'a t -> ('a -> 'b) -> 'b +(** [with_resource pool f] runs [f x] with [x] a resource; when [f] fails or + returns, [x] is returned to the pool for future reuse. *) + +(** Low level control over the pool. This is easier to get wrong (e.g. releasing + the same resource twice) so use with caution. *) +module Raw : sig + val acquire : 'a t -> 'a + + val release : 'a t -> 'a -> unit +end diff --git a/src/util/util_bytes_.ml b/src/util/util_bytes_.ml new file mode 100644 index 00000000..91e65a52 --- /dev/null +++ b/src/util/util_bytes_.ml @@ -0,0 +1,49 @@ +open struct + let spf = Printf.sprintf +end + +let int_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + +let bytes_to_hex_into b res off : unit = + for i = 0 to Bytes.length b - 1 do + let n = Char.code (Bytes.get b i) in + Bytes.set res ((2 * i) + off) (int_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1 + off) (int_to_hex (n land 0x0f)) + done + +let bytes_to_hex (b : bytes) : string = + let res = Bytes.create (2 * Bytes.length b) in + bytes_to_hex_into b res 0; + Bytes.unsafe_to_string res + +let int_of_hex = function + | '0' .. '9' as c -> Char.code c - Char.code '0' + | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' + | c -> raise (Invalid_argument (spf "invalid hex char: %C" c)) + +let bytes_of_hex_substring (s : string) off len = + if len mod 2 <> 0 then + raise (Invalid_argument "hex sequence must be of even length"); + let res = Bytes.make (len / 2) '\x00' in + for i = 0 to (len / 2) - 1 do + let n1 = int_of_hex (String.get s (off + (2 * i))) in + let n2 = int_of_hex (String.get s (off + (2 * i) + 1)) in + let n = (n1 lsl 4) lor n2 in + Bytes.set res i (Char.chr n) + done; + res + +let bytes_of_hex (s : string) : bytes = + bytes_of_hex_substring s 0 (String.length s) + +let bytes_non_zero (self : bytes) : bool = + try + for i = 0 to Bytes.length self - 1 do + if Char.code (Bytes.unsafe_get self i) <> 0 then raise_notrace Exit + done; + false + with Exit -> true diff --git a/src/util/util_mutex.ml b/src/util/util_mutex.ml new file mode 100644 index 00000000..ab7e48a7 --- /dev/null +++ b/src/util/util_mutex.ml @@ -0,0 +1,12 @@ +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) +(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +let[@inline never] protect m f = + Mutex.lock m; + match f () with + | x -> + Mutex.unlock m; + x + | exception e -> + (* NOTE: [unlock] does not poll for asynchronous exceptions *) + Mutex.unlock m; + Printexc.raise_with_backtrace e (Printexc.get_raw_backtrace ()) diff --git a/src/util/util_mutex.mli b/src/util/util_mutex.mli new file mode 100644 index 00000000..feccf59f --- /dev/null +++ b/src/util/util_mutex.mli @@ -0,0 +1 @@ +val protect : Mutex.t -> (unit -> 'a) -> 'a