mirror of
https://github.com/ocaml-tracing/ocaml-opentelemetry.git
synced 2026-03-09 04:17:56 -04:00
Merge pull request #106 from shonfeder/fix-races
Make the signal `Batch`ing module thread safe
This commit is contained in:
commit
85b6126b78
4 changed files with 46 additions and 28 deletions
|
|
@ -15,8 +15,6 @@ let ( let@ ) = ( @@ )
|
||||||
|
|
||||||
let spf = Printf.sprintf
|
let spf = Printf.sprintf
|
||||||
|
|
||||||
let tid () = Thread.id @@ Thread.self ()
|
|
||||||
|
|
||||||
let set_headers = Config.Env.set_headers
|
let set_headers = Config.Env.set_headers
|
||||||
|
|
||||||
let get_headers = Config.Env.get_headers
|
let get_headers = Config.Env.get_headers
|
||||||
|
|
@ -330,7 +328,7 @@ let mk_emitter ~stop ~net (config : Config.t) : (module EMITTER) =
|
||||||
|
|
||||||
let tick () =
|
let tick () =
|
||||||
if Config.Env.get_debug () then
|
if Config.Env.get_debug () then
|
||||||
Printf.eprintf "tick (from %d)\n%!" (tid ());
|
Printf.eprintf "tick (from domain %d)\n%!" (Domain.self () :> int);
|
||||||
run_tick_callbacks ();
|
run_tick_callbacks ();
|
||||||
sample_gc_metrics_if_needed ();
|
sample_gc_metrics_if_needed ();
|
||||||
emit_all ~force:false
|
emit_all ~force:false
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,20 @@
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutable size: int;
|
mutable size: int;
|
||||||
mutable q: 'a list;
|
mutable q: 'a list;
|
||||||
(* The queue is a FIFO represented as a list in reverse order *)
|
(** The queue is a FIFO represented as a list in reverse order *)
|
||||||
batch: int; (* Minimum size to batch before popping *)
|
batch: int; (** Minimum size to batch before popping *)
|
||||||
high_watermark: int;
|
high_watermark: int; (** Size above which we start dropping signals *)
|
||||||
timeout: Mtime.span option;
|
timeout: Mtime.span option;
|
||||||
mutable start: Mtime.t;
|
mutable start: Mtime.t;
|
||||||
|
mutex: Mutex.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08.
|
||||||
|
cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
|
||||||
|
let[@inline never] protect_mutex m f =
|
||||||
|
Mutex.lock m;
|
||||||
|
Fun.protect f ~finally:(fun () -> Mutex.unlock m)
|
||||||
|
|
||||||
let default_high_watermark batch_size =
|
let default_high_watermark batch_size =
|
||||||
if batch_size = 1 then
|
if batch_size = 1 then
|
||||||
100
|
100
|
||||||
|
|
@ -25,8 +32,9 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
|
||||||
| Some x -> x
|
| Some x -> x
|
||||||
| None -> Mtime_clock.now ()
|
| None -> Mtime_clock.now ()
|
||||||
in
|
in
|
||||||
|
let mutex = Mutex.create () in
|
||||||
assert (batch > 0);
|
assert (batch > 0);
|
||||||
{ size = 0; q = []; start; batch; timeout; high_watermark }
|
{ size = 0; q = []; start; batch; timeout; high_watermark; mutex }
|
||||||
|
|
||||||
let timeout_expired_ ~now self : bool =
|
let timeout_expired_ ~now self : bool =
|
||||||
match self.timeout with
|
match self.timeout with
|
||||||
|
|
@ -42,24 +50,38 @@ let ready_to_pop ~force ~now self =
|
||||||
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
|
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
|
||||||
|
|
||||||
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
|
||||||
if ready_to_pop ~force ~now self then (
|
let rev_batch_opt =
|
||||||
assert (self.q <> []);
|
protect_mutex self.mutex @@ fun () ->
|
||||||
let batch =
|
if ready_to_pop ~force ~now self then (
|
||||||
(* Reverse the list to retrieve the FIFO order. *)
|
assert (self.q <> []);
|
||||||
List.rev self.q
|
let batch = self.q in
|
||||||
in
|
self.q <- [];
|
||||||
self.q <- [];
|
self.size <- 0;
|
||||||
self.size <- 0;
|
Some batch
|
||||||
Some batch
|
) else
|
||||||
) else
|
None
|
||||||
None
|
in
|
||||||
|
match rev_batch_opt with
|
||||||
|
| None -> None
|
||||||
|
| Some batch ->
|
||||||
|
(* Reverse the list to retrieve the FIFO order. *)
|
||||||
|
Some (List.rev batch)
|
||||||
|
|
||||||
(* Helper so we can count new elements and prepend them onto the existing [q] in
|
(* Helper so we can count new elements and prepend them onto the existing [q] in
|
||||||
one pass. *)
|
one pass. *)
|
||||||
let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
|
let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
|
||||||
elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q)
|
elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q)
|
||||||
|
|
||||||
|
let rec push_unprotected (self : _ t) ~(elems : _ list) : unit =
|
||||||
|
match elems with
|
||||||
|
| [] -> ()
|
||||||
|
| x :: xs ->
|
||||||
|
self.q <- x :: self.q;
|
||||||
|
self.size <- 1 + self.size;
|
||||||
|
push_unprotected self ~elems:xs
|
||||||
|
|
||||||
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||||
|
protect_mutex self.mutex @@ fun () ->
|
||||||
if self.size >= self.high_watermark then
|
if self.size >= self.high_watermark then
|
||||||
(* drop this to prevent queue from growing too fast *)
|
(* drop this to prevent queue from growing too fast *)
|
||||||
`Dropped
|
`Dropped
|
||||||
|
|
@ -68,9 +90,7 @@ let push (self : _ t) elems : [ `Dropped | `Ok ] =
|
||||||
(* current batch starts now *)
|
(* current batch starts now *)
|
||||||
self.start <- Mtime_clock.now ();
|
self.start <- Mtime_clock.now ();
|
||||||
|
|
||||||
let count, q' = append_with_count ~elems ~q:self.q in
|
|
||||||
(* add to queue *)
|
(* add to queue *)
|
||||||
self.size <- self.size + count;
|
push_unprotected self ~elems;
|
||||||
self.q <- q';
|
|
||||||
`Ok
|
`Ok
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -49,4 +49,4 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
|
||||||
val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
|
val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
|
||||||
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch
|
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch
|
||||||
[b], or [`Dropped] if the current size of the batch has exceeded the high water
|
[b], or [`Dropped] if the current size of the batch has exceeded the high water
|
||||||
mark determined by the [batch] argument to {!make}]. ) *)
|
mark determined by the [batch] argument to [{!make}]. ) *)
|
||||||
|
|
|
||||||
|
|
@ -11,22 +11,22 @@ let sleep_outer = ref 2.0
|
||||||
|
|
||||||
let n_jobs = ref 1
|
let n_jobs = ref 1
|
||||||
|
|
||||||
let num_sleep = Atomic.make 0
|
|
||||||
|
|
||||||
let stress_alloc_ = ref true
|
let stress_alloc_ = ref true
|
||||||
|
|
||||||
|
let num_sleep = Atomic.make 0
|
||||||
|
|
||||||
let stop = Atomic.make false
|
let stop = Atomic.make false
|
||||||
|
|
||||||
let num_tr = Atomic.make 0
|
let num_tr = Atomic.make 0
|
||||||
|
|
||||||
(* Counter used to mark simulated failures *)
|
(* Counter used to mark simulated failures *)
|
||||||
let i = ref 0
|
let i = Atomic.make 0
|
||||||
|
|
||||||
let run_job clock _job_id iterations : unit =
|
let run_job clock _job_id iterations : unit =
|
||||||
let@ scope =
|
let@ scope =
|
||||||
Atomic.incr num_tr;
|
Atomic.incr num_tr;
|
||||||
OT.Trace.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
OT.Trace.with_ ~kind:OT.Span.Span_kind_producer "loop.outer"
|
||||||
~attrs:[ "i", `Int !i ]
|
~attrs:[ "i", `Int (Atomic.get i) ]
|
||||||
in
|
in
|
||||||
|
|
||||||
for j = 0 to iterations do
|
for j = 0 to iterations do
|
||||||
|
|
@ -52,7 +52,7 @@ let run_job clock _job_id iterations : unit =
|
||||||
~severity:Severity_number_info "inner at %d" j;
|
~severity:Severity_number_info "inner at %d" j;
|
||||||
]);
|
]);
|
||||||
|
|
||||||
incr i;
|
Atomic.incr i;
|
||||||
|
|
||||||
try
|
try
|
||||||
Atomic.incr num_tr;
|
Atomic.incr num_tr;
|
||||||
|
|
@ -68,7 +68,7 @@ let run_job clock _job_id iterations : unit =
|
||||||
let () = Eio.Time.sleep clock !sleep_inner in
|
let () = Eio.Time.sleep clock !sleep_inner in
|
||||||
Atomic.incr num_sleep;
|
Atomic.incr num_sleep;
|
||||||
|
|
||||||
if j = 4 && !i mod 13 = 0 then failwith "oh no";
|
if j = 4 && Atomic.get i mod 13 = 0 then failwith "oh no";
|
||||||
|
|
||||||
(* simulate a failure *)
|
(* simulate a failure *)
|
||||||
Opentelemetry.Scope.add_event scope (fun () ->
|
Opentelemetry.Scope.add_event scope (fun () ->
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue