bugfixes in streams

This commit is contained in:
Simon Cruanes 2022-03-31 08:45:23 -04:00
parent 5aa29fb8b5
commit ad92acbee2
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@ -32,7 +32,8 @@ let make ?(bs=Bytes.create @@ 16 * 1024) ?(close=ignore) ~consume ~fill () : t =
off=0; off=0;
len=0; len=0;
close=(fun () -> close self); close=(fun () -> close self);
fill_buf=(fun () -> fill self); fill_buf=(fun () ->
if self.len = 0 then fill self);
consume= consume=
(fun n -> (fun n ->
assert (n <= self.len); assert (n <= self.len);
@ -46,7 +47,9 @@ let of_chan_ ?(buf_size=16 * 1024) ~close ic : t =
make make
~bs:(Bytes.create buf_size) ~bs:(Bytes.create buf_size)
~close:(fun _ -> close ic) ~close:(fun _ -> close ic)
~consume:(fun buf n -> buf.off <- buf.off + n) ~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~fill:(fun self -> ~fill:(fun self ->
if self.off >= self.len then ( if self.off >= self.len then (
self.off <- 0; self.off <- 0;
@ -164,69 +167,71 @@ let read_line_into (self:t) ~buf : unit =
(* new stream with maximum size [max_size]. (* new stream with maximum size [max_size].
@param close_rec if true, closing this will also close the input stream @param close_rec if true, closing this will also close the input stream
@param too_big called with read size if the max size is reached *) @param too_big called with read size if the max size is reached *)
let limit_size_to ~close_rec ~max_size ~too_big (self:t) : t = let limit_size_to ~close_rec ~max_size ~too_big (arg:t) : t =
let size = ref 0 in let size = ref 0 in
let continue = ref true in let continue = ref true in
make make
~bs:Bytes.empty ~bs:Bytes.empty
~close:(fun _ -> ~close:(fun _ ->
if close_rec then self.close ()) if close_rec then arg.close ())
~fill:(fun buf -> ~fill:(fun res ->
if buf.len = 0 && !continue then ( if res.len = 0 && !continue then (
self.fill_buf(); arg.fill_buf();
buf.bs <- self.bs; res.bs <- arg.bs;
buf.off <- self.off; res.off <- arg.off;
buf.len <- self.len; res.len <- arg.len;
) else ( ) else (
self.bs <- Bytes.empty; arg.bs <- Bytes.empty;
self.off <- 0; arg.off <- 0;
self.len <- 0; arg.len <- 0;
) )
) )
~consume:(fun buf n -> ~consume:(fun res n ->
size := !size + n; size := !size + n;
if !size > max_size then ( if !size > max_size then (
continue := false; continue := false;
too_big !size too_big !size
) else ( ) else (
self.consume n; arg.consume n;
buf.len <- buf.len - n; res.off <- res.off + n;
res.len <- res.len - n;
)) ))
() ()
(* read exactly [size] bytes from the stream *) (* read exactly [size] bytes from the stream *)
let read_exactly ~close_rec ~size ~too_short (self:t) : t = let read_exactly ~close_rec ~size ~too_short (arg:t) : t =
if size=0 then ( if size=0 then (
empty empty
) else ( ) else (
let size = ref size in let size = ref size in
make ~bs:Bytes.empty make ~bs:Bytes.empty
~fill:(fun buf -> ~fill:(fun res ->
(* must not block on [self] if we're done *) (* must not block on [arg] if we're done *)
if !size = 0 then ( if !size = 0 then (
buf.bs <- Bytes.empty; res.bs <- Bytes.empty;
buf.off <- 0; res.off <- 0;
buf.len <- 0; res.len <- 0;
) else ( ) else (
self.fill_buf(); arg.fill_buf();
buf.bs <- self.bs; res.bs <- arg.bs;
buf.off <- self.off; res.off <- arg.off;
let len = min self.len !size in let len = min arg.len !size in
if len = 0 && !size > 0 then ( if len = 0 && !size > 0 then (
too_short !size; too_short !size;
); );
buf.len <- len; res.len <- len;
)) ))
~close:(fun _buf -> ~close:(fun _res ->
(* close underlying stream if [close_rec] *) (* close underlying stream if [close_rec] *)
if close_rec then self.close(); if close_rec then arg.close();
size := 0 size := 0
) )
~consume:(fun buf n -> ~consume:(fun res n ->
let n = min n !size in let n = min n !size in
size := !size - n; size := !size - n;
buf.len <- buf.len - n; arg.consume n;
self.consume n res.off <- res.off + n;
res.len <- res.len - n;
) )
() ()
) )
@ -263,7 +268,6 @@ let read_chunked ?(buf=Buf.create()) ~fail (bs:t) : t=
if self.off >= self.len then ( if self.off >= self.len then (
if !chunk_size = 0 && !refill then ( if !chunk_size = 0 && !refill then (
chunk_size := read_next_chunk_len(); chunk_size := read_next_chunk_len();
(* _debug (fun k->k"read next chunk of size %d" !chunk_size); *)
); );
self.off <- 0; self.off <- 0;
self.len <- 0; self.len <- 0;
@ -280,7 +284,9 @@ let read_chunked ?(buf=Buf.create()) ~fail (bs:t) : t=
) )
); );
) )
~consume:(fun self n -> self.off <- self.off + n) ~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~close:(fun self -> ~close:(fun self ->
(* close this overlay, do not close underlying stream *) (* close this overlay, do not close underlying stream *)
self.len <- 0; self.len <- 0;