Correct size and optimize representation

Since we need to traverse the elements added to count up the new size,
we can use that pass to add the elements onto our FIFO queue, and then
drain the queue in one last pass to reverse. IIUC, this should give us
linear complexity for the batch retrieval.
This commit is contained in:
Shon Feder 2025-06-30 22:52:49 -04:00
parent 31a712dd30
commit 8b48843459
No known key found for this signature in database

View file

@ -1,6 +1,7 @@
type 'a t = {
mutable size: int;
mutable q: 'a list list;
mutable q: 'a list;
(* The queue is a FIFO represented as a list in reverse order *)
batch: int; (* Minimum size to batch before popping *)
high_watermark: int;
timeout: Mtime.span option;
@ -42,16 +43,23 @@ let ready_to_pop ~force ~now self =
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
if ready_to_pop ~force ~now self then (
let l = self.q in
assert (self.q <> []);
let batch =
(* Reverse the list to retrieve the FIFO order. *)
List.rev self.q
in
self.q <- [];
self.size <- 0;
assert (l <> []);
let ls = List.fold_left (fun acc l -> List.rev_append l acc) [] l in
Some ls
Some batch
) else
None
let push (self : _ t) x : [ `Dropped | `Ok ] =
(* Push each element of [elems], in order, onto the front of [q], so the last
   element of [elems] ends up at the head of the result. The elements are
   counted during the same traversal. Returns [(n, q')] where [n] is the number
   of elements pushed and [q'] is the extended list. *)
let append_with_count ~(elems : 'a list) ~(q : 'a list) : int * 'a list =
  let rec go added acc = function
    | [] -> added, acc
    | x :: rest -> go (succ added) (x :: acc) rest
  in
  go 0 q elems
let push (self : _ t) elems : [ `Dropped | `Ok ] =
if self.size >= self.high_watermark then
(* drop this to prevent queue from growing too fast *)
`Dropped
@ -60,8 +68,9 @@ let push (self : _ t) x : [ `Dropped | `Ok ] =
(* current batch starts now *)
self.start <- Mtime_clock.now ();
let count, q' = append_with_count ~elems ~q:self.q in
(* add to queue *)
self.size <- 1 + self.size;
self.q <- x :: self.q;
self.size <- self.size + count;
self.q <- q';
`Ok
)