feat: use a pbrt encoder pool in client-ocurl

This commit is contained in:
Simon Cruanes 2025-12-03 15:36:10 -05:00
parent 6f96d5271a
commit 723b523af5
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -94,6 +94,7 @@ end = struct
stop: bool Atomic.t; stop: bool Atomic.t;
cleaned: bool Atomic.t; (** True when we cleaned up after closing *) cleaned: bool Atomic.t; (** True when we cleaned up after closing *)
config: Config.t; config: Config.t;
encoder_pool: Pbrt.Encoder.t Rpool.t;
send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *) send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *)
traces: Proto.Trace.span Batch.t; traces: Proto.Trace.span Batch.t;
logs: Proto.Logs.log_record Batch.t; logs: Proto.Logs.log_record Batch.t;
@ -173,7 +174,11 @@ end = struct
Self_trace.with_ ~kind:Span_kind_producer name Self_trace.with_ ~kind:Span_kind_producer name
~attrs:[ "n", `Int (List.length signals) ] ~attrs:[ "n", `Int (List.length signals) ]
in in
let msg = conv signals in let msg : string =
(* borrow encoder from buffer pool and turn [signals] into bytes *)
let@ encoder = Rpool.with_resource self.encoder_pool in
conv ?encoder:(Some encoder) signals
in
ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int);
send_http_ ~stop ~config ~url client msg send_http_ ~stop ~config ~url client msg
in in
@ -199,12 +204,19 @@ end = struct
let create_state ~stop ~config () : state = let create_state ~stop ~config () : state =
let n_send_threads = max 2 config.Config.bg_threads in let n_send_threads = max 2 config.Config.bg_threads in
let encoder_pool =
Rpool.create
~mk_item:(fun () -> Pbrt.Encoder.create ~size:1024 ())
~max_size:32 ~clear:Pbrt.Encoder.reset ()
in
let self = let self =
{ {
stop; stop;
config; config;
send_threads = [||]; send_threads = [||];
send_q = Sync_queue.create (); send_q = Sync_queue.create ();
encoder_pool;
cleaned = Atomic.make false; cleaned = Atomic.make false;
traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ ();
@ -239,21 +251,28 @@ end = struct
in in
To_send.Send_trace [ traces ]) To_send.Send_trace [ traces ])
let[@inline] push_to_batch b e =
if e <> [] then (
match Batch.push b e with
| `Ok -> ()
| `Dropped -> Atomic.incr n_dropped
)
let create ~stop ~config () : #t = let create ~stop ~config () : #t =
let open Opentelemetry_util in let open Opentelemetry_util in
let st = create_state ~stop ~config () in let st = create_state ~stop ~config () in
let ticker = Cb_set.create () in let ticker = Cb_set.create () in
object (self : #t) object (self : #t)
method send_trace spans = method send_trace spans =
Batch.push' st.traces spans; push_to_batch st.traces spans;
maybe_send_traces st ~force:false maybe_send_traces st ~force:false
method send_metrics m = method send_metrics m =
Batch.push' st.metrics m; push_to_batch st.metrics m;
maybe_send_metrics st ~force:false maybe_send_metrics st ~force:false
method send_logs m = method send_logs m =
Batch.push' st.logs m; push_to_batch st.logs m;
maybe_send_logs st ~force:false maybe_send_logs st ~force:false
method add_on_tick_callback cb = Cb_set.register ticker cb method add_on_tick_callback cb = Cb_set.register ticker cb