From 31a712dd30108673f55a21fba0b7ebb36ce50408 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Mon, 30 Jun 2025 22:28:30 -0400 Subject: [PATCH] Allowing configuring start time --- src/client/batch.ml | 24 +++++++++++++----------- src/client/batch.mli | 13 +++++++++++-- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/client/batch.ml b/src/client/batch.ml index 005644c2..be4d7c46 100644 --- a/src/client/batch.ml +++ b/src/client/batch.ml @@ -7,23 +7,25 @@ type 'a t = { mutable start: Mtime.t; } -let high_watermark batch_size = +let default_high_watermark batch_size = if batch_size = 1 then 100 else batch_size * 10 -let make ?(batch = 1) ?(high_watermark = high_watermark batch) ?timeout () : _ t - = +let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t = + let high_watermark = + match high_watermark with + | Some x -> x + | None -> default_high_watermark batch + in + let start = + match now with + | Some x -> x + | None -> Mtime_clock.now () + in assert (batch > 0); - { - size = 0; - start = Mtime_clock.now (); - q = []; - batch; - timeout; - high_watermark; - } + { size = 0; q = []; start; batch; timeout; high_watermark } let timeout_expired_ ~now self : bool = match self.timeout with diff --git a/src/client/batch.mli b/src/client/batch.mli index 39fdd4d4..def675b1 100644 --- a/src/client/batch.mli +++ b/src/client/batch.mli @@ -3,7 +3,12 @@ type 'a t val make : - ?batch:int -> ?high_watermark:int -> ?timeout:Mtime.span -> unit -> 'a t + ?batch:int -> + ?high_watermark:int -> + ?now:Mtime.t -> + ?timeout:Mtime.span -> + unit -> + 'a t (** [make ()] is a new batch @param batch @@ -17,6 +22,8 @@ val make : transmission in case of signal floods. Default [if batch = 1 then 100 else batch * 10]. + @param now the current time. Default [Mtime_clock.now ()]. + @param timeout the time span after which a batch is ready to pop, whether or not it is {b full}. *) @@ -35,7 +42,9 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option @param now the current time - @param force override the other batch conditions *) + @param force + override the other batch conditions, for when when we just want to emit + batches before exit or because the user asks for it *) val push : 'a t -> 'a list -> [ `Dropped | `Ok ] (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch