diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 507bc845..3e9be01e 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -94,6 +94,7 @@ end = struct stop: bool Atomic.t; cleaned: bool Atomic.t; (** True when we cleaned up after closing *) config: Config.t; + encoder_pool: Pbrt.Encoder.t Rpool.t; send_q: To_send.t Sync_queue.t; (** Queue for the send worker threads *) traces: Proto.Trace.span Batch.t; logs: Proto.Logs.log_record Batch.t; @@ -173,7 +174,11 @@ end = struct Self_trace.with_ ~kind:Span_kind_producer name ~attrs:[ "n", `Int (List.length signals) ] in - let msg = conv signals in + let msg : string = + (* borrow encoder from buffer pool and turn [signals] into bytes *) + let@ encoder = Rpool.with_resource self.encoder_pool in + conv ?encoder:(Some encoder) signals + in ignore (Atomic.fetch_and_add n_bytes_sent (String.length msg) : int); send_http_ ~stop ~config ~url client msg in @@ -199,12 +204,19 @@ end = struct let create_state ~stop ~config () : state = let n_send_threads = max 2 config.Config.bg_threads in + let encoder_pool = + Rpool.create + ~mk_item:(fun () -> Pbrt.Encoder.create ~size:1024 ()) + ~max_size:32 ~clear:Pbrt.Encoder.reset () + in + let self = { stop; config; send_threads = [||]; send_q = Sync_queue.create (); + encoder_pool; cleaned = Atomic.make false; traces = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); logs = Batch.make ~batch:batch_max_size_ ~timeout:batch_timeout_ (); @@ -239,21 +251,28 @@ end = struct in To_send.Send_trace [ traces ]) + let[@inline] push_to_batch b e = + if e <> [] then ( + match Batch.push b e with + | `Ok -> () + | `Dropped -> Atomic.incr n_dropped + ) + let create ~stop ~config () : #t = let open Opentelemetry_util in let st = create_state ~stop ~config () in let ticker = Cb_set.create () in object (self : #t) method send_trace spans = - Batch.push' st.traces spans; + push_to_batch st.traces spans; maybe_send_traces st ~force:false method send_metrics m = - Batch.push' st.metrics m; + push_to_batch st.metrics m; maybe_send_metrics st ~force:false method send_logs m = - Batch.push' st.logs m; + push_to_batch st.logs m; maybe_send_logs st ~force:false method add_on_tick_callback cb = Cb_set.register ticker cb