diff --git a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml index b6266ad9..189b341e 100644 --- a/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml +++ b/src/client-cohttp-eio/opentelemetry_client_cohttp_eio.ml @@ -15,8 +15,6 @@ let ( let@ ) = ( @@ ) let spf = Printf.sprintf -let tid () = Thread.id @@ Thread.self () - let set_headers = Config.Env.set_headers let get_headers = Config.Env.get_headers @@ -330,7 +328,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) = let tick () = if Config.Env.get_debug () then - Printf.eprintf "tick (from %d)\n%!" (tid ()); + Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int); run_tick_callbacks (); sample_gc_metrics_if_needed (); emit_all ~force:false diff --git a/src/client/batch.ml b/src/client/batch.ml index ba17e090..550479ba 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,13 +1,20 @@ type 'a t = { mutable size: int; mutable q: 'a list; - (* The queue is a FIFO represented as a list in reverse order *) - batch: int; (* Minimum size to batch before popping *) - high_watermark: int; + (** The queue is a FIFO represented as a list in reverse order *) + batch: int; (** Minimum size to batch before popping *) + high_watermark: int; (** Size above which we start dropping signals *) timeout: Mtime.span option; mutable start: Mtime.t; + mutex: Mutex.t; } +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08. + cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +let[@inline never] protect_mutex m f = + Mutex.lock m; + Fun.protect f ~finally:(fun () -> Mutex.unlock m) + let default_high_watermark batch_size = if batch_size = 1 then 100 @@ -25,8 +32,9 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = | Some x -> x | None -> Mtime_clock.now () in + let mutex = Mutex.create () in assert (batch > 0); - { size = 0; q = []; start; batch; timeout; high_watermark } + { size = 0; q = []; start; batch; timeout; high_watermark; mutex } let timeout_expired_ ~now self : bool = match self.timeout with @@ -42,24 +50,38 @@ let ready_to_pop ~force ~now self = self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = - if ready_to_pop ~force ~now self then ( - assert (self.q <> []); - let batch = - (* Reverse the list to retrieve the FIFO order. *) - List.rev self.q - in - self.q <- []; - self.size <- 0; - Some batch - ) else - None + let rev_batch_opt = + protect_mutex self.mutex @@ fun () -> + if ready_to_pop ~force ~now self then ( + assert (self.q <> []); + let batch = self.q in + self.q <- []; + self.size <- 0; + Some batch + ) else + None + in + match rev_batch_opt with + | None -> None + | Some batch -> + (* Reverse the list to retrieve the FIFO order. *) + Some (List.rev batch) (* Helper so we can count new elements and prepend them onto the existing [q] in one pass. *) let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) +let rec push_unprotected (self : _ t) ~(elems : _ list) : unit = + match elems with + | [] -> () + | x :: xs -> + self.q <- x :: self.q; + self.size <- 1 + self.size; + push_unprotected self ~elems:xs + let push (self : _ t) elems : [ `Dropped | `Ok ] = + protect_mutex self.mutex @@ fun () -> if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped @@ -68,9 +90,7 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] = (* current batch starts now *) self.start <- Mtime_clock.now (); - let count, q' = append_with_count ~elems ~q:self.q in (* add to queue *) - self.size <- self.size + count; - self.q <- q'; + push_unprotected self ~elems; `Ok ) diff --git a/src/client/batch.mli b/src/client/batch.mli index def675b1..8bd410fd 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -49,4 +49,4 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option val push : 'a t -> 'a list -> [ `Dropped | `Ok ] (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch [b], or [`Dropped] if the current size of the batch has exceeded the high water - mark determined by the [batch] argument to {!make}]. ) *) + mark determined by the [batch] argument to [{!make}]. ) *) diff --git a/tests/bin/emit1_eio.ml b/tests/bin/emit1_eio.ml index 83a9481e..9990b227 100644 --- a/tests/bin/emit1_eio.ml +++ b/tests/bin/emit1_eio.ml @@ -11,22 +11,22 @@ let sleep_outer = ref 2.0 let n_jobs = ref 1 -let num_sleep = Atomic.make 0 - let stress_alloc_ = ref true +let num_sleep = Atomic.make 0 + let stop = Atomic.make false let num_tr = Atomic.make 0 (* Counter used to mark simulated failures *) -let i = ref 0 +let i = Atomic.make 0 let run_job clock _job_id iterations : unit = let@ scope = Atomic.incr num_tr; OT.Trace.with_ ~kind:OT.Span.Span_kind_producer "loop.outer" - ~attrs:[ "i", `Int !i ] + ~attrs:[ "i", `Int (Atomic.get i) ] in for j = 0 to iterations do @@ -52,7 +52,7 @@ let run_job clock _job_id iterations : unit = ~severity:Severity_number_info "inner at %d" j; ]); - incr i; + Atomic.incr i; try Atomic.incr num_tr; @@ -68,7 +68,7 @@ let run_job clock _job_id iterations : unit = let () = Eio.Time.sleep clock !sleep_inner in Atomic.incr num_sleep; - if j = 4 && !i mod 13 = 0 then failwith "oh no"; + if j = 4 && Atomic.get i mod 13 = 0 then failwith "oh no"; (* simulate a failure *) Opentelemetry.Scope.add_event scope (fun () ->