fix: read_all must return rather than blocking when done

This commit is contained in:
Simon Cruanes 2019-12-02 19:17:40 -06:00
parent 51519fec1f
commit c16e386338
2 changed files with 91 additions and 60 deletions

View file

@ -84,7 +84,9 @@ module Byte_stream = struct
let rec iter f (self:t) : unit =
let s, i, len = self.bs_fill_buf () in
if len > 0 then (
if len=0 then (
self.bs_close();
) else (
f s i len;
self.bs_consume len;
(iter [@tailcall]) f self
@ -98,13 +100,15 @@ module Byte_stream = struct
let len =
ref (
match len with
| Some n -> min n (Bytes.length s- i)
| None -> Bytes.length s- i
| Some n ->
if n > Bytes.length s -i then invalid_arg "Byte_stream.of_bytes";
n
| None -> Bytes.length s - i
)
in
let i = ref i in
{ bs_fill_buf=(fun () -> s, !i, !len);
bs_close=(fun () -> ());
bs_close=(fun () -> len := 0);
bs_consume=(fun n -> assert (n<= !len); i := !i + n; len := !len - n);
}
@ -121,20 +125,17 @@ module Byte_stream = struct
close_in_noerr ic;
raise e
(* Read as much as possible into [buf]. *)
let read_into_buf (self:t) (buf:Buf_.t) : int =
let s, i, len = self.bs_fill_buf () in
if len > 0 then (
Buf_.add_bytes buf s i len;
self.bs_consume len;
);
len
let read_all ?(buf=Buf_.create()) (self:t) : string =
let continue = ref true in
while !continue do
let n_rd = read_into_buf self buf in
if n_rd = 0 then (
let s, i, len = self.bs_fill_buf () in
_debug (fun k->k "read-all: got i=%d, len=%d, bufsize %d" i len (Buf_.size buf));
if len > 0 then (
Buf_.add_bytes buf s i len;
self.bs_consume len;
);
assert (len >= 0);
if len = 0 then (
continue := false
)
done;
@ -145,24 +146,24 @@ module Byte_stream = struct
assert (Bytes.length bytes >= n);
let offset = ref 0 in
while !offset < n do
let n_read =
let s, i, len = self.bs_fill_buf () in
let n_read = min len (n- !offset) in
Bytes.blit s i bytes !offset n_read;
offset := !offset + n_read;
self.bs_consume n_read;
n_read
in
let s, i, len = self.bs_fill_buf () in
let n_read = min len (n- !offset) in
Bytes.blit s i bytes !offset n_read;
offset := !offset + n_read;
self.bs_consume n_read;
if n_read=0 then too_short();
done
(* read a line into the buffer *)
(* read a line into the buffer, after clearing it. *)
let read_line_into (self:t) ~buf : unit =
Buf_.clear buf;
let continue = ref true in
while !continue do
let s, i, len = self.bs_fill_buf () in
if len=0 then continue := false;
if len=0 then (
continue := false;
if Buf_.size buf = 0 then raise End_of_file;
);
let j = ref i in
while !j < i+len && Bytes.get s !j <> '\n' do
incr j
@ -178,6 +179,56 @@ module Byte_stream = struct
)
done
(* new stream with maximum size [max_size].
@param close_rec if true, closing this will also close the input stream
@param too_big called with read size if the max size is reached *)
let limit_size_to ~close_rec ~max_size ~too_big (self:t) : t =
let size = ref 0 in
let continue = ref true in
{ bs_fill_buf =
(fun () ->
if !continue then self.bs_fill_buf() else Bytes.empty, 0, 0);
bs_close=(fun () ->
if close_rec then self.bs_close ());
bs_consume = (fun n ->
size := !size + n;
if !size > max_size then (
continue := false;
too_big !size
) else (
self.bs_consume n
));
}
(* read exactly [size] bytes from the stream *)
let read_exactly ~close_rec ~size ~too_short (self:t) : t =
if size=0 then (
empty
) else (
let size = ref size in
{ bs_fill_buf = (fun () ->
(* must not block on [self] if we're done *)
if !size = 0 then Bytes.empty, 0, 0
else (
let buf, i, len = self.bs_fill_buf () in
let len = min len !size in
if len = 0 && !size > 0 then (
too_short !size;
);
buf, i, len
)
);
bs_close=(fun () ->
(* close underlying stream if [close_rec] *)
if close_rec then self.bs_close();
size := 0);
bs_consume = (fun n ->
let n = min n !size in
size := !size - n;
self.bs_consume n);
}
)
let read_line ?(buf=Buf_.create()) self : string =
read_line_into self ~buf;
Buf_.contents buf
@ -306,6 +357,7 @@ module Request = struct
(* decode a "chunked" stream into a normal stream *)
let read_stream_chunked_ ?(buf=Buf_.create()) (bs:byte_stream) : byte_stream =
_debug (fun k->k "body: start reading chunked stream...");
let read_next_chunk_len () : int =
let line = Byte_stream.read_line ~buf bs in
(* parse chunk length, ignore extensions *)
@ -318,7 +370,7 @@ module Request = struct
chunk_size
in
let refill = ref true in
let bytes = Bytes.make 4096 ' ' in
let bytes = Bytes.make (16 * 4096) ' ' in
let offset = ref 0 in
let len = ref 0 in
let chunk_size = ref 0 in
@ -354,46 +406,24 @@ module Request = struct
let limit_body_size_ ~max_size (bs:byte_stream) : byte_stream =
_debug (fun k->k "limit size of body to max-size=%d" max_size);
let size = ref 0 in
{ bs_fill_buf = bs.bs_fill_buf;
bs_close=bs.bs_close;
bs_consume = (fun n ->
size := !size + n;
if !size > max_size then (
(* read too much *)
bad_reqf 413
"body size was supposed to be %d, but at least %d bytes received"
max_size !size
);
bs.bs_consume n);
}
Byte_stream.limit_size_to ~max_size ~close_rec:true bs
~too_big:(fun size ->
(* read too much *)
bad_reqf 413
"body size was supposed to be %d, but at least %d bytes received"
max_size size
)
let limit_body_size ~max_size (req:byte_stream t) : byte_stream t =
{ req with body=limit_body_size_ ~max_size req.body }
(* read exactly [size] bytes from the stream *)
let read_exactly ~size (bs:byte_stream) : byte_stream =
if size=0 then (
Byte_stream.empty
) else (
let size = ref size in
{ bs_fill_buf = (fun () ->
let buf, i, len = bs.bs_fill_buf () in
let len = min len !size in
if len = 0 && !size > 0 then (
bad_reqf 400 "body is too short"
);
buf, i, len
);
bs_close=(fun () ->
(* do not close underlying stream *)
size := 0);
bs_consume = (fun n ->
let n = min n !size in
size := !size - n;
bs.bs_consume n);
}
)
_debug (fun k->k "body: must read exactly %d bytes" size);
Byte_stream.read_exactly bs ~close_rec:false
~size ~too_short:(fun size ->
bad_reqf 400 "body is too short by %d bytes" size
)
(* parse request, but not body (yet) *)
let parse_req_start ~buf (bs:byte_stream) : unit t option resp_result =

View file

@ -141,6 +141,7 @@ let serve ~config (dir:string) : _ result =
S.Byte_stream.to_chan oc req.S.Request.body;
flush oc;
close_out oc;
S._debug (fun k->k "done uploading");
S.Response.make_raw ~code:201 "upload successful"
)
) else (