From 93803581b7d29e7a0ae12acee73eb0803073cd00 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Tue, 29 Jul 2025 23:42:33 -0400 Subject: [PATCH 1/3] fix: make metric callbacks atomic The use of a non-threadsafe mutable reference for the metrics callbacks was resulting in a race condition that would sometimes produce non-deterministic results in the integration tests. This has not affected the lwt-based collector, because of the single threaded concurrency Lwt enforces, but it began to show up in the WIP Eio rewrite, for which I am testing on cross-domain programs. I suspect this may have also bee affecting the ocurl collector, but we don't have integration test running on that yet. --- src/core/opentelemetry.ml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index c4ad4146..12c705cf 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -1071,7 +1071,8 @@ end = struct [ "exception.message", `String (Printexc.to_string exn); "exception.type", `String (Printexc.exn_slot_name exn); - "exception.stacktrace", `String (Printexc.raw_backtrace_to_string bt); + ( "exception.stacktrace", + `String (Printexc.raw_backtrace_to_string bt) ); ] in scope.items <- Ev (ev, scope.items) @@ -1454,7 +1455,7 @@ end alarms/intervals to emit them. *) module Metrics_callbacks = struct open struct - let cbs_ : (unit -> Metrics.t list) list ref = ref [] + let cbs_ : (unit -> Metrics.t list) AList.t = AList.make () end (** [register f] adds the callback [f] to the list. @@ -1463,12 +1464,12 @@ module Metrics_callbacks = struct of metrics. It might be called regularly by the backend, in particular (but not only) when {!Collector.tick} is called. *) let register f : unit = - if !cbs_ = [] then + if AList.is_empty cbs_ then (* make sure we call [f] (and others) at each tick *) Collector.on_tick (fun () -> - let m = List.map (fun f -> f ()) !cbs_ |> List.flatten in + let m = List.map (fun f -> f ()) (AList.get cbs_) |> List.flatten in Metrics.emit m); - cbs_ := f :: !cbs_ + AList.add cbs_ f end (** {2 Logs} *) From 54b62af1a2c530f8e79b4daf3c7373297d37e822 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Tue, 2 Sep 2025 15:00:11 -0400 Subject: [PATCH 2/3] Check for on_click registration exactly once --- src/core/opentelemetry.ml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index 12c705cf..be88a9db 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -1455,6 +1455,11 @@ end alarms/intervals to emit them. *) module Metrics_callbacks = struct open struct + (* [true] iff the initial list of metric callbacks has already been registered + with `on_tick`. This registration must only happen once, after which, + [registered_with_on_tick] will forever be [false]. *) + let registered_with_on_tick : bool Atomic.t = Atomic.make false + let cbs_ : (unit -> Metrics.t list) AList.t = AList.make () end @@ -1464,7 +1469,9 @@ module Metrics_callbacks = struct of metrics. It might be called regularly by the backend, in particular (but not only) when {!Collector.tick} is called. *) let register f : unit = - if AList.is_empty cbs_ then + (* sets [registered_with_on_tick] to [true] atomically, iff it is currently + [false]. *) + if Atomic.compare_and_set registered_with_on_tick false true then (* make sure we call [f] (and others) at each tick *) Collector.on_tick (fun () -> let m = List.map (fun f -> f ()) (AList.get cbs_) |> List.flatten in From fa610ed53597f125f9e342203456b6956ab414f8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 2 Sep 2025 15:08:22 -0400 Subject: [PATCH 3/3] Update src/core/opentelemetry.ml --- src/core/opentelemetry.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/opentelemetry.ml b/src/core/opentelemetry.ml index be88a9db..69a8325c 100644 --- a/src/core/opentelemetry.ml +++ b/src/core/opentelemetry.ml @@ -1471,7 +1471,7 @@ module Metrics_callbacks = struct let register f : unit = (* sets [registered_with_on_tick] to [true] atomically, iff it is currently [false]. *) - if Atomic.compare_and_set registered_with_on_tick false true then + if not (Atomic.exchange registered_with_on_tick true) then (* make sure we call [f] (and others) at each tick *) Collector.on_tick (fun () -> let m = List.map (fun f -> f ()) (AList.get cbs_) |> List.flatten in