diff --git a/src/client/batch.ml b/src/client/batch.ml index ba17e090..5211ada7 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,13 +1,20 @@ type 'a t = { mutable size: int; mutable q: 'a list; - (* The queue is a FIFO represented as a list in reverse order *) + (* The queue is a FIFO represented as a list in reverse order *) batch: int; (* Minimum size to batch before popping *) high_watermark: int; timeout: Mtime.span option; mutable start: Mtime.t; + mutex: Mutex.t; } +(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08 *) +(* cannot inline, otherwise flambda might move code around. (as per Stdlib) *) +let[@inline never] protect m f = + Mutex.lock m; + Fun.protect f ~finally:(fun () -> Mutex.unlock m) + let default_high_watermark batch_size = if batch_size = 1 then 100 @@ -25,8 +32,9 @@ let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = | Some x -> x | None -> Mtime_clock.now () in + let mutex = Mutex.create () in assert (batch > 0); - { size = 0; q = []; start; batch; timeout; high_watermark } + { size = 0; q = []; start; batch; timeout; high_watermark; mutex } let timeout_expired_ ~now self : bool = match self.timeout with @@ -42,6 +50,7 @@ let ready_to_pop ~force ~now self = self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self) let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = + protect self.mutex @@ fun () -> if ready_to_pop ~force ~now self then ( assert (self.q <> []); let batch = @@ -60,6 +69,7 @@ let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) let push (self : _ t) elems : [ `Dropped | `Ok ] = + protect self.mutex @@ fun () -> if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped diff --git a/src/client/batch.mli b/src/client/batch.mli index def675b1..8bd410fd 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -49,4 +49,4 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option val push : 'a t -> 'a list -> [ `Dropped | `Ok ] (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch [b], or [`Dropped] if the current size of the batch has exceeded the high water - mark determined by the [batch] argument to {!make}]. ) *) + mark determined by the [batch] argument to [{!make}]. ) *)