Allowing configuring start time

This commit is contained in:
Shon Feder 2025-06-30 22:28:30 -04:00
parent 18f58c3ac5
commit 31a712dd30
No known key found for this signature in database
2 changed files with 24 additions and 13 deletions

View file

@ -7,23 +7,25 @@ type 'a t = {
mutable start: Mtime.t; mutable start: Mtime.t;
} }
let high_watermark batch_size = let default_high_watermark batch_size =
if batch_size = 1 then if batch_size = 1 then
100 100
else else
batch_size * 10 batch_size * 10
let make ?(batch = 1) ?(high_watermark = high_watermark batch) ?timeout () : _ t let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
= let high_watermark =
match high_watermark with
| Some x -> x
| None -> default_high_watermark batch
in
let start =
match now with
| Some x -> x
| None -> Mtime_clock.now ()
in
assert (batch > 0); assert (batch > 0);
{ { size = 0; q = []; start; batch; timeout; high_watermark }
size = 0;
start = Mtime_clock.now ();
q = [];
batch;
timeout;
high_watermark;
}
let timeout_expired_ ~now self : bool = let timeout_expired_ ~now self : bool =
match self.timeout with match self.timeout with

View file

@ -3,7 +3,12 @@
type 'a t type 'a t
val make : val make :
?batch:int -> ?high_watermark:int -> ?timeout:Mtime.span -> unit -> 'a t ?batch:int ->
?high_watermark:int ->
?now:Mtime.t ->
?timeout:Mtime.span ->
unit ->
'a t
(** [make ()] is a new batch (** [make ()] is a new batch
@param batch @param batch
@ -17,6 +22,8 @@ val make :
transmission in case of signal floods. Default transmission in case of signal floods. Default
[if batch = 1 then 100 else batch * 10]. [if batch = 1 then 100 else batch * 10].
@param now the current time. Default [Mtime_clock.now ()].
@param timeout @param timeout
the time span after which a batch is ready to pop, whether or not it is the time span after which a batch is ready to pop, whether or not it is
{b full}. *) {b full}. *)
@ -35,7 +42,9 @@ val pop_if_ready : ?force:bool -> now:Mtime.t -> 'a t -> 'a list option
@param now the current time @param now the current time
@param force override the other batch conditions *) @param force
override the other batch conditions, for when when we just want to emit
batches before exit or because the user asks for it *)
val push : 'a t -> 'a list -> [ `Dropped | `Ok ] val push : 'a t -> 'a list -> [ `Dropped | `Ok ]
(** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch (** [push b xs] is [`Ok] if it succeeds in pushing the values in [xs] into the batch