Make Batch actually be thread safe

This commit is contained in:
Shon Feder 2025-09-07 10:46:29 -04:00
parent 474d43bdad
commit 8a8299020a
No known key found for this signature in database
2 changed files with 13 additions and 3 deletions

View file

@ -1,13 +1,20 @@
type 'a t = {
mutable size: int;
mutable q: 'a list;
(* The queue is a FIFO represented as a list in reverse order *)
(* The queue is a FIFO represented as a list in reverse order *)
batch: int; (* Minimum size to batch before popping *)
high_watermark: int;
timeout: Mtime.span option;
mutable start: Mtime.t;
mutex: Mutex.t;
}
(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *)
(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
let[@inline never] protect m f =
Mutex.lock m;
Fun.protect f ~finally:(fun () -> Mutex.unlock m)
let default_high_watermark batch_size =
if batch_size = 1 then
100
@ -25,8 +32,9 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
| Some x -> x
| None -> Mtime_clock.now ()
in
let mutex = Mutex.create () in
assert (batch > 0);
{ size = 0; q = []; start; batch; timeout; high_watermark }
{ size = 0; q = []; start; batch; timeout; high_watermark; mutex }
let timeout_expired_ ~now self : bool =
match self.timeout with
@ -42,6 +50,7 @@ let ready_to_pop ~force ~now self =
self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
protect self.mutex @@ fun () ->
if ready_to_pop ~force ~now self then (
assert (self.q <> []);
let batch =
@ -60,6 +69,7 @@ let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q)
let push (self : _ t) elems : [ `Dropped | `Ok ] =
protect self.mutex @@ fun () ->
if self.size >= self.high_watermark then
(* drop this to prevent queue from growing too fast *)
`Dropped

View file

@ -49,4 +49,4 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch
[b], or [`Dropped] if the current size of the batch has exceeded the high water
mark determined by the [batch] argument to {!make}]. ) *)
mark determined by the [batch] argument to [{!make}]. ) *)