Mutex the batches

This commit is contained in:
Shon Feder 2025-05-28 14:25:01 -04:00
parent 1289fe7d5f
commit f23d22adb0
No known key found for this signature in database

View file

@ -179,9 +179,7 @@ end
(** Batch of resources to be pushed later. (** Batch of resources to be pushed later.
This type is thread-safe This type is safe accross threads and domains. *)
TODO: This is NOT safe accross domains. Need to wrap in mutex or re-architect. *)
module Batch : sig module Batch : sig
type 'a t type 'a t
@ -197,68 +195,104 @@ module Batch : sig
val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t val make : ?batch:int -> ?timeout:Mtime.span -> unit -> 'a t
(** Create a new batch *) (** Create a new batch *)
end = struct end = struct
type 'a t = {
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
}
let make ?batch ?timeout () : _ t = module Q : sig
Option.iter (fun b -> assert (b > 0)) batch; type _ t
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in val make : ?batch:int -> ?timeout:Mtime.span -> unit -> _ t
{ val timeout : _ t -> Mtime.span option
size = 0; val size : _ t -> int
start = Mtime_clock.now (); val start : _ t -> Mtime.t
q = []; val is_full : _ t -> bool
batch;
timeout; (* Take all items queued for the batch, emptying the queue in the process *)
high_watermark; val drain_q : 'a t -> 'a list
val push : 'a t -> 'a -> bool
end = struct
let mutex = Eio.Mutex.create ()
let w_mutex f = Eio.Mutex.use_rw ~protect:true mutex f
let r_mutex f = Eio.Mutex.use_ro mutex f
type 'a t = {
mutable size: int;
mutable q: 'a list;
batch: int option;
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
} }
let make ?batch ?timeout () : _ t =
Option.iter (fun b -> assert (b > 0)) batch;
let high_watermark = Option.fold ~none:100 ~some:(fun x -> x * 10) batch in
{
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let drain_q t = w_mutex (fun () ->
let l = t.q in
t.q <- [];
t.size <- 0;
l)
(* Immutable fields that don't need a mutex for safe access *)
let size t = t.size
let is_full_ t : bool =
match t.batch with
| None -> t.size > 0
| Some b -> t.size >= b
let is_full t : bool = r_mutex (fun () -> is_full_ t)
let push (t : _ t) x : bool = w_mutex (fun () ->
if t.size >= t.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if t.size = 0 && Option.is_some (t.timeout) then
(* current batch starts now *)
t.start <- Mtime_clock.now ();
(* add to queue *)
t.size <- 1 + t.size;
t.q <- x :: t.q;
let ready = is_full_ t in
ready
))
(* Access to mutable fields, requiring a mutex for safe access *)
let timeout t = r_mutex (fun () -> t.timeout)
let start t = r_mutex (fun () -> t.start)
end
type 'a t = 'a Q.t
let make = Q.make
let timeout_expired_ ~now self : bool = let timeout_expired_ ~now self : bool =
match self.timeout with match Q.timeout self with
| Some t -> | Some t ->
let elapsed = Mtime.span now self.start in let elapsed = Mtime.span now (Q.start self) in
Mtime.Span.compare elapsed t >= 0 Mtime.Span.compare elapsed t >= 0
| None -> false | None -> false
let is_full_ self : bool =
match self.batch with
| None -> self.size > 0
| Some b -> self.size >= b
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
if self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) if Q.size self > 0 && (force || Q.is_full self || timeout_expired_ ~now self)
then ( then (
let l = self.q in let l = Q.drain_q self in
self.q <- [];
self.size <- 0;
assert (l <> []); assert (l <> []);
Some l Some l
) else ) else
None None
let push (self : _ t) x : bool = let push' self x = ignore (Q.push self x : bool)
if self.size >= self.high_watermark then (
(* drop this to prevent queue from growing too fast *)
Atomic.incr n_dropped;
true
) else (
if self.size = 0 && Option.is_some self.timeout then
(* current batch starts now *)
self.start <- Mtime_clock.now ();
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
let ready = is_full_ self in
ready
)
let push' self x = ignore (push self x : bool)
end end
(** An emitter. This is used by {!Backend} below to forward traces/metrics/… (** An emitter. This is used by {!Backend} below to forward traces/metrics/…