From c30f3b1c0c6956958a1892184d69dbc77ee7c7b8 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Sun, 7 Sep 2025 10:44:04 -0400 Subject: [PATCH 1/6] Fix possible data races in eio test bin Since this test runs with multiple domains, we cannot mutate plain refs as we were without inviting data races. --- tests/bin/emit1_eio.ml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index 83a9481e..9990b227 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -11,22 +11,22 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 -let num_sleep = Atomic.make 0 - let stress_alloc_ = ref true +let num_sleep = Atomic.make 0 + let stop = Atomic.make false let num_tr = Atomic.make 0 (* Counter used to mark simulated failures *) -let i = ref 0 +let i = Atomic.make 0 let run_job clock _job_id iterations : unit = let@ scope = Atomic.incr num_tr; OT.Trace.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int !i ] + ~attrs:[ "i", `Int (Atomic.get i) ] in for j = 0 to iterations do @@ -52,7 +52,7 @@ let run_job clock _job_id iterations : unit = ~severity:Severity_number_info "inner at %d" j; ]); - incr i; + Atomic.incr i; try Atomic.incr num_tr; @@ -68,7 +68,7 @@ let run_job clock _job_id iterations : unit = let () = Eio.Time.sleep clock !sleep_inner in Atomic.incr num_sleep; - if j = 4 && !i mod 13 = 0 then failwith "oh no"; + if j = 4 && Atomic.get i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) Opentelemetry.Scope.add_event scope (fun () -> From 474d43bdad93d0738dd8c6975af903a3884ab592 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Sun, 7 Sep 2025 10:45:49 -0400 Subject: [PATCH 2/6] Use domain ID instead of thread ID in Eio collector Eio programs are not generally expected to use threads for concurrency, but they may well use different domains which we'd want to track during debugging. 
--- src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index b6266ad9..189b341e 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -15,8 +15,6 @@ let ( let@ ) = ( @@ ) let spf = Printf.sprintf -let tid () = Thread.id @@ Thread.self () - let set_headers = Config.Env.set_headers let get_headers = Config.Env.get_headers @@ -330,7 +328,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = let tick () = if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (tid ()); + Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int); run_tick_callbacks (); sample_gc_metrics_if_needed (); emit_all ~force:false From 8a8299020ad9c40e653985bf0322ac1068f62560 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Sun, 7 Sep 2025 10:46:29 -0400 Subject: [PATCH 3/6] Make Batch actually be thread safe --- src/client/batch.ml | 14 ++++++++++++-- src/client/batch.mli | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index ba17e090..5211ada7 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,13 +1,20 @@ type 'a t = { mutable size: int; mutable q: 'a list; - (* The queue is a FIFO represented as a list in reverse order *) + (* The queue is a FIFO represented as a list in reverse order *) batch: int; (* Minimum size to batch before popping *) high_watermark: int; timeout: Mtime.span option; mutable start: Mtime.t; + mutex: Mutex.t; } +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) +(* cannot inline, otherwise flambda might move code around. 
(as per Stdlib) *) +let[@inline never] protect m f = + Mutex.lock m; + Fun.protect f ~finally:(fun () -> Mutex.unlock m) + let default_high_watermark batch_size = if batch_size = 1 then 100 @@ -25,8 +32,9 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = | Some x -> x | None -> Mtime_clock.now () in + let mutex = Mutex.create () in assert (batch > 0); - { size = 0; q = []; start; batch; timeout; high_watermark } + { size = 0; q = []; start; batch; timeout; high_watermark; mutex } let timeout_expired_ ~now self : bool = match self.timeout with @@ -42,6 +50,7 @@ let ready_to_pop ~force ~now self = self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + protect self.mutex @@ fun () -> if ready_to_pop ~force ~now self then ( assert (self.q <> []); let batch = @@ -60,6 +69,7 @@ let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) let push (self : _ t) elems : [ `Dropped | `Ok ] = + protect self.mutex @@ fun () -> if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped diff --git a/src/client/batch.mli b/src/client/batch.mli index def675b1..8bd410fd 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -49,4 +49,4 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option val push : 'a t -> 'a list -> [ `Dropped | `Ok ] (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch [b], or [`Dropped] if the current size of the batch has exceeded the high water - mark determined by the [batch] argument to {!make}]. ) *) + mark determined by the [batch] argument to {!make}. 
) *) From 76efa381c361e447283900babca1dd07bdc5b1c2 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Sep 2025 08:08:18 -0400 Subject: [PATCH 4/6] comments --- src/client/batch.ml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 5211ada7..88895354 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,17 +1,17 @@ type 'a t = { mutable size: int; mutable q: 'a list; - (* The queue is a FIFO represented as a list in reverse order *) - batch: int; (* Minimum size to batch before popping *) - high_watermark: int; + (** The queue is a FIFO represented as a list in reverse order *) + batch: int; (** Minimum size to batch before popping *) + high_watermark: int; (** Size above which we start dropping signals *) timeout: Mtime.span option; mutable start: Mtime.t; mutex: Mutex.t; } -(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) -(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *) -let[@inline never] protect m f = +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08. + cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +let[@inline never] protect_mutex m f = Mutex.lock m; Fun.protect f ~finally:(fun () -> Mutex.unlock m) From 026465f770e473e34bdd1f2c4e493109a46cc318 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Sep 2025 08:08:29 -0400 Subject: [PATCH 5/6] reduce size of critical section better to reverse the list without holding the lock, as it allocates and might have to yield to another thread or domain, pause, etc. 
--- src/client/batch.ml | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 88895354..3b8d9e42 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -50,18 +50,22 @@ let ready_to_pop ~force ~now self = self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - protect self.mutex @@ fun () -> - if ready_to_pop ~force ~now self then ( - assert (self.q <> []); - let batch = - (* Reverse the list to retrieve the FIFO order. *) - List.rev self.q - in - self.q <- []; - self.size <- 0; - Some batch - ) else - None + let rev_batch_opt = + protect_mutex self.mutex @@ fun () -> + if ready_to_pop ~force ~now self then ( + assert (self.q <> []); + let batch = self.q in + self.q <- []; + self.size <- 0; + Some batch + ) else + None + in + match rev_batch_opt with + | None -> None + | Some batch -> + (* Reverse the list to retrieve the FIFO order. *) + Some (List.rev batch) (* Helper so we can count new elements and prepend them onto the existing [q] in one pass. 
*) From b778ffdac3c6897505838428522848919f20104b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 8 Sep 2025 08:09:05 -0400 Subject: [PATCH 6/6] reduce allocations in `push` --- src/client/batch.ml | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 3b8d9e42..550479ba 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -72,8 +72,16 @@ let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) +let rec push_unprotected (self : _ t) ~(elems : _ list) : unit = + match elems with + | [] -> () + | x :: xs -> + self.q <- x :: self.q; + self.size <- 1 + self.size; + push_unprotected self ~elems:xs + let push (self : _ t) elems : [ `Dropped | `Ok ] = - protect self.mutex @@ fun () -> + protect_mutex self.mutex @@ fun () -> if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped @@ -82,9 +90,7 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = (* current batch starts now *) self.start <- Mtime_clock.now (); - let count, q' = append_with_count ~elems ~q:self.q in (* add to queue *) - self.size <- self.size + count; - self.q <- q'; + push_unprotected self ~elems; `Ok )