From 8b488434592f37c2976d1fdeca58a42aa7dae5ab Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 30 Jun 2025 22:52:49 -0400 Subject: [PATCH] Correct size and optimize representation Since we need to traverse the elements added to count up the new size, we can use that pass to add the elements onto our FIFO queue, and then drain the queue in one last pass to reverse. IIUC, this should give us liner complexity of the batch retrieval. --- src/client/batch.ml | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index be4d7c46..ba17e090 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -1,6 +1,7 @@ type 'a t = { mutable size: int; - mutable q: 'a list list; + mutable q: 'a list; + (* The queue is a FIFO represented as a list in reverse order *) batch: int; (* Minimum size to batch before popping *) high_watermark: int; timeout: Mtime.span option; @@ -42,16 +43,23 @@ let ready_to_pop ~force ~now self = let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option = if ready_to_pop ~force ~now self then ( - let l = self.q in + assert (self.q <> []); + let batch = + (* Reverse the list to retrieve the FIFO order. *) + List.rev self.q + in self.q <- []; self.size <- 0; - assert (l <> []); - let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in - Some ls + Some batch ) else None -let push (self : _ t) x : [ `Dropped | `Ok ] = +(* Helper so we can count new elements and prepend them onto the existing [q] in + one pass. *) +let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list = + elems |> List.fold_left (fun (count, q') x -> succ count, x :: q') (0, q) + +let push (self : _ t) elems : [ `Dropped | `Ok ] = if self.size >= self.high_watermark then (* drop this to prevent queue from growing too fast *) `Dropped @@ -60,8 +68,9 @@ let push (self : _ t) x : [ `Dropped | `Ok ] = (* current batch starts now *) self.start <- Mtime_clock.now (); + let count, q' = append_with_count ~elems ~q:self.q in (* add to queue *) - self.size <- 1 + self.size; - self.q <- x :: self.q; + self.size <- self.size + count; + self.q <- q'; `Ok )