feat: support chunked output streams in responses; set some socket opts

This commit is contained in:
Simon Cruanes 2019-11-17 12:08:52 -06:00
parent bca1466fe5
commit 43df91110c
2 changed files with 67 additions and 17 deletions

View file

@ -8,7 +8,7 @@ type input_stream = {
exception Bad_req of int * string
let bad_reqf c fmt = Printf.ksprintf (fun s ->raise (Bad_req (c,s))) fmt
let debug_ k =
let _debug k =
match Sys.getenv "HTTP_DBG" with
| _ ->
k (fun fmt->
@ -103,7 +103,7 @@ module Request = struct
self.path self.body
let read_body (is:input_stream) (n:int) : string =
debug_ (fun k->k "read body of size %d" n);
_debug (fun k->k "read body of size %d" n);
if Bytes.length is.buf < n then (
is.buf <- Bytes.make n ' ';
);
@ -116,7 +116,7 @@ module Request = struct
Bytes.sub_string is.buf 0 n
let read_body_chunked (is:input_stream) : string =
debug_ (fun k->k "read body with chunked encoding");
_debug (fun k->k "read body with chunked encoding");
let n = ref 0 in
let rec read_chunks () =
let line = input_line is.ic in
@ -127,7 +127,7 @@ module Request = struct
try Scanf.sscanf line "%x %s@\r" (fun n _ext -> n)
with _ -> bad_reqf 400 "cannot read chunk size from line %S" line
in
debug_ (fun k->k "chunk size: %d" chunk_size);
_debug (fun k->k "chunk size: %d" chunk_size);
if chunk_size = 0 then (
Bytes.sub_string is.buf 0 !n (* done *)
) else (
@ -143,7 +143,7 @@ module Request = struct
if read=0 then bad_reqf 400 "body is too short";
n := !n + read
done;
debug_ (fun k->k "read a chunk");
_debug (fun k->k "read a chunk");
read_chunks()
)
in
@ -159,7 +159,7 @@ module Request = struct
in
let meth = Meth.of_string meth in
let headers = Headers.parse_ is in
debug_ (fun k->k "got meth: %s, path %S" (Meth.to_string meth) path);
_debug (fun k->k "got meth: %s, path %S" (Meth.to_string meth) path);
Ok (Some {meth; path; headers; body=""})
with
| End_of_file | Sys_error _ -> Ok None
@ -190,10 +190,16 @@ module Request = struct
end
module Response = struct
type out_stream = bytes -> int -> int -> int
type body = [
| `String of string
| `Stream of out_stream
]
type t = {
code: Response_code.t;
headers: Headers.t;
body: string;
body: body;
}
(* TODO: if query had ["Accept-Encoding", "chunked"], we cna reply with chunks,
@ -204,7 +210,12 @@ module Response = struct
let headers =
Headers.set "Content-Length" (string_of_int (String.length body)) headers
in
{ code; headers; body; }
{ code; headers; body=`String body; }
let make_raw_chunked ?(headers=[]) ~code body : t =
(* add content length to response *)
let headers = Headers.set "Transfer-Encoding" "chunked" headers in
{ code; headers; body=`Stream body; }
let make ?headers r : t = match r with
| Ok body -> make_raw ?headers ~code:200 body
@ -215,16 +226,40 @@ module Response = struct
Printf.ksprintf (fun msg -> make_raw ?headers ~code msg) fmt
let pp out self : unit =
Format.fprintf out "{@[code=%d;@ headers=%a;@ body=%S@]}"
self.code Headers.pp self.headers self.body
let pp_body out = function
| `String s -> Format.fprintf out "%S" s
| `Stream _ -> Format.pp_print_string out "<stream>"
in
Format.fprintf out "{@[code=%d;@ headers=%a;@ body=%a@]}"
self.code Headers.pp self.headers pp_body self.body
(* print a stream as a series of chunks *)
let output_stream_ (oc:out_channel) (str:out_stream) : unit =
let buf = Bytes.make 4096 ' ' in
let continue = ref true in
while !continue do
(* next chunk *)
let n = str buf 0 (Bytes.length buf) in
_debug (fun k->k "send chunk of size %d" n);
Printf.fprintf oc "%x\r\n" n;
if n = 0 then (
continue := false;
) else (
output oc buf 0 n;
);
output_string oc "\r\n";
done;
()
let output_ (oc:out_channel) (self:t) : unit =
Printf.fprintf oc "HTTP/1.1 %d %s\r\n" self.code (Response_code.descr self.code);
List.iter (fun (k,v) -> Printf.fprintf oc "%s: %s\r\n" k v) self.headers;
Printf.fprintf oc "\r\n";
if self.body<>"" then (
output_string oc self.body;
);
begin match self.body with
| `String "" -> ()
| `String s -> output_string oc s;
| `Stream str -> output_stream_ oc str;
end;
flush oc
end
@ -287,7 +322,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
let ph_handlers = self.path_handlers in
let continue = ref true in
while !continue && self.running do
debug_ (fun k->k "read next request");
_debug (fun k->k "read next request");
match Request.parse_req_start is with
| Ok None -> continue := false
| Error (c,s) ->
@ -299,7 +334,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
(* handle expectations *)
begin match List.assoc "Expect" req.Request.headers with
| "100-continue" ->
debug_ (fun k->k "send back: 100 CONTINUE");
_debug (fun k->k "send back: 100 CONTINUE");
Response.output_ oc (Response.make_raw ~code:100 "");
| s -> bad_reqf 417 "unknown expectation %s" s
| exception Not_found -> ()
@ -334,7 +369,7 @@ let handle_client_ (self:t) (client_sock:Unix.file_descr) : unit =
continue := false; (* connection broken somehow *)
Unix.close client_sock;
done;
debug_ (fun k->k "done with client, exiting");
_debug (fun k->k "done with client, exiting");
()
let run (self:t) : (unit,_) result =
@ -343,6 +378,8 @@ let run (self:t) : (unit,_) result =
ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe] : _ list);
);
let sock = Unix.socket PF_INET Unix.SOCK_STREAM 0 in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.setsockopt_optint sock Unix.SO_LINGER None;
let inet_addr = Unix.inet_addr_of_string self.addr in
Unix.bind sock (Unix.ADDR_INET (inet_addr, self.port));
Unix.listen sock 10;

View file

@ -42,6 +42,7 @@ module Response_code : sig
end
module Response : sig
type out_stream = bytes -> int -> int -> int
type t
val make_raw :
@ -50,6 +51,12 @@ module Response : sig
string ->
t
val make_raw_chunked :
?headers:Headers.t ->
code:Response_code.t ->
out_stream ->
t
val make :
?headers:Headers.t ->
(string, Response_code.t * string) result -> t
@ -86,7 +93,7 @@ val add_request_cb : t -> (Request.t -> Request.t option) -> unit
val add_response_cb : t -> (Request.t -> Response.t -> Response.t option) -> unit
(** Add a callback for every request/response pair.
Similarly to {!add_request_cb} the callback can modify the response. *)
val set_top_handler : t -> (Request.t -> Response.t) -> unit
(** Setup a handler called by default.
If not installed, unhandled paths will return a 404 not found. *)
@ -110,3 +117,9 @@ val stop : t -> unit
val run : t -> (unit, exn) result
(**/**)
val _debug : ((('a, out_channel, unit, unit, unit, unit) format6 -> 'a) -> unit) -> unit
(**/**)