From 989971bb0463c13692d148d97c61af3143b4a774 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 18 Nov 2019 18:47:43 -0600 Subject: [PATCH] feat: use rust-like streams, much better --- src/Tiny_httpd.ml | 281 ++++++++++++++++++++++----------------------- src/Tiny_httpd.mli | 13 ++- 2 files changed, 148 insertions(+), 146 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 2e70603a..412825b4 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -1,6 +1,10 @@ -type stream = (bytes -> int -> int -> int) * (unit -> unit) -(** An input stream is a function to read bytes into a buffer, - and a function to close *) +type stream = { + is_fill_buf: 'a. (bytes -> int -> int -> 'a) -> 'a; + is_consume: int -> unit; + is_close: unit -> unit; +} +(** A buffer input stream, with a view into the current buffer (or refill if empty), + and a function to consume [n] bytes *) let _debug_on = ref ( match String.trim @@ Sys.getenv "HTTP_DBG" with @@ -35,34 +39,15 @@ module Buf_ = struct Bytes.blit self.bytes 0 new_buf 0 self.i; self.bytes <- new_buf - let ensure_size self n : unit = - if Bytes.length self.bytes < n then ( - resize self n; - ) - - let read_once (self:t) ~read : int = - (* resize if needed *) - if self.i = Bytes.length self.bytes then ( - resize self (self.i + self.i / 8 + 10); + let add_bytes (self:t) s i len : unit = + if self.i + len >= Bytes.length self.bytes then ( + resize self (self.i + len + 10); ); - let n_rd = read self.bytes self.i (Bytes.length self.bytes - self.i) in - self.i <- self.i + n_rd; - n_rd - - (* remove the first [i] bytes *) - let remove_prefix (self:t) (i:int) : unit = - if i > self.i then invalid_arg "Buf_.contents_slice"; - if i self.i then invalid_arg "Buf_.contents_slice"; - Bytes.sub_string self.bytes i len - let contents_and_clear (self:t) : string = let x = contents self in clear self; @@ -72,94 +57,112 @@ end module Stream_ = struct type t = stream - let close (_,cl : t) = cl () - let of_chan ic : t = input ic, fun () -> close_in ic - let of_chan_close_noerr ic : t = input ic, fun () -> close_in_noerr ic + let close self = self.is_close() - let of_buf_ ?(i=0) ?len ~get_len ~blit s : t = - let off = ref i in - let s_len = match len with - | Some n -> min n (get_len s-i) - | None -> get_len s-i - in - let read buf i len = - let n = min len (s_len - !off) in - if n > 0 then ( - blit s !off buf i n; - off := !off + n; + let of_chan_ ~close ic : t = + let i = ref 0 in + let len = ref 0 in + let buf = Bytes.make 4096 ' ' in + { is_fill_buf=(fun k -> + if !i >= !len then ( + i := 0; + len := input ic buf 0 (Bytes.length buf); ); - n + k buf !i (!len - !i)); + is_consume=(fun n -> i := !i + n); + is_close=(fun () -> close ic) + } + + let of_chan = of_chan_ ~close:close_in + let of_chan_close_noerr = of_chan_ ~close:close_in_noerr + + let of_bytes ?(i=0) ?len s : t = + let len = + ref ( + match len with + | Some n -> min n (Bytes.length s- i) + | None -> Bytes.length s- i + ) in - read, (fun () -> ()) - - let of_string ?i ?len s : t = - of_buf_ ?i ?len ~get_len:String.length ~blit:Bytes.blit_string s - - let of_bytes ?i ?len s : t = - of_buf_ ?i ?len ~get_len:Bytes.length ~blit:Bytes.blit s + let i = ref i in + { is_fill_buf=(fun k -> k s !i !len); + is_close=(fun () -> ()); + is_consume=(fun n -> i := !i + n; len := !len - n); + } let with_file file f = let ic = open_in file in try - let x = f (of_chan_close_noerr ic) in + let x = f (of_chan ic) in close_in ic; x with e -> close_in_noerr ic; raise e + (* Read as much as possible into [buf]. *) + let read_into_buf (self:t) (buf:Buf_.t) : int = + self.is_fill_buf + (fun s i len -> + if len > 0 then ( + Buf_.add_bytes buf s i len; + self.is_consume len; + ); + len) + let read_all ?(buf=Buf_.create()) (self:t) : string = - let (read, _) = self in let continue = ref true in while !continue do - let n_rd = Buf_.read_once buf ~read in + let n_rd = read_into_buf self buf in if n_rd = 0 then ( continue := false ) done; Buf_.contents_and_clear buf - (* ensure that the buffer contains at least [n] input bytes *) - let read_at_least_to_ ~too_short ?buf (self:t) (n:int) : unit = - let buf = match buf with - | Some buf -> - Buf_.ensure_size buf n; - buf - | None -> Buf_.create ~size:n () - in - while buf.i < n do - _debug (fun k->k "read-exactly: buf.i=%d, n=%d" buf.i n); - let read_is, _ = self in - let n_read = read_is buf.bytes buf.i (n - buf.i) in + (* put [n] bytes from the input into bytes *) + let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit = + assert (Bytes.length bytes >= n); + let offset = ref 0 in + while !offset < n do + let n_read = + self.is_fill_buf + (fun s i len -> + let n_read = min len (n- !offset) in + Bytes.blit s i bytes !offset n_read; + offset := !offset + n_read; + self.is_consume n_read; + n_read) + in _debug (fun k->k "read %d" n_read); if n_read=0 then too_short(); - buf.i <- buf.i + n_read; done - let read_line ?(buf=Buf_.create()) (self:t) : string = - let rec find_newline acc = - match Bytes.index buf.bytes '\n' with - | i when i< buf.i -> - let s = Buf_.contents_slice buf 0 i in - Buf_.remove_prefix buf (i+1); (* remove \n too *) - s :: acc - | _ -> read_chunk acc - | exception Not_found -> read_chunk acc - and read_chunk acc = - (* read more data *) - let acc = - let s = Buf_.contents_and_clear buf in - if s<>"" then s :: acc else acc - in - let is_read, _ = self in - let _n = Buf_.read_once buf ~read:is_read in - find_newline acc - in - match find_newline [] with - | [] -> "" - | [s] -> s - | [s2;s1] -> s1^s2 - | l -> String.concat "" @@ List.rev l + (* read a line into the buffer *) + let read_line_into (self:t) ~buf : unit = + Buf_.clear buf; + let continue = ref true in + while !continue do + self.is_fill_buf + (fun s i len -> + let j = ref i in + while !j < i+len && Bytes.get s !j <> '\n' do + incr j + done; + if !j-i <= len then ( + assert (Bytes.get s !j = '\n'); + Buf_.add_bytes buf s i (!j-i); (* without \n *) + self.is_consume (!j-i+1); (* remove \n *) + continue := false + ) else ( + Buf_.add_bytes buf s i len; + self.is_consume len; + )); + done + + let read_line ?(buf=Buf_.create()) self : string = + read_line_into self ~buf; + Buf_.contents buf end exception Bad_req of int * string @@ -238,6 +241,7 @@ module Headers = struct let parse_ ~buf (is:stream) : t = let rec loop acc = let line = Stream_.read_line ~buf is in + _debug (fun k->k "parsed header line %S" line); if line = "\r" then ( acc ) else ( @@ -278,17 +282,15 @@ module Request = struct (Meth.to_string self.meth) Headers.pp self.headers self.path self.body - let read_body_exact ~buf (is:stream) (n:int) : string = - _debug (fun k->k "read body of size %d, buf.i=%d" n buf.Buf_.i); - Stream_.read_at_least_to_ ~buf is n + let read_body_exact (is:stream) (n:int) : string = + let bytes = Bytes.make n ' ' in + Stream_.read_exactly_ is bytes n ~too_short:(fun () -> bad_reqf 400 "body is too short"); - let blob = Buf_.contents_slice buf 0 n in - Buf_.remove_prefix buf n; - blob + Bytes.unsafe_to_string bytes (* decode a "chunked" stream into a normal stream *) - let read_stream_chunked_ ~buf (is:stream) : stream = - let read_next_chunk () : int = + let read_stream_chunked_ ?(buf=Buf_.create()) (is:stream) : stream = + let read_next_chunk_len () : int = let line = Stream_.read_line ~buf is in (* parse chunk length, ignore extensions *) let chunk_size = ( @@ -297,38 +299,39 @@ module Request = struct try Scanf.sscanf line "%x %s@\r" (fun n _ext -> n) with _ -> bad_reqf 400 "cannot read chunk size from line %S" line ) in - _debug (fun k->k "read_stream_chunked: next chunk size: %d" chunk_size); - if chunk_size > 0 then ( - (* now complete [buf_internal] if the line's leftover were not enough *) - Stream_.read_at_least_to_ - ~too_short:(fun () -> bad_reqf 400 "chunk is too short") - is ~buf chunk_size; - _debug (fun k->k "read_stream_chunked: just read a chunk of size %d" chunk_size); - ); chunk_size in let refill = ref true in + let bytes = Bytes.make 4096 ' ' in + let offset = ref 0 in + let len = ref 0 in let chunk_size = ref 0 in - let write_offset = ref 0 in (* offset for writing *) - let write_to bytes i len : int = - if !refill then ( - write_offset := 0; - refill := false; - chunk_size := read_next_chunk() - ); - let n = min len (!chunk_size - !write_offset) in - if n > 0 then ( - Bytes.blit buf.bytes !write_offset bytes i n; - write_offset := !write_offset + n; - if !write_offset >= !chunk_size then ( - buf.i <- 0; (* consume *) - refill := true; - ) - ); - n - in - let close () = Stream_.close is in - write_to, close + { is_fill_buf= + (fun k -> + (* do we need to refill? *) + if !offset >= !len then ( + if !chunk_size = 0 && !refill then ( + chunk_size := read_next_chunk_len(); + ); + offset := 0; + len := 0; + if !chunk_size > 0 then ( + (* read the whole chunk, or [Bytes.length bytes] of it *) + let to_read = min !chunk_size (Bytes.length bytes) in + Stream_.read_exactly_ + ~too_short:(fun () -> bad_reqf 400 "chunk is too short") + is bytes to_read; + len := to_read; + chunk_size := !chunk_size - to_read; + ) else ( + refill := false; (* stream is finished *) + ) + ); + k bytes !offset !len + ); + is_consume=(fun n -> offset := !offset + n); + is_close=(fun () -> Stream_.close is); + } let read_body_chunked ~tr_stream ~buf ~size:max_size (is:stream) : string = _debug (fun k->k "read body with chunked encoding (max-size: %d)" max_size); @@ -336,12 +339,10 @@ module Request = struct let buf_res = Buf_.create() in (* store the accumulated chunks *) (* TODO: extract this as a function [read_all_up_to ~max_size is]? *) let rec read_chunks () = - let rd_is,_ = is in - let n = Buf_.read_once buf_res ~read:rd_is in + let n = Stream_.read_into_buf is buf_res in if n = 0 then ( Buf_.contents buf_res (* done *) ) else ( - _debug (fun k->k "read_body_chunked: read a chunk of size %d" n); (* is the body bigger than expected? *) if max_size>0 && Buf_.size buf_res > max_size then ( bad_reqf 413 @@ -362,8 +363,8 @@ module Request = struct with _ -> raise (Bad_req (400, "Invalid request line")) in let meth = Meth.of_string meth in - let headers = Headers.parse_ ~buf is in _debug (fun k->k "got meth: %s, path %S" (Meth.to_string meth) path); + let headers = Headers.parse_ ~buf is in Ok (Some {meth; path; headers; body=()}) with | End_of_file | Sys_error _ -> Ok None @@ -383,10 +384,10 @@ module Request = struct in let body = match get_header ~f:String.trim req "Transfer-Encoding" with + | None -> read_body_exact (tr_stream req.body) size | Some "chunked" -> read_body_chunked ~tr_stream ~buf ~size req.body (* body sent by chunks *) | Some s -> bad_reqf 500 "cannot handle transfer encoding: %s" s - | None -> read_body_exact ~buf (tr_stream req.body) size in Ok {req with body} with @@ -452,20 +453,18 @@ module Response = struct (* print a stream as a series of chunks *) let output_stream_ (oc:out_channel) (str:stream) : unit = - let buf = Buf_.create ~size:4096 () in let continue = ref true in while !continue do - Buf_.clear buf; (* next chunk *) - let read, _ = str in - let n = Buf_.read_once buf ~read in - _debug (fun k->k "send chunk of size %d" n); - Printf.fprintf oc "%x\r\n" n; - if n = 0 then ( - continue := false; - ) else ( - output oc buf.bytes 0 n; - ); + str.is_fill_buf + (fun s i len -> + Printf.fprintf oc "%x\r\n" len; + output oc s i len; + str.is_consume len; + if len = 0 then ( + continue := false; + ) + ); output_string oc "\r\n"; done; () diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index a6739b12..8eb784a9 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -1,6 +1,10 @@ -type stream = (bytes -> int -> int -> int) * (unit -> unit) -(** An input stream is a function to read bytes into a buffer, - and a function to close *) +type stream = { + is_fill_buf: 'a. (bytes -> int -> int -> 'a) -> 'a; + is_consume: int -> unit; + is_close: unit -> unit; +} +(** A buffer input stream, with a view into the current buffer (or refill if empty), + and a function to consume [n] bytes *) (** {2 Tiny buffer implementation} *) module Buf_ : sig @@ -15,11 +19,10 @@ end module Stream_ : sig type t = stream + val close : t -> unit val of_chan : in_channel -> t val of_chan_close_noerr : in_channel -> t - val of_string : ?i:int -> ?len:int -> string -> t val of_bytes : ?i:int -> ?len:int -> bytes -> t - val close : t -> unit val with_file : string -> (t -> 'a) -> 'a (** Open a file with given name, and obtain an input stream *)