diff --git a/src/client/batch.ml b/src/client/batch.ml index be4d7c46..ba17e090 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,6 +1,7 @@ type 'a t = { mutable size: int; - mutable q: 'a list list; + mutable q: 'a list; + (* The queue is a FIFO represented as a list in reverse order *) batch: int; (* Minimum size to batch before popping *) high_watermark: int; timeout: Mtime.span option; @@ -42,16 +43,23 @@ let ready_to_pop ~force ~now self = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = if ready_to_pop ~force ~now self then ( - let l = self.q in + assert (self.q <> []); + let batch = + (* Reverse the list to retrieve the FIFO order. *) + List.rev self.q + in self.q <- []; self.size <- 0; - assert (l <> []); - let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Some ls + Some batch ) else None -let push (self : _ t) x : [ `Dropped | `Ok ] = +(* Helper so we can count new elements and prepend them onto the existing [q] in + one pass. *) +let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = + elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) + +let push (self : _ t) elems : [ `Dropped | `Ok ] = if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped @@ -60,8 +68,9 @@ let push (self : _ t) x : [ `Dropped | `Ok ] = (* current batch starts now *) self.start <- Mtime_clock.now (); + let count, q' = append_with_count ~elems ~q:self.q in (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; + self.size <- self.size + count; + self.q <- q'; `Ok )