From ced8dd421f7d7bbd5c1e18ea400646e072f2bc4a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 3 Dec 2025 15:08:40 -0500 Subject: [PATCH] feat client: various changes --- src/client/batch.ml | 2 + src/client/batch.mli | 3 + src/client/client.ml | 8 --- src/client/{config.ml => client_config.ml} | 0 src/client/{config.mli => client_config.mli} | 0 src/client/common_.ml | 2 + src/client/rpool.ml | 59 +++++++++++++++++++ src/client/rpool.mli | 27 +++++++++ src/client/self_trace.ml | 3 +- src/client/stdout_exporter.ml | 7 ++- src/client/sync_queue.ml | 59 +++++++++++++++++++ src/client/sync_queue.mli | 24 ++++++++ ..._resource_builder.ml => util_resources.ml} | 0 13 files changed, 182 insertions(+), 12 deletions(-) delete mode 100644 src/client/client.ml rename src/client/{config.ml => client_config.ml} (100%) rename src/client/{config.mli => client_config.mli} (100%) create mode 100644 src/client/rpool.ml create mode 100644 src/client/rpool.mli create mode 100644 src/client/sync_queue.ml create mode 100644 src/client/sync_queue.mli rename src/client/{signal_resource_builder.ml => util_resources.ml} (100%) diff --git a/src/client/batch.ml b/src/client/batch.ml index e508c09f..1fc4aaa9 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -85,3 +85,5 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = push_unprotected self ~elems; `Ok ) + +let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) diff --git a/src/client/batch.mli b/src/client/batch.mli index a7ed2aa9..c3b6f7e1 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -50,3 +50,6 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ] (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch [b], or [`Dropped] if the current size of the batch has exceeded the high water mark determined by the [batch] argument to [{!make}]. ) *) + +val push' : 'a t -> 'a list -> unit +(** Like {!push} but ignores the result *) diff --git a/src/client/client.ml b/src/client/client.ml deleted file mode 100644 index fa69c983..00000000 --- a/src/client/client.ml +++ /dev/null @@ -1,8 +0,0 @@ -(** Utilities for writing clients - - These are used for implementing e.g., the [opentelemetry-client-cohttp-lwt] - and [opentelemetry-client-ocurl] packages package. *) - -module Config = Config -module Signal = Signal -module Self_trace = Self_trace diff --git a/src/client/config.ml b/src/client/client_config.ml similarity index 100% rename from src/client/config.ml rename to src/client/client_config.ml diff --git a/src/client/config.mli b/src/client/client_config.mli similarity index 100% rename from src/client/config.mli rename to src/client/client_config.mli diff --git a/src/client/common_.ml b/src/client/common_.ml index 9ee9cf28..b1872cd8 100644 --- a/src/client/common_.ml +++ b/src/client/common_.ml @@ -1,4 +1,6 @@ module OTEL = Opentelemetry module Proto = Opentelemetry_proto +let spf = Printf.sprintf + let ( let@ ) = ( @@ ) diff --git a/src/client/rpool.ml b/src/client/rpool.ml new file mode 100644 index 00000000..833ccaef --- /dev/null +++ b/src/client/rpool.ml @@ -0,0 +1,59 @@ +module A = Atomic + +type 'a list_ = + | Nil + | Cons of int * 'a * 'a list_ + +type 'a t = { + mk_item: unit -> 'a; + clear: 'a -> unit; + max_size: int; (** Max number of items *) + items: 'a list_ A.t; +} + +let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t = + { mk_item; clear; max_size; items = A.make Nil } + +let rec acquire self = + match A.get self.items with + | Nil -> self.mk_item () + | Cons (_, x, tl) as l -> + if A.compare_and_set self.items l tl then + x + else + acquire self + +let[@inline] size_ = function + | Cons (sz, _, _) -> sz + | Nil -> 0 + +let release self x : unit = + let rec loop () = + match A.get self.items with + | Cons (sz, _, _) when sz >= self.max_size -> + (* forget the item *) + () + | l -> + if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then + loop () + in + + self.clear x; + loop () + +let with_resource (self : _ t) f = + let x = acquire self in + try + let res = f x in + release self x; + res + with e -> + let bt = Printexc.get_raw_backtrace () in + release self x; + Printexc.raise_with_backtrace e bt + +module Raw = struct + let release = release + + let acquire = acquire +end diff --git a/src/client/rpool.mli b/src/client/rpool.mli new file mode 100644 index 00000000..4a80e115 --- /dev/null +++ b/src/client/rpool.mli @@ -0,0 +1,27 @@ +(** Simple resource pool. + + This is intended for buffers, protobuf encoders, etc. *) + +type 'a t +(** Pool of values of type ['a] *) + +val create : + ?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t +(** Create a new pool. + @param mk_item produce a new item in case the pool is empty + @param max_size + maximum number of item in the pool before we start dropping resources on + the floor. This controls resource consumption. + @param clear a function called on items before recycling them. *) + +val with_resource : 'a t -> ('a -> 'b) -> 'b +(** [with_resource pool f] runs [f x] with [x] a resource; when [f] fails or + returns, [x] is returned to the pool for future reuse. *) + +(** Low level control over the pool. This is easier to get wrong (e.g. releasing + the same resource twice) so use with caution. *) +module Raw : sig + val acquire : 'a t -> 'a + + val release : 'a t -> 'a -> unit +end diff --git a/src/client/self_trace.ml b/src/client/self_trace.ml index 46757302..52d8b2f4 100644 --- a/src/client/self_trace.ml +++ b/src/client/self_trace.ml @@ -8,9 +8,10 @@ let dummy_trace_id_ = OT.Trace_id.dummy let dummy_span_id = OT.Span_id.dummy +(* FIXME: get an explicit tracer instead *) let with_ ?kind ?attrs name f = if Atomic.get enabled then - OT.Trace.with_ ?kind ?attrs name f + OT.Tracer.with_ ?kind ?attrs name f else ( (* A new scope is needed here because it might be modified *) let scope = diff --git a/src/client/stdout_exporter.ml b/src/client/stdout_exporter.ml index ac0b0af9..05122369 100644 --- a/src/client/stdout_exporter.ml +++ b/src/client/stdout_exporter.ml @@ -28,10 +28,11 @@ open struct end class stdout : OTEL.Exporter.t = + let open Opentelemetry_util in let out = Format.std_formatter in let mutex = Mutex.create () in - let ticker = Tick_callbacks.create () in + let tick_cbs = Cb_set.create () in object method send_trace l = pp_vlist mutex pp_span out l @@ -39,9 +40,9 @@ class stdout : OTEL.Exporter.t = method send_logs l = pp_vlist mutex Proto.Logs.pp_log_record out l - method tick () = Tick_callbacks.tick ticker + method tick () = Cb_set.trigger tick_cbs - method add_on_tick_callback cb = Tick_callbacks.on_tick ticker cb + method add_on_tick_callback cb = Cb_set.register tick_cbs cb method cleanup ~on_done () = on_done () end diff --git a/src/client/sync_queue.ml b/src/client/sync_queue.ml new file mode 100644 index 00000000..10983b2f --- /dev/null +++ b/src/client/sync_queue.ml @@ -0,0 +1,59 @@ +module UM = Opentelemetry.Util_mutex + +type 'a t = { + mutex: Mutex.t; + cond: Condition.t; + q: 'a Queue.t; + mutable closed: bool; +} + +exception Closed + +let create () : _ t = + { + mutex = Mutex.create (); + cond = Condition.create (); + q = Queue.create (); + closed = false; + } + +let close (self : _ t) = + UM.protect self.mutex @@ fun () -> + if not self.closed then ( + self.closed <- true; + Condition.broadcast self.cond (* awake waiters so they fail *) + ) + +let push (self : _ t) x : unit = + UM.protect self.mutex @@ fun () -> + if self.closed then + raise Closed + else ( + Queue.push x self.q; + Condition.signal self.cond + ) + +let pop (self : 'a t) : 'a = + let rec loop () = + if self.closed then + raise Closed + else if Queue.is_empty self.q then ( + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else ( + let x = Queue.pop self.q in + x + ) + in + UM.protect self.mutex loop + +let pop_all (self : 'a t) into : unit = + let rec loop () = + if Queue.is_empty self.q then ( + if self.closed then raise Closed; + Condition.wait self.cond self.mutex; + (loop [@tailcall]) () + ) else + Queue.transfer self.q into + in + UM.protect self.mutex loop diff --git a/src/client/sync_queue.mli b/src/client/sync_queue.mli new file mode 100644 index 00000000..d64296d7 --- /dev/null +++ b/src/client/sync_queue.mli @@ -0,0 +1,24 @@ +(** Simple blocking queue *) + +type 'a t + +val create : unit -> _ t + +exception Closed + +val push : 'a t -> 'a -> unit +(** [push q x] pushes [x] into [q], and returns [()]. + @raise Closed if [close q] was previously called.*) + +val pop : 'a t -> 'a +(** [pop q] pops the next element in [q]. It might block until an element comes. + @raise Closed if the queue was closed before a new element was available. *) + +val pop_all : 'a t -> 'a Queue.t -> unit +(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if + no element is available, it will block until it successfully transfers at + least one item to [into]. + @raise Closed if the queue was closed before a new element was available. *) + +val close : _ t -> unit +(** Close the queue, meaning there won't be any more [push] allowed. *) diff --git a/src/client/signal_resource_builder.ml b/src/client/util_resources.ml similarity index 100% rename from src/client/signal_resource_builder.ml rename to src/client/util_resources.ml