From cfe6e8bca30d30e8294093b1407a4d39cfa75100 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 17 Feb 2026 16:51:34 -0500 Subject: [PATCH] various fixes --- .../opentelemetry_client_cohttp_eio.ml | 14 +++-- .../opentelemetry_client_ocurl.ml | 4 -- .../opentelemetry_client_ocurl.mli | 3 - src/client/batch.ml | 56 +++++++++++-------- src/client/batch.mli | 3 + src/client/generic_consumer.ml | 6 +- src/client/generic_consumer_exporter.ml | 6 +- src/client/generic_http_consumer.ml | 7 +-- src/client/generic_notifier.ml | 2 +- src/client/lwt/notifier_lwt.ml | 11 +++- src/client/sampler.ml | 15 +++-- src/client/sync/bounded_queue_sync.ml | 3 +- src/client/sync/notifier_sync.ml | 6 +- src/core/span.ml | 2 + src/core/span_ctx.ml | 6 +- .../cohttp/opentelemetry_cohttp_lwt.ml | 13 +++-- src/lib/metrics_callbacks.ml | 7 ++- tests/bin/emit1.ml | 2 - 18 files changed, 105 insertions(+), 61 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index 22cd88b8..9844f582 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -38,17 +38,19 @@ struct { mutex = Eio.Mutex.create (); cond = Eio.Condition.create () } let trigger self = - (* FIXME: this might be triggered from other threads!! how do we - ensure it runs in the Eio thread? *) + (* Eio.Condition.broadcast is lock-free since eio 0.8 (ocaml-multicore/eio#397) + and safe to call from other threads/domains and signal handlers. 
*) Eio.Condition.broadcast self.cond let delete self = trigger self; () - let wait self = + let wait self ~should_keep_waiting = Eio.Mutex.lock self.mutex; - Eio.Condition.await self.cond self.mutex; + while should_keep_waiting () do + Eio.Condition.await self.cond self.mutex + done; Eio.Mutex.unlock self.mutex (** Ensure we get signalled when the queue goes from empty to non-empty *) @@ -111,7 +113,9 @@ struct in Error err | Ok (resp, body) -> - let body = Eio.Buf_read.(parse_exn take_all) body ~max_size:max_int in + let body = + Eio.Buf_read.(parse_exn take_all) body ~max_size:(10 * 1024 * 1024) + in let code = Response.status resp |> Code.code_of_status in if not (Code.is_error code) then ( match decode with diff --git a/src/client-ocurl/opentelemetry_client_ocurl.ml b/src/client-ocurl/opentelemetry_client_ocurl.ml index 96699948..473c467d 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.ml +++ b/src/client-ocurl/opentelemetry_client_ocurl.ml @@ -8,8 +8,6 @@ module OTELC = Opentelemetry_client module OTEL = Opentelemetry open Common_ -let n_bytes_sent : int Atomic.t = Atomic.make 0 - type error = OTELC.Export_error.t open struct @@ -138,5 +136,3 @@ let with_setup ?after_shutdown ?(config : Config.t = Config.make ()) Fun.protect f ~finally:(fun () -> shutdown_and_wait ?after_shutdown exp) ) else f () - -let[@inline] n_bytes_sent () = Atomic.get n_bytes_sent diff --git a/src/client-ocurl/opentelemetry_client_ocurl.mli b/src/client-ocurl/opentelemetry_client_ocurl.mli index ee779256..be4797fd 100644 --- a/src/client-ocurl/opentelemetry_client_ocurl.mli +++ b/src/client-ocurl/opentelemetry_client_ocurl.mli @@ -5,9 +5,6 @@ module Config = Config -val n_bytes_sent : unit -> int -(** Global counter of bytes sent (or attempted to be sent) *) - val consumer : ?config:Config.t -> unit -> Opentelemetry_client.Consumer.any_signal_l_builder (** Consumer that pulls from a queue *) diff --git a/src/client/batch.ml b/src/client/batch.ml index c1c70884..b791b926 100644 
--- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -11,6 +11,7 @@ type 'a t = { batch: int; (** Minimum size to batch before popping *) high_watermark: int; (** Size above which we start dropping signals *) timeout: Mtime.span option; + n_dropped: int Atomic.t; } let max_batch_size = 100_000 @@ -45,6 +46,7 @@ let make ?(batch = 100) ?high_watermark ?mtime ?timeout () : _ t = batch; timeout; high_watermark; + n_dropped = Atomic.make 0; } let timeout_expired_ ~mtime ~timeout (self : _ state) : bool = @@ -94,35 +96,43 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = if elems = [] then `Ok else ( - let now = lazy (Mtime_clock.now ()) in - Util_atomic.update_cas self.st @@ fun state -> - if state.size >= self.high_watermark then - ( (* drop this to prevent queue from growing too fast *) - `Dropped, - state ) - else ( - let start = - if state.size = 0 && Option.is_some self.timeout then - Lazy.force now - else - state.start - in + let now = Mtime_clock.now () in + let res = + Util_atomic.update_cas self.st @@ fun state -> + if state.size >= self.high_watermark then + ( (* drop this to prevent queue from growing too fast *) + `Dropped, + state ) + else ( + let start = + if state.size = 0 && Option.is_some self.timeout then + now + else + state.start + in - (* add to queue *) - let state = - { - size = state.size + List.length elems; - q = List.rev_append elems state.q; - start; - } - in + (* add to queue *) + let state = + { + size = state.size + List.length elems; + q = List.rev_append elems state.q; + start; + } + in - `Ok, state - ) + `Ok, state + ) + in + (match res with + | `Dropped -> Atomic.incr self.n_dropped + | `Ok -> ()); + res ) let[@inline] push' self elems = ignore (push self elems : [ `Dropped | `Ok ]) +let[@inline] n_dropped self = Atomic.get self.n_dropped + module Internal_ = struct let mtime_dummy_ = mtime_dummy_ end diff --git a/src/client/batch.mli b/src/client/batch.mli index dc8e69a2..baa94068 100644 --- a/src/client/batch.mli +++ 
b/src/client/batch.mli @@ -56,6 +56,9 @@ val push' : 'a t -> 'a list -> unit val cur_size : _ t -> int (** Number of elements in the current batch *) +val n_dropped : _ t -> int +(** Number of [push] calls dropped because the batch had reached its high watermark *) + (**/**) module Internal_ : sig diff --git a/src/client/generic_consumer.ml b/src/client/generic_consumer.ml index 8489ffd9..1c4553a7 100644 --- a/src/client/generic_consumer.ml +++ b/src/client/generic_consumer.ml @@ -155,7 +155,11 @@ end = struct shutdown_worker self; IO.return () | Active -> - let* () = Notifier.wait self.notify in + let* () = + Notifier.wait self.notify ~should_keep_waiting:(fun () -> + Bounded_queue.Recv.size self.q = 0 + && Atomic.get self.status = Active) + in loop ()) in diff --git a/src/client/generic_consumer_exporter.ml b/src/client/generic_consumer_exporter.ml index 790462b0..b939dc20 100644 --- a/src/client/generic_consumer_exporter.ml +++ b/src/client/generic_consumer_exporter.ml @@ -87,7 +87,11 @@ end = struct shutdown_worker self; IO.return () | Active -> - let* () = Notifier.wait self.notify in + let* () = + Notifier.wait self.notify ~should_keep_waiting:(fun () -> + Bounded_queue.Recv.size self.q = 0 + && Atomic.get self.status = Active) + in loop ()) in diff --git a/src/client/generic_http_consumer.ml b/src/client/generic_http_consumer.ml index 35273945..dd1cdd8d 100644 --- a/src/client/generic_http_consumer.ml +++ b/src/client/generic_http_consumer.ml @@ -2,9 +2,6 @@ open Common_ type error = Export_error.t -(** Number of errors met during export *) -let n_errors = Atomic.make 0 - module type IO = Generic_io.S_WITH_CONCURRENCY module type HTTPC = sig @@ -133,8 +130,8 @@ end = struct let consumer ?override_n_workers ~ticker_task ~(config : Http_config.t) () : Consumer.any_signal_l_builder = let n_workers = - min 2 - (max 500 + max 2 + (min 500 (match override_n_workers, config.http_concurrency_level with | Some n, _ -> n | None, Some n -> n diff --git 
a/src/client/generic_notifier.ml b/src/client/generic_notifier.ml index 17ea42a7..5b74830b 100644 --- a/src/client/generic_notifier.ml +++ b/src/client/generic_notifier.ml @@ -13,7 +13,7 @@ module type S = sig val trigger : t -> unit - val wait : t -> unit IO.t + val wait : t -> should_keep_waiting:(unit -> bool) -> unit IO.t val register_bounded_queue : t -> _ Bounded_queue.Recv.t -> unit end diff --git a/src/client/lwt/notifier_lwt.ml b/src/client/lwt/notifier_lwt.ml index 9ec178ae..1e61a124 100644 --- a/src/client/lwt/notifier_lwt.ml +++ b/src/client/lwt/notifier_lwt.ml @@ -34,7 +34,16 @@ let trigger (self : t) : unit = else if not (Atomic.exchange self.notified true) then Lwt_unix.send_notification self.notification -let wait (self : t) : unit Lwt.t = Lwt_condition.wait self.cond +let wait (self : t) ~should_keep_waiting : unit Lwt.t = + let open Lwt.Syntax in + let rec loop () = + if should_keep_waiting () then + let* () = Lwt_condition.wait self.cond in + loop () + else + Lwt.return_unit + in + loop () let register_bounded_queue (self : t) (q : _ Bounded_queue.Recv.t) : unit = Bounded_queue.Recv.on_non_empty q (fun () -> trigger self) diff --git a/src/client/sampler.ml b/src/client/sampler.ml index dabdda6d..b6241884 100644 --- a/src/client/sampler.ml +++ b/src/client/sampler.ml @@ -1,15 +1,19 @@ type t = { proba_accept: float; + rng: Random.State.t; n_seen: int Atomic.t; n_accepted: int Atomic.t; } let create ~proba_accept () : t = - (* FIXME: either create a random state and protect it, or make sure - we Random.self_init() in the current domain?? *) if proba_accept < 0. || proba_accept > 1. 
then invalid_arg "sampler: proba_accept must be in [0., 1.]"; - { proba_accept; n_seen = Atomic.make 0; n_accepted = Atomic.make 0 } + { + proba_accept; + rng = Random.State.make_self_init (); + n_seen = Atomic.make 0; + n_accepted = Atomic.make 0; + } let[@inline] proba_accept self = self.proba_accept @@ -25,7 +29,10 @@ let actual_rate (self : t) : float = let accept (self : t) : bool = Atomic.incr self.n_seen; - let n = Random.float 1. in + (* WARNING: Random.State.float is not safe to call concurrently on the + same state from multiple domains. If a sampler is shared across domains, + consider creating one sampler per domain. *) + let n = Random.State.float self.rng 1. in let res = n < self.proba_accept in if res then Atomic.incr self.n_accepted; diff --git a/src/client/sync/bounded_queue_sync.ml b/src/client/sync/bounded_queue_sync.ml index 7b0a2b8e..79b524ee 100644 --- a/src/client/sync/bounded_queue_sync.ml +++ b/src/client/sync/bounded_queue_sync.ml @@ -43,8 +43,7 @@ end = struct a value of type [bool] which OCaml's memory model should guarantee. *) let[@inline] closed self = self.closed - (* NOTE: race condition here is also benign in absence of tearing. 
*) - let[@inline] size self = Queue.length self.q + let[@inline] size self = UM.protect self.mutex (fun () -> Queue.length self.q) let close (self : _ t) = UM.protect self.mutex @@ fun () -> diff --git a/src/client/sync/notifier_sync.ml b/src/client/sync/notifier_sync.ml index e62505ad..7fd1fca5 100644 --- a/src/client/sync/notifier_sync.ml +++ b/src/client/sync/notifier_sync.ml @@ -11,9 +11,11 @@ let[@inline] trigger self = Condition.broadcast self.cond let delete = ignore -let wait self = +let wait self ~should_keep_waiting = Mutex.lock self.mutex; - Condition.wait self.cond self.mutex; + while should_keep_waiting () do + Condition.wait self.cond self.mutex + done; Mutex.unlock self.mutex (** Ensure we get signalled when the queue goes from empty to non-empty *) diff --git a/src/core/span.ml b/src/core/span.ml index 787c11a7..14755e90 100644 --- a/src/core/span.ml +++ b/src/core/span.ml @@ -92,6 +92,8 @@ let to_span_link (self : t) : Span_link.t = let[@inline] to_span_ctx (self : t) : Span_ctx.t = Span_ctx.make ~trace_id:(trace_id self) ~parent_id:(id self) () +(* Note: a span must not be concurrently modified from multiple + threads or domains. 
*) let[@inline] add_event self ev : unit = if is_not_dummy self then span_set_events self (ev :: self.events) diff --git a/src/core/span_ctx.ml b/src/core/span_ctx.ml index 5b72755b..3f2c73e1 100644 --- a/src/core/span_ctx.ml +++ b/src/core/span_ctx.ml @@ -77,7 +77,11 @@ let of_w3c_trace_context bs : _ result = with Invalid_argument msg -> invalid_arg (spf "in span id: %s" msg) in if Bytes.get bs 52 <> '-' then invalid_arg "expected '-' after parent_id"; - let sampled = int_of_string_opt (Bytes.sub_string bs 53 2) = Some 1 in + let sampled = + match int_of_string_opt ("0x" ^ Bytes.sub_string bs 53 2) with + | Some flags -> flags land 1 = 1 + | None -> false + in (* ignore other flags *) Ok (make ~remote:true ~sampled ~trace_id ~parent_id ()) diff --git a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml index a0a7281b..6dd6e644 100644 --- a/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml +++ b/src/integrations/cohttp/opentelemetry_cohttp_lwt.ml @@ -151,16 +151,21 @@ let client ?(tracer = Otel.Tracer.dynamic_main) ?(span : Otel.Span.t option) [Cohttp_lwt.S.Client]. 
*) include C - let attrs_for ~uri ~meth:_ () = + let attrs_for ~uri ~meth () = [ - "http.method", `String (Code.string_of_method `GET); + "http.method", `String (Code.string_of_method meth); "http.url", `String (Uri.to_string uri); ] let context_for ~uri ~meth = - let trace_id = Option.map Otel.Span.trace_id span in + let parent = + match span with + | Some _ -> span + | None -> Otel.Ambient_span.get () + in + let trace_id = Option.map Otel.Span.trace_id parent in let attrs = attrs_for ~uri ~meth () in - trace_id, span, attrs + trace_id, parent, attrs let add_traceparent (span : Otel.Span.t) headers = let module Traceparent = Otel.Trace_context.Traceparent in diff --git a/src/lib/metrics_callbacks.ml b/src/lib/metrics_callbacks.ml index 4aae9bf5..c4a40a7f 100644 --- a/src/lib/metrics_callbacks.ml +++ b/src/lib/metrics_callbacks.ml @@ -56,8 +56,11 @@ module Main_set = struct | Some s -> s | None -> let s = create () in - if Atomic.compare_and_set cur_set_ None (Some s) then + if Atomic.compare_and_set cur_set_ None (Some s) then ( + (match Main_exporter.get () with + | Some exp -> add_to_exporter exp s + | None -> ()); s - else + ) else get () end diff --git a/tests/bin/emit1.ml b/tests/bin/emit1.ml index 9e8a2235..c5c275d9 100644 --- a/tests/bin/emit1.ml +++ b/tests/bin/emit1.ml @@ -98,8 +98,6 @@ let run () = [ sum ~name:"num-sleep" ~is_monotonic:true [ int ~now (Atomic.get num_sleep) ]; - sum ~name:"otel.bytes-sent" ~is_monotonic:true ~unit_:"B" - [ int ~now (Opentelemetry_client_ocurl.n_bytes_sent ()) ]; ])); let n_jobs = max 1 !n_jobs in