feat client: various changes

This commit is contained in:
Simon Cruanes 2025-12-03 15:08:40 -05:00
parent 6b6fb34342
commit ced8dd421f
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
13 changed files with 182 additions and 12 deletions

View file

@ -85,3 +85,5 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
push_unprotected self ~elems;
`Ok
)
let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ])

View file

@ -50,3 +50,6 @@ val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the
batch [b], or [`Dropped] if the current size of the batch has exceeded the
high water mark determined by the [batch] argument to [{!make}]. ) *)
val push' : 'a t -> 'a list -> unit
(** Like {!push} but ignores the result *)

View file

@ -1,8 +0,0 @@
(** Utilities for writing clients
These are used for implementing e.g., the [opentelemetry-client-cohttp-lwt]
and [opentelemetry-client-ocurl] packages package. *)
module Config = Config
module Signal = Signal
module Self_trace = Self_trace

View file

@ -1,4 +1,6 @@
module OTEL = Opentelemetry
module Proto = Opentelemetry_proto
let spf = Printf.sprintf
let ( let@ ) = ( @@ )

59
src/client/rpool.ml Normal file
View file

@ -0,0 +1,59 @@
module A = Atomic
type 'a list_ =
| Nil
| Cons of int * 'a * 'a list_
type 'a t = {
mk_item: unit -> 'a;
clear: 'a -> unit;
max_size: int; (** Max number of items *)
items: 'a list_ A.t;
}
let create ?(clear = ignore) ~mk_item ?(max_size = 512) () : _ t =
{ mk_item; clear; max_size; items = A.make Nil }
let rec acquire self =
match A.get self.items with
| Nil -> self.mk_item ()
| Cons (_, x, tl) as l ->
if A.compare_and_set self.items l tl then
x
else
acquire self
let[@inline] size_ = function
| Cons (sz, _, _) -> sz
| Nil -> 0
let release self x : unit =
let rec loop () =
match A.get self.items with
| Cons (sz, _, _) when sz >= self.max_size ->
(* forget the item *)
()
| l ->
if not (A.compare_and_set self.items l (Cons (size_ l + 1, x, l))) then
loop ()
in
self.clear x;
loop ()
let with_resource (self : _ t) f =
let x = acquire self in
try
let res = f x in
release self x;
res
with e ->
let bt = Printexc.get_raw_backtrace () in
release self x;
Printexc.raise_with_backtrace e bt
module Raw = struct
let release = release
let acquire = acquire
end

27
src/client/rpool.mli Normal file
View file

@ -0,0 +1,27 @@
(** Simple resource pool.
This is intended for buffers, protobuf encoders, etc. *)
type 'a t
(** Pool of values of type ['a] *)
val create :
?clear:('a -> unit) -> mk_item:(unit -> 'a) -> ?max_size:int -> unit -> 'a t
(** Create a new pool.
@param mk_item produce a new item in case the pool is empty
@param max_size
maximum number of item in the pool before we start dropping resources on
the floor. This controls resource consumption.
@param clear a function called on items before recycling them. *)
val with_resource : 'a t -> ('a -> 'b) -> 'b
(** [with_resource pool f] runs [f x] with [x] a resource; when [f] fails or
returns, [x] is returned to the pool for future reuse. *)
(** Low level control over the pool. This is easier to get wrong (e.g. releasing
the same resource twice) so use with caution. *)
module Raw : sig
val acquire : 'a t -> 'a
val release : 'a t -> 'a -> unit
end

View file

@ -8,9 +8,10 @@ let dummy_trace_id_ = OT.Trace_id.dummy
let dummy_span_id = OT.Span_id.dummy
(* FIXME: get an explicit tracer instead *)
let with_ ?kind ?attrs name f =
if Atomic.get enabled then
OT.Trace.with_ ?kind ?attrs name f
OT.Tracer.with_ ?kind ?attrs name f
else (
(* A new scope is needed here because it might be modified *)
let scope =

View file

@ -28,10 +28,11 @@ open struct
end
class stdout : OTEL.Exporter.t =
let open Opentelemetry_util in
let out = Format.std_formatter in
let mutex = Mutex.create () in
let ticker = Tick_callbacks.create () in
let tick_cbs = Cb_set.create () in
object
method send_trace l = pp_vlist mutex pp_span out l
@ -39,9 +40,9 @@ class stdout : OTEL.Exporter.t =
method send_logs l = pp_vlist mutex Proto.Logs.pp_log_record out l
method tick () = Tick_callbacks.tick ticker
method tick () = Cb_set.trigger tick_cbs
method add_on_tick_callback cb = Tick_callbacks.on_tick ticker cb
method add_on_tick_callback cb = Cb_set.register tick_cbs cb
method cleanup ~on_done () = on_done ()
end

59
src/client/sync_queue.ml Normal file
View file

@ -0,0 +1,59 @@
module UM = Opentelemetry.Util_mutex
type 'a t = {
mutex: Mutex.t;
cond: Condition.t;
q: 'a Queue.t;
mutable closed: bool;
}
exception Closed
let create () : _ t =
{
mutex = Mutex.create ();
cond = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
UM.protect self.mutex @@ fun () ->
if not self.closed then (
self.closed <- true;
Condition.broadcast self.cond (* awake waiters so they fail *)
)
let push (self : _ t) x : unit =
UM.protect self.mutex @@ fun () ->
if self.closed then
raise Closed
else (
Queue.push x self.q;
Condition.signal self.cond
)
let pop (self : 'a t) : 'a =
let rec loop () =
if self.closed then
raise Closed
else if Queue.is_empty self.q then (
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else (
let x = Queue.pop self.q in
x
)
in
UM.protect self.mutex loop
let pop_all (self : 'a t) into : unit =
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then raise Closed;
Condition.wait self.cond self.mutex;
(loop [@tailcall]) ()
) else
Queue.transfer self.q into
in
UM.protect self.mutex loop

24
src/client/sync_queue.mli Normal file
View file

@ -0,0 +1,24 @@
(** Simple blocking queue *)
type 'a t
val create : unit -> _ t
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] into [q], and returns [()].
@raise Closed if [close q] was previously called.*)
val pop : 'a t -> 'a
(** [pop q] pops the next element in [q]. It might block until an element comes.
@raise Closed if the queue was closed before a new element was available. *)
val pop_all : 'a t -> 'a Queue.t -> unit
(** [pop_all q into] pops all the elements of [q] and moves them into [into]. if
no element is available, it will block until it successfully transfers at
least one item to [into].
@raise Closed if the queue was closed before a new element was available. *)
val close : _ t -> unit
(** Close the queue, meaning there won't be any more [push] allowed. *)