From 8ba3c823c2c328137be876f3768f664d51fa5581 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sun, 17 Nov 2019 21:44:37 -0600 Subject: [PATCH] refactor API for streams, heavy bugfixing --- src/Tiny_httpd.ml | 288 ++++++++++++++++++++--------------------- src/Tiny_httpd.mli | 50 +++---- src/bin/dune | 1 + src/bin/http_of_dir.ml | 2 +- 4 files changed, 156 insertions(+), 185 deletions(-) diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 7b0cfe67..2e70603a 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -1,10 +1,18 @@ -type input_stream = (bytes -> int -> int -> int) * (unit -> unit) +type stream = (bytes -> int -> int -> int) * (unit -> unit) (** An input stream is a function to read bytes into a buffer, and a function to close *) -type output_stream = (string -> int -> int -> unit) * (unit -> unit) * (unit -> unit) -(** An output stream is a function to output bytes, a function to [flush], - and a function to close. *) +let _debug_on = ref ( + match String.trim @@ Sys.getenv "HTTP_DBG" with + | "" -> false | _ -> true | exception _ -> false +) +let _enable_debug b = _debug_on := b +let _debug k = + if !_debug_on then ( + k (fun fmt-> + Printf.fprintf stdout "[thread %d]: " Thread.(id @@ self()); + Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt) + ) module Buf_ = struct type t = { @@ -32,13 +40,6 @@ module Buf_ = struct resize self n; ) - let add_string (self:t) s i len : unit = - if Bytes.length self.bytes < self.i + len then ( - resize self (self.i + self.i / 8 + len + 10); - ); - Bytes.blit_string s i self.bytes self.i len; - self.i <- self.i + len - let read_once (self:t) ~read : int = (* resize if needed *) if self.i = Bytes.length self.bytes then ( @@ -68,8 +69,8 @@ module Buf_ = struct x end -module Input_stream = struct - type t = input_stream +module Stream_ = struct + type t = stream let close (_,cl : t) = cl () let of_chan ic : t = input ic, fun () -> close_in ic @@ -118,108 +119,52 @@ module Input_stream = struct done; Buf_.contents_and_clear buf - let read_exactly ~too_short ?buf (self:t) (n:int) : unit = + (* ensure that the buffer contains at least [n] input bytes *) + let read_at_least_to_ ~too_short ?buf (self:t) (n:int) : unit = let buf = match buf with | Some buf -> Buf_.ensure_size buf n; buf - | None -> Buf_.create ~size:n () + | None -> Buf_.create ~size:n () in - let i = ref 0 in - while !i < n do - let is_read, _ = self in - let n_read = is_read buf.bytes !i (n- !i) in + while buf.i < n do + _debug (fun k->k "read-exactly: buf.i=%d, n=%d" buf.i n); + let read_is, _ = self in + let n_read = read_is buf.bytes buf.i (n - buf.i) in + _debug (fun k->k "read %d" n_read); if n_read=0 then too_short(); - i := !i + n_read + buf.i <- buf.i + n_read; done let read_line ?(buf=Buf_.create()) (self:t) : string = - let rec read_chunk acc = - Buf_.clear buf; + let rec find_newline acc = + match Bytes.index buf.bytes '\n' with + | i when i< buf.i -> + let s = Buf_.contents_slice buf 0 i in + Buf_.remove_prefix buf (i+1); (* remove \n too *) + s :: acc + | _ -> read_chunk acc + | exception Not_found -> read_chunk acc + and read_chunk acc = + (* read more data *) + let acc = + let s = Buf_.contents_and_clear buf in + if s<>"" then s :: acc else acc + in let is_read, _ = self in let _n = Buf_.read_once buf ~read:is_read in - match Bytes.index buf.Buf_.bytes '\n' with - | i -> - let s = Buf_.contents_slice buf 0 i in - Buf_.remove_prefix buf (i+1); - s :: acc - | exception Not_found -> - read_chunk (Buf_.contents_and_clear buf :: acc) + find_newline acc in - match read_chunk [] with + match find_newline [] with | [] -> "" | [s] -> s - | [s1;s2] -> s1^s2 - | l -> String.concat "" l - + | [s2;s1] -> s1^s2 + | l -> String.concat "" @@ List.rev l end -module Output_stream = struct - type t = output_stream - - let of_chan oc : t = - (output_substring oc, (fun () -> flush oc), (fun () -> close_out oc)) - - let of_chan_close_noerr oc : t = - (output_substring oc, (fun () -> flush oc), (fun () -> close_out_noerr oc)) - - let of_buf (buf:Buf_.t) : t = - let wr b i len = - Buf_.add_string buf b i len - in - (wr, (fun()->()), (fun()->())) - - let with_file file f = - let oc = open_out file in - try - let x = f (of_chan_close_noerr oc) in - close_out oc; - x - with e -> - close_out_noerr oc; - raise e - - let write (self:t) s = - let wr, _, _ = self in - wr s 0 (String.length s) - let flush self : unit = let _, fl, _ = self in fl() - let close self : unit = let _, _, cl = self in cl() -end - -let pipe ?(buf=Buf_.create()) (is:input_stream) (os:output_stream) : unit = - let continue = ref true in - while !continue do - Buf_.clear buf; - let rd, _ = is in - let n = Buf_.read_once buf ~read:rd in - if n=0 then ( - continue := false - ) else ( - let wr, _, _ = os in - wr (Bytes.unsafe_to_string buf.bytes) 0 n - ) - done; - Output_stream.flush os; - Input_stream.close is; - Output_stream.close os; - () - - exception Bad_req of int * string let bad_reqf c fmt = Printf.ksprintf (fun s ->raise (Bad_req (c,s))) fmt -let _debug_on = ref ( - match String.trim @@ Sys.getenv "HTTP_DBG" with - | "" -> false | _ -> true | exception _ -> false -) -let _enable_debug b = _debug_on := b -let _debug k = - if !_debug_on then ( - k (fun fmt-> - Printf.fprintf stdout "[thread %d]: " Thread.(id @@ self()); - Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt) - ) - module Response_code = struct type t = int @@ -284,17 +229,17 @@ end module Headers = struct type t = (string * string) list let contains = List.mem_assoc - let get x h = try Some (List.assoc x h) with Not_found -> None + let get ?(f=fun x->x) x h = try Some (List.assoc x h |> f) with Not_found -> None let set x y h = (x,y) :: List.filter (fun (k,_) -> k<>x) h let pp out l = let pp_pair out (k,v) = Format.fprintf out "@[%s: %s@]" k v in Format.fprintf out "@[%a@]" (Format.pp_print_list pp_pair) l - let parse_ ~buf (is:input_stream) : t = + let parse_ ~buf (is:stream) : t = let rec loop acc = - let line = Input_stream.read_line ~buf is in + let line = Stream_.read_line ~buf is in if line = "\r" then ( - List.rev acc + acc ) else ( let k,v = try Scanf.sscanf line "%s@: %s@\r" (fun k v->k,v) @@ -319,59 +264,99 @@ module Request = struct let path self = self.path let body self = self.body - let get_header self h = Headers.get h self.headers + let get_header ?f self h = Headers.get ?f h self.headers let get_header_int self h = match get_header self h with | Some x -> (try Some (int_of_string x) with _ -> None) | None -> None + let set_header self k v = {self with headers=Headers.set k v self.headers} + let pp_ out self : unit = + Format.fprintf out "{@[meth=%s;@ headers=%a;@ path=%S;@ body=?@]}" + (Meth.to_string self.meth) Headers.pp self.headers self.path let pp out self : unit = Format.fprintf out "{@[meth=%s;@ headers=%a;@ path=%S;@ body=%S@]}" (Meth.to_string self.meth) Headers.pp self.headers self.path self.body - let read_body ~buf (is:input_stream) (n:int) : string = - _debug (fun k->k "read body of size %d" n); - Input_stream.read_exactly ~buf is n + let read_body_exact ~buf (is:stream) (n:int) : string = + _debug (fun k->k "read body of size %d, buf.i=%d" n buf.Buf_.i); + Stream_.read_at_least_to_ ~buf is n ~too_short:(fun () -> bad_reqf 400 "body is too short"); - Buf_.contents_and_clear buf + let blob = Buf_.contents_slice buf 0 n in + Buf_.remove_prefix buf n; + blob - let read_body_chunked ~buf:buf_line ~size:max_size (is:input_stream) : string = - _debug (fun k->k "read body with chunked encoding (max-size: %d)" max_size); - let buf_res = Buf_.create() in (* store the accumulated chunks *) - let rec read_chunks () = - Buf_.clear buf_line; - let line = Input_stream.read_line ~buf:buf_line is in + (* decode a "chunked" stream into a normal stream *) + let read_stream_chunked_ ~buf (is:stream) : stream = + let read_next_chunk () : int = + let line = Stream_.read_line ~buf is in (* parse chunk length, ignore extensions *) - let chunk_size = + let chunk_size = ( if String.trim line = "" then 0 else try Scanf.sscanf line "%x %s@\r" (fun n _ext -> n) with _ -> bad_reqf 400 "cannot read chunk size from line %S" line - in - _debug (fun k->k "chunk size: %d" chunk_size); - if chunk_size = 0 then ( + ) in + _debug (fun k->k "read_stream_chunked: next chunk size: %d" chunk_size); + if chunk_size > 0 then ( + (* now complete [buf_internal] if the line's leftover were not enough *) + Stream_.read_at_least_to_ + ~too_short:(fun () -> bad_reqf 400 "chunk is too short") + is ~buf chunk_size; + _debug (fun k->k "read_stream_chunked: just read a chunk of size %d" chunk_size); + ); + chunk_size + in + let refill = ref true in + let chunk_size = ref 0 in + let write_offset = ref 0 in (* offset for writing *) + let write_to bytes i len : int = + if !refill then ( + write_offset := 0; + refill := false; + chunk_size := read_next_chunk() + ); + let n = min len (!chunk_size - !write_offset) in + if n > 0 then ( + Bytes.blit buf.bytes !write_offset bytes i n; + write_offset := !write_offset + n; + if !write_offset >= !chunk_size then ( + buf.i <- 0; (* consume *) + refill := true; + ) + ); + n + in + let close () = Stream_.close is in + write_to, close + + let read_body_chunked ~tr_stream ~buf ~size:max_size (is:stream) : string = + _debug (fun k->k "read body with chunked encoding (max-size: %d)" max_size); + let is = tr_stream @@ read_stream_chunked_ ~buf is in + let buf_res = Buf_.create() in (* store the accumulated chunks *) + (* TODO: extract this as a function [read_all_up_to ~max_size is]? *) + let rec read_chunks () = + let rd_is,_ = is in + let n = Buf_.read_once buf_res ~read:rd_is in + if n = 0 then ( Buf_.contents buf_res (* done *) ) else ( - let new_size = chunk_size + Buf_.size buf_res in + _debug (fun k->k "read_body_chunked: read a chunk of size %d" n); (* is the body bigger than expected? *) - if max_size>0 && new_size > max_size then ( + if max_size>0 && Buf_.size buf_res > max_size then ( bad_reqf 413 "body size was supposed to be %d, but at least %d bytes received" - max_size new_size + max_size (Buf_.size buf_res) ); - Input_stream.read_exactly - ~too_short:(fun () -> bad_reqf 400 "chunk is too short") - is ~buf:buf_res chunk_size; - _debug (fun k->k "read a chunk of size %d" chunk_size); read_chunks() ) in read_chunks() (* parse request, but not body (yet) *) - let parse_req_start ~buf (is:input_stream) : unit t option resp_result = + let parse_req_start ~buf (is:stream) : unit t option resp_result = try - let line = Input_stream.read_line ~buf is in + let line = Stream_.read_line ~buf is in let meth, path = try Scanf.sscanf line "%s %s HTTP/1.1\r" (fun x y->x,y) with _ -> raise (Bad_req (400, "Invalid request line")) @@ -386,21 +371,22 @@ module Request = struct | e -> Error (400, Printexc.to_string e) - (* parse body, given the headers *) - let parse_body_ ~buf (req:input_stream t) : string t resp_result = + (* parse body, given the headers. + @param tr_stream a transformation of the input stream. *) + let parse_body_ ~tr_stream ~buf (req:stream t) : string t resp_result = try - let n = + let size = match List.assoc "Content-Length" req.headers |> int_of_string with | n -> n (* body of fixed size *) | exception Not_found -> 0 | exception _ -> bad_reqf 400 "invalid content-length" in let body = - match List.assoc "Transfer-Encoding" req.headers |> String.trim with - | "chunked" -> read_body_chunked ~buf ~size:n req.body (* body sent by chunks *) - | s -> bad_reqf 500 "cannot handle transfer encoding: %s" s - | exception Not_found -> - read_body ~buf req.body n + match get_header ~f:String.trim req "Transfer-Encoding" with + | Some "chunked" -> + read_body_chunked ~tr_stream ~buf ~size req.body (* body sent by chunks *) + | Some s -> bad_reqf 500 "cannot handle transfer encoding: %s" s + | None -> read_body_exact ~buf (tr_stream req.body) size in Ok {req with body} with @@ -409,9 +395,9 @@ module Request = struct | e -> Error (400, Printexc.to_string e) - let read_body_full ?buf (self:input_stream t) : string t = + let read_body_full (self:stream t) : string t = try - let body = Input_stream.read_all ?buf self.body in + let body = Stream_.read_all self.body in { self with body } with | Bad_req _ as e -> raise e @@ -419,16 +405,13 @@ module Request = struct end module Response = struct - type body = [`String of string | `Stream of input_stream] + type body = [`String of string | `Stream of stream] type t = { code: Response_code.t; headers: Headers.t; body: body; } - (* TODO: if query had ["Accept-Encoding", "chunked"], we cna reply with chunks, - if [body] was a stream|string instead of just a string *) - let make_raw ?(headers=[]) ~code body : t = (* add content length to response *) let headers = @@ -468,7 +451,7 @@ module Response = struct self.code Headers.pp self.headers pp_body self.body (* print a stream as a series of chunks *) - let output_stream_ (oc:out_channel) (str:input_stream) : unit = + let output_stream_ (oc:out_channel) (str:stream) : unit = let buf = Buf_.create ~size:4096 () in let continue = ref true in while !continue do @@ -508,7 +491,7 @@ type t = { masksigpipe: bool; mutable handler: (string Request.t -> Response.t); mutable path_handlers : (unit Request.t -> cb_path_handler resp_result option) list; - mutable cb_decode_req: (input_stream Request.t -> input_stream Request.t option) list; + mutable cb_decode_req: (unit Request.t -> (unit Request.t * (stream -> stream)) option) list; mutable cb_encode_resp: (string Request.t -> Response.t -> Response.t option) list; mutable running: bool; } @@ -565,7 +548,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = let ic = Unix.in_channel_of_descr client_sock in let oc = Unix.out_channel_of_descr client_sock in let buf = Buf_.create() in - let is = Input_stream.of_chan ic in + let is = Stream_.of_chan ic in let continue = ref true in while !continue && self.running do _debug (fun k->k "read next request"); @@ -584,22 +567,27 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit = | None -> self.handler in (* handle expectations *) - begin match List.assoc "Expect" req.Request.headers with - | "100-continue" -> + begin match Request.get_header ~f:String.trim req "Expect" with + | Some "100-continue" -> _debug (fun k->k "send back: 100 CONTINUE"); Response.output_ oc (Response.make_raw ~code:100 ""); - | s -> bad_reqf 417 "unknown expectation %s" s - | exception Not_found -> () + | Some s -> bad_reqf 417 "unknown expectation %s" s + | None -> () end; (* preprocess request's input stream *) - let req = {req with body=is} in - let req = + let req, tr_stream = List.fold_left - (fun req cb -> match cb req with None -> req | Some r' -> r') - req self.cb_decode_req + (fun (req,tr) cb -> + match cb req with + | None -> req, tr + | Some (r',f) -> r', (fun is -> tr is |> f)) + (req, (fun is->is)) self.cb_decode_req in (* now actually read request's body *) - let req = Request.parse_body_ ~buf req |> unwrap_resp_result in + let req = + Request.parse_body_ ~tr_stream ~buf {req with body=is} + |> unwrap_resp_result + in let resp = handler req in (* post-process response *) List.fold_left diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index df6a454b..a6739b12 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -1,22 +1,19 @@ -type input_stream = (bytes -> int -> int -> int) * (unit -> unit) +type stream = (bytes -> int -> int -> int) * (unit -> unit) (** An input stream is a function to read bytes into a buffer, and a function to close *) -type output_stream = (string -> int -> int -> unit) * (unit -> unit) * (unit -> unit) -(** An output stream is a function to output bytes, a function to [flush], - and a function to close. *) - (** {2 Tiny buffer implementation} *) module Buf_ : sig type t + val size : t -> int val clear : t -> unit val create : ?size:int -> unit -> t val contents : t -> string end -(** {2 Generic input stream} *) -module Input_stream : sig - type t = input_stream +(** {2 Generic stream of data} *) +module Stream_ : sig + type t = stream val of_chan : in_channel -> t val of_chan_close_noerr : in_channel -> t @@ -30,23 +27,6 @@ module Input_stream : sig val read_all : ?buf:Buf_.t -> t -> string end -(** {2 Generic output stream} *) -module Output_stream : sig - type t = output_stream - val of_chan : out_channel -> t - val of_chan_close_noerr : out_channel -> t - val of_buf : Buf_.t -> t - val write : t -> string -> unit - val flush : t -> unit - val close : t -> unit - - val with_file : string -> (t -> 'a) -> 'a - (** Open a file with given name, and obtain an output stream *) -end - -val pipe : ?buf:Buf_.t -> input_stream -> output_stream -> unit -(** [pipe is os] pipes the content of [is] into [os]. *) - module Meth : sig type t = [ | `GET @@ -62,7 +42,7 @@ end module Headers : sig type t = (string * string) list - val get : string -> t -> string option + val get : ?f:(string->string) -> string -> t -> string option val set : string -> string -> t -> t val contains : string -> t -> bool val pp : Format.formatter -> t -> unit @@ -77,14 +57,16 @@ module Request : sig } val pp : Format.formatter -> string t -> unit + val pp_ : Format.formatter -> _ t -> unit val headers : _ t -> Headers.t - val get_header : _ t -> string -> string option + val get_header : ?f:(string->string) -> _ t -> string -> string option val get_header_int : _ t -> string -> int option + val set_header : 'a t -> string -> string -> 'a t val meth : _ t -> Meth.t val path : _ t -> string val body : 'b t -> 'b - val read_body_full : ?buf:Buf_.t -> input_stream t -> string t + val read_body_full : stream t -> string t end module Response_code : sig @@ -95,7 +77,7 @@ module Response_code : sig end module Response : sig - type body = [`String of string | `Stream of input_stream] + type body = [`String of string | `Stream of stream] type t val make_raw : @@ -107,7 +89,7 @@ module Response : sig val make_raw_stream : ?headers:Headers.t -> code:Response_code.t -> - input_stream -> + stream -> t val make : @@ -120,7 +102,7 @@ module Response : sig val make_stream : ?headers:Headers.t -> - (input_stream, Response_code.t * string) result -> t + (stream, Response_code.t * string) result -> t val fail : ?headers:Headers.t -> code:int -> ('a, unit, string, t) format4 -> 'a @@ -153,10 +135,10 @@ val port : t -> int val add_decode_request_cb : t -> - (input_stream Request.t -> input_stream Request.t option) -> unit + (unit Request.t -> (unit Request.t * (stream -> stream)) option) -> unit (** Add a callback for every request. - The callback can modify the request by returning [Some r'] where [r'] - is the new request, or just perform side effects (logging?) and return [None]. + The callback can provide a stream transformer and a new request (with + modified headers, typically). *) val add_encode_response_cb: diff --git a/src/bin/dune b/src/bin/dune index bdd06eb7..ab8a97e0 100644 --- a/src/bin/dune +++ b/src/bin/dune @@ -2,5 +2,6 @@ (executable (name http_of_dir) (public_name http_of_dir) + (package tiny_httpd) (flags :standard -warn-error -3) (libraries tiny_httpd str)) diff --git a/src/bin/http_of_dir.ml b/src/bin/http_of_dir.ml index af03327b..28b8bb2c 100644 --- a/src/bin/http_of_dir.ml +++ b/src/bin/http_of_dir.ml @@ -118,7 +118,7 @@ let serve ~config (dir:string) : _ result = ) else ( try let ic = open_in full_path in - S.Response.make_raw_stream ~code:200 (S.Input_stream.of_chan ic) + S.Response.make_raw_stream ~code:200 (S.Stream_.of_chan ic) with e -> S.Response.fail ~code:500 "error while reading file: %s" (Printexc.to_string e) ));