mirror of
https://github.com/c-cube/tiny_httpd.git
synced 2025-12-06 03:05:29 -05:00
feat: use rust-like streams, much better
This commit is contained in:
parent
8ba3c823c2
commit
989971bb04
2 changed files with 148 additions and 146 deletions
|
|
@ -1,6 +1,10 @@
|
||||||
type stream = (bytes -> int -> int -> int) * (unit -> unit)
|
type stream = {
|
||||||
(** An input stream is a function to read bytes into a buffer,
|
is_fill_buf: 'a. (bytes -> int -> int -> 'a) -> 'a;
|
||||||
and a function to close *)
|
is_consume: int -> unit;
|
||||||
|
is_close: unit -> unit;
|
||||||
|
}
|
||||||
|
(** A buffer input stream, with a view into the current buffer (or refill if empty),
|
||||||
|
and a function to consume [n] bytes *)
|
||||||
|
|
||||||
let _debug_on = ref (
|
let _debug_on = ref (
|
||||||
match String.trim @@ Sys.getenv "HTTP_DBG" with
|
match String.trim @@ Sys.getenv "HTTP_DBG" with
|
||||||
|
|
@ -35,34 +39,15 @@ module Buf_ = struct
|
||||||
Bytes.blit self.bytes 0 new_buf 0 self.i;
|
Bytes.blit self.bytes 0 new_buf 0 self.i;
|
||||||
self.bytes <- new_buf
|
self.bytes <- new_buf
|
||||||
|
|
||||||
let ensure_size self n : unit =
|
let add_bytes (self:t) s i len : unit =
|
||||||
if Bytes.length self.bytes < n then (
|
if self.i + len >= Bytes.length self.bytes then (
|
||||||
resize self n;
|
resize self (self.i + len + 10);
|
||||||
)
|
|
||||||
|
|
||||||
let read_once (self:t) ~read : int =
|
|
||||||
(* resize if needed *)
|
|
||||||
if self.i = Bytes.length self.bytes then (
|
|
||||||
resize self (self.i + self.i / 8 + 10);
|
|
||||||
);
|
);
|
||||||
let n_rd = read self.bytes self.i (Bytes.length self.bytes - self.i) in
|
Bytes.blit s i self.bytes self.i len;
|
||||||
self.i <- self.i + n_rd;
|
self.i <- self.i + len
|
||||||
n_rd
|
|
||||||
|
|
||||||
(* remove the first [i] bytes *)
|
|
||||||
let remove_prefix (self:t) (i:int) : unit =
|
|
||||||
if i > self.i then invalid_arg "Buf_.contents_slice";
|
|
||||||
if i<self.i then (
|
|
||||||
Bytes.blit self.bytes i self.bytes 0 (self.i - i);
|
|
||||||
);
|
|
||||||
self.i <- self.i - i
|
|
||||||
|
|
||||||
let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.i
|
let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.i
|
||||||
|
|
||||||
let contents_slice (self:t) i len : string =
|
|
||||||
if i+len > self.i then invalid_arg "Buf_.contents_slice";
|
|
||||||
Bytes.sub_string self.bytes i len
|
|
||||||
|
|
||||||
let contents_and_clear (self:t) : string =
|
let contents_and_clear (self:t) : string =
|
||||||
let x = contents self in
|
let x = contents self in
|
||||||
clear self;
|
clear self;
|
||||||
|
|
@ -72,94 +57,112 @@ end
|
||||||
module Stream_ = struct
|
module Stream_ = struct
|
||||||
type t = stream
|
type t = stream
|
||||||
|
|
||||||
let close (_,cl : t) = cl ()
|
let close self = self.is_close()
|
||||||
let of_chan ic : t = input ic, fun () -> close_in ic
|
|
||||||
let of_chan_close_noerr ic : t = input ic, fun () -> close_in_noerr ic
|
|
||||||
|
|
||||||
let of_buf_ ?(i=0) ?len ~get_len ~blit s : t =
|
let of_chan_ ~close ic : t =
|
||||||
let off = ref i in
|
let i = ref 0 in
|
||||||
let s_len = match len with
|
let len = ref 0 in
|
||||||
| Some n -> min n (get_len s-i)
|
let buf = Bytes.make 4096 ' ' in
|
||||||
| None -> get_len s-i
|
{ is_fill_buf=(fun k ->
|
||||||
in
|
if !i >= !len then (
|
||||||
let read buf i len =
|
i := 0;
|
||||||
let n = min len (s_len - !off) in
|
len := input ic buf 0 (Bytes.length buf);
|
||||||
if n > 0 then (
|
|
||||||
blit s !off buf i n;
|
|
||||||
off := !off + n;
|
|
||||||
);
|
);
|
||||||
n
|
k buf !i (!len - !i));
|
||||||
|
is_consume=(fun n -> i := !i + n);
|
||||||
|
is_close=(fun () -> close ic)
|
||||||
|
}
|
||||||
|
|
||||||
|
let of_chan = of_chan_ ~close:close_in
|
||||||
|
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
|
||||||
|
|
||||||
|
let of_bytes ?(i=0) ?len s : t =
|
||||||
|
let len =
|
||||||
|
ref (
|
||||||
|
match len with
|
||||||
|
| Some n -> min n (Bytes.length s- i)
|
||||||
|
| None -> Bytes.length s- i
|
||||||
|
)
|
||||||
in
|
in
|
||||||
read, (fun () -> ())
|
let i = ref i in
|
||||||
|
{ is_fill_buf=(fun k -> k s !i !len);
|
||||||
let of_string ?i ?len s : t =
|
is_close=(fun () -> ());
|
||||||
of_buf_ ?i ?len ~get_len:String.length ~blit:Bytes.blit_string s
|
is_consume=(fun n -> i := !i + n; len := !len - n);
|
||||||
|
}
|
||||||
let of_bytes ?i ?len s : t =
|
|
||||||
of_buf_ ?i ?len ~get_len:Bytes.length ~blit:Bytes.blit s
|
|
||||||
|
|
||||||
let with_file file f =
|
let with_file file f =
|
||||||
let ic = open_in file in
|
let ic = open_in file in
|
||||||
try
|
try
|
||||||
let x = f (of_chan_close_noerr ic) in
|
let x = f (of_chan ic) in
|
||||||
close_in ic;
|
close_in ic;
|
||||||
x
|
x
|
||||||
with e ->
|
with e ->
|
||||||
close_in_noerr ic;
|
close_in_noerr ic;
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
(* Read as much as possible into [buf]. *)
|
||||||
|
let read_into_buf (self:t) (buf:Buf_.t) : int =
|
||||||
|
self.is_fill_buf
|
||||||
|
(fun s i len ->
|
||||||
|
if len > 0 then (
|
||||||
|
Buf_.add_bytes buf s i len;
|
||||||
|
self.is_consume len;
|
||||||
|
);
|
||||||
|
len)
|
||||||
|
|
||||||
let read_all ?(buf=Buf_.create()) (self:t) : string =
|
let read_all ?(buf=Buf_.create()) (self:t) : string =
|
||||||
let (read, _) = self in
|
|
||||||
let continue = ref true in
|
let continue = ref true in
|
||||||
while !continue do
|
while !continue do
|
||||||
let n_rd = Buf_.read_once buf ~read in
|
let n_rd = read_into_buf self buf in
|
||||||
if n_rd = 0 then (
|
if n_rd = 0 then (
|
||||||
continue := false
|
continue := false
|
||||||
)
|
)
|
||||||
done;
|
done;
|
||||||
Buf_.contents_and_clear buf
|
Buf_.contents_and_clear buf
|
||||||
|
|
||||||
(* ensure that the buffer contains at least [n] input bytes *)
|
(* put [n] bytes from the input into bytes *)
|
||||||
let read_at_least_to_ ~too_short ?buf (self:t) (n:int) : unit =
|
let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit =
|
||||||
let buf = match buf with
|
assert (Bytes.length bytes >= n);
|
||||||
| Some buf ->
|
let offset = ref 0 in
|
||||||
Buf_.ensure_size buf n;
|
while !offset < n do
|
||||||
buf
|
let n_read =
|
||||||
| None -> Buf_.create ~size:n ()
|
self.is_fill_buf
|
||||||
in
|
(fun s i len ->
|
||||||
while buf.i < n do
|
let n_read = min len (n- !offset) in
|
||||||
_debug (fun k->k "read-exactly: buf.i=%d, n=%d" buf.i n);
|
Bytes.blit s i bytes !offset n_read;
|
||||||
let read_is, _ = self in
|
offset := !offset + n_read;
|
||||||
let n_read = read_is buf.bytes buf.i (n - buf.i) in
|
self.is_consume n_read;
|
||||||
|
n_read)
|
||||||
|
in
|
||||||
_debug (fun k->k "read %d" n_read);
|
_debug (fun k->k "read %d" n_read);
|
||||||
if n_read=0 then too_short();
|
if n_read=0 then too_short();
|
||||||
buf.i <- buf.i + n_read;
|
|
||||||
done
|
done
|
||||||
|
|
||||||
let read_line ?(buf=Buf_.create()) (self:t) : string =
|
(* read a line into the buffer *)
|
||||||
let rec find_newline acc =
|
let read_line_into (self:t) ~buf : unit =
|
||||||
match Bytes.index buf.bytes '\n' with
|
Buf_.clear buf;
|
||||||
| i when i< buf.i ->
|
let continue = ref true in
|
||||||
let s = Buf_.contents_slice buf 0 i in
|
while !continue do
|
||||||
Buf_.remove_prefix buf (i+1); (* remove \n too *)
|
self.is_fill_buf
|
||||||
s :: acc
|
(fun s i len ->
|
||||||
| _ -> read_chunk acc
|
let j = ref i in
|
||||||
| exception Not_found -> read_chunk acc
|
while !j < i+len && Bytes.get s !j <> '\n' do
|
||||||
and read_chunk acc =
|
incr j
|
||||||
(* read more data *)
|
done;
|
||||||
let acc =
|
if !j-i <= len then (
|
||||||
let s = Buf_.contents_and_clear buf in
|
assert (Bytes.get s !j = '\n');
|
||||||
if s<>"" then s :: acc else acc
|
Buf_.add_bytes buf s i (!j-i); (* without \n *)
|
||||||
in
|
self.is_consume (!j-i+1); (* remove \n *)
|
||||||
let is_read, _ = self in
|
continue := false
|
||||||
let _n = Buf_.read_once buf ~read:is_read in
|
) else (
|
||||||
find_newline acc
|
Buf_.add_bytes buf s i len;
|
||||||
in
|
self.is_consume len;
|
||||||
match find_newline [] with
|
));
|
||||||
| [] -> ""
|
done
|
||||||
| [s] -> s
|
|
||||||
| [s2;s1] -> s1^s2
|
let read_line ?(buf=Buf_.create()) self : string =
|
||||||
| l -> String.concat "" @@ List.rev l
|
read_line_into self ~buf;
|
||||||
|
Buf_.contents buf
|
||||||
end
|
end
|
||||||
|
|
||||||
exception Bad_req of int * string
|
exception Bad_req of int * string
|
||||||
|
|
@ -238,6 +241,7 @@ module Headers = struct
|
||||||
let parse_ ~buf (is:stream) : t =
|
let parse_ ~buf (is:stream) : t =
|
||||||
let rec loop acc =
|
let rec loop acc =
|
||||||
let line = Stream_.read_line ~buf is in
|
let line = Stream_.read_line ~buf is in
|
||||||
|
_debug (fun k->k "parsed header line %S" line);
|
||||||
if line = "\r" then (
|
if line = "\r" then (
|
||||||
acc
|
acc
|
||||||
) else (
|
) else (
|
||||||
|
|
@ -278,17 +282,15 @@ module Request = struct
|
||||||
(Meth.to_string self.meth) Headers.pp self.headers
|
(Meth.to_string self.meth) Headers.pp self.headers
|
||||||
self.path self.body
|
self.path self.body
|
||||||
|
|
||||||
let read_body_exact ~buf (is:stream) (n:int) : string =
|
let read_body_exact (is:stream) (n:int) : string =
|
||||||
_debug (fun k->k "read body of size %d, buf.i=%d" n buf.Buf_.i);
|
let bytes = Bytes.make n ' ' in
|
||||||
Stream_.read_at_least_to_ ~buf is n
|
Stream_.read_exactly_ is bytes n
|
||||||
~too_short:(fun () -> bad_reqf 400 "body is too short");
|
~too_short:(fun () -> bad_reqf 400 "body is too short");
|
||||||
let blob = Buf_.contents_slice buf 0 n in
|
Bytes.unsafe_to_string bytes
|
||||||
Buf_.remove_prefix buf n;
|
|
||||||
blob
|
|
||||||
|
|
||||||
(* decode a "chunked" stream into a normal stream *)
|
(* decode a "chunked" stream into a normal stream *)
|
||||||
let read_stream_chunked_ ~buf (is:stream) : stream =
|
let read_stream_chunked_ ?(buf=Buf_.create()) (is:stream) : stream =
|
||||||
let read_next_chunk () : int =
|
let read_next_chunk_len () : int =
|
||||||
let line = Stream_.read_line ~buf is in
|
let line = Stream_.read_line ~buf is in
|
||||||
(* parse chunk length, ignore extensions *)
|
(* parse chunk length, ignore extensions *)
|
||||||
let chunk_size = (
|
let chunk_size = (
|
||||||
|
|
@ -297,38 +299,39 @@ module Request = struct
|
||||||
try Scanf.sscanf line "%x %s@\r" (fun n _ext -> n)
|
try Scanf.sscanf line "%x %s@\r" (fun n _ext -> n)
|
||||||
with _ -> bad_reqf 400 "cannot read chunk size from line %S" line
|
with _ -> bad_reqf 400 "cannot read chunk size from line %S" line
|
||||||
) in
|
) in
|
||||||
_debug (fun k->k "read_stream_chunked: next chunk size: %d" chunk_size);
|
|
||||||
if chunk_size > 0 then (
|
|
||||||
(* now complete [buf_internal] if the line's leftover were not enough *)
|
|
||||||
Stream_.read_at_least_to_
|
|
||||||
~too_short:(fun () -> bad_reqf 400 "chunk is too short")
|
|
||||||
is ~buf chunk_size;
|
|
||||||
_debug (fun k->k "read_stream_chunked: just read a chunk of size %d" chunk_size);
|
|
||||||
);
|
|
||||||
chunk_size
|
chunk_size
|
||||||
in
|
in
|
||||||
let refill = ref true in
|
let refill = ref true in
|
||||||
|
let bytes = Bytes.make 4096 ' ' in
|
||||||
|
let offset = ref 0 in
|
||||||
|
let len = ref 0 in
|
||||||
let chunk_size = ref 0 in
|
let chunk_size = ref 0 in
|
||||||
let write_offset = ref 0 in (* offset for writing *)
|
{ is_fill_buf=
|
||||||
let write_to bytes i len : int =
|
(fun k ->
|
||||||
if !refill then (
|
(* do we need to refill? *)
|
||||||
write_offset := 0;
|
if !offset >= !len then (
|
||||||
refill := false;
|
if !chunk_size = 0 && !refill then (
|
||||||
chunk_size := read_next_chunk()
|
chunk_size := read_next_chunk_len();
|
||||||
);
|
);
|
||||||
let n = min len (!chunk_size - !write_offset) in
|
offset := 0;
|
||||||
if n > 0 then (
|
len := 0;
|
||||||
Bytes.blit buf.bytes !write_offset bytes i n;
|
if !chunk_size > 0 then (
|
||||||
write_offset := !write_offset + n;
|
(* read the whole chunk, or [Bytes.length bytes] of it *)
|
||||||
if !write_offset >= !chunk_size then (
|
let to_read = min !chunk_size (Bytes.length bytes) in
|
||||||
buf.i <- 0; (* consume *)
|
Stream_.read_exactly_
|
||||||
refill := true;
|
~too_short:(fun () -> bad_reqf 400 "chunk is too short")
|
||||||
)
|
is bytes to_read;
|
||||||
);
|
len := to_read;
|
||||||
n
|
chunk_size := !chunk_size - to_read;
|
||||||
in
|
) else (
|
||||||
let close () = Stream_.close is in
|
refill := false; (* stream is finished *)
|
||||||
write_to, close
|
)
|
||||||
|
);
|
||||||
|
k bytes !offset !len
|
||||||
|
);
|
||||||
|
is_consume=(fun n -> offset := !offset + n);
|
||||||
|
is_close=(fun () -> Stream_.close is);
|
||||||
|
}
|
||||||
|
|
||||||
let read_body_chunked ~tr_stream ~buf ~size:max_size (is:stream) : string =
|
let read_body_chunked ~tr_stream ~buf ~size:max_size (is:stream) : string =
|
||||||
_debug (fun k->k "read body with chunked encoding (max-size: %d)" max_size);
|
_debug (fun k->k "read body with chunked encoding (max-size: %d)" max_size);
|
||||||
|
|
@ -336,12 +339,10 @@ module Request = struct
|
||||||
let buf_res = Buf_.create() in (* store the accumulated chunks *)
|
let buf_res = Buf_.create() in (* store the accumulated chunks *)
|
||||||
(* TODO: extract this as a function [read_all_up_to ~max_size is]? *)
|
(* TODO: extract this as a function [read_all_up_to ~max_size is]? *)
|
||||||
let rec read_chunks () =
|
let rec read_chunks () =
|
||||||
let rd_is,_ = is in
|
let n = Stream_.read_into_buf is buf_res in
|
||||||
let n = Buf_.read_once buf_res ~read:rd_is in
|
|
||||||
if n = 0 then (
|
if n = 0 then (
|
||||||
Buf_.contents buf_res (* done *)
|
Buf_.contents buf_res (* done *)
|
||||||
) else (
|
) else (
|
||||||
_debug (fun k->k "read_body_chunked: read a chunk of size %d" n);
|
|
||||||
(* is the body bigger than expected? *)
|
(* is the body bigger than expected? *)
|
||||||
if max_size>0 && Buf_.size buf_res > max_size then (
|
if max_size>0 && Buf_.size buf_res > max_size then (
|
||||||
bad_reqf 413
|
bad_reqf 413
|
||||||
|
|
@ -362,8 +363,8 @@ module Request = struct
|
||||||
with _ -> raise (Bad_req (400, "Invalid request line"))
|
with _ -> raise (Bad_req (400, "Invalid request line"))
|
||||||
in
|
in
|
||||||
let meth = Meth.of_string meth in
|
let meth = Meth.of_string meth in
|
||||||
let headers = Headers.parse_ ~buf is in
|
|
||||||
_debug (fun k->k "got meth: %s, path %S" (Meth.to_string meth) path);
|
_debug (fun k->k "got meth: %s, path %S" (Meth.to_string meth) path);
|
||||||
|
let headers = Headers.parse_ ~buf is in
|
||||||
Ok (Some {meth; path; headers; body=()})
|
Ok (Some {meth; path; headers; body=()})
|
||||||
with
|
with
|
||||||
| End_of_file | Sys_error _ -> Ok None
|
| End_of_file | Sys_error _ -> Ok None
|
||||||
|
|
@ -383,10 +384,10 @@ module Request = struct
|
||||||
in
|
in
|
||||||
let body =
|
let body =
|
||||||
match get_header ~f:String.trim req "Transfer-Encoding" with
|
match get_header ~f:String.trim req "Transfer-Encoding" with
|
||||||
|
| None -> read_body_exact (tr_stream req.body) size
|
||||||
| Some "chunked" ->
|
| Some "chunked" ->
|
||||||
read_body_chunked ~tr_stream ~buf ~size req.body (* body sent by chunks *)
|
read_body_chunked ~tr_stream ~buf ~size req.body (* body sent by chunks *)
|
||||||
| Some s -> bad_reqf 500 "cannot handle transfer encoding: %s" s
|
| Some s -> bad_reqf 500 "cannot handle transfer encoding: %s" s
|
||||||
| None -> read_body_exact ~buf (tr_stream req.body) size
|
|
||||||
in
|
in
|
||||||
Ok {req with body}
|
Ok {req with body}
|
||||||
with
|
with
|
||||||
|
|
@ -452,20 +453,18 @@ module Response = struct
|
||||||
|
|
||||||
(* print a stream as a series of chunks *)
|
(* print a stream as a series of chunks *)
|
||||||
let output_stream_ (oc:out_channel) (str:stream) : unit =
|
let output_stream_ (oc:out_channel) (str:stream) : unit =
|
||||||
let buf = Buf_.create ~size:4096 () in
|
|
||||||
let continue = ref true in
|
let continue = ref true in
|
||||||
while !continue do
|
while !continue do
|
||||||
Buf_.clear buf;
|
|
||||||
(* next chunk *)
|
(* next chunk *)
|
||||||
let read, _ = str in
|
str.is_fill_buf
|
||||||
let n = Buf_.read_once buf ~read in
|
(fun s i len ->
|
||||||
_debug (fun k->k "send chunk of size %d" n);
|
Printf.fprintf oc "%x\r\n" len;
|
||||||
Printf.fprintf oc "%x\r\n" n;
|
output oc s i len;
|
||||||
if n = 0 then (
|
str.is_consume len;
|
||||||
continue := false;
|
if len = 0 then (
|
||||||
) else (
|
continue := false;
|
||||||
output oc buf.bytes 0 n;
|
)
|
||||||
);
|
);
|
||||||
output_string oc "\r\n";
|
output_string oc "\r\n";
|
||||||
done;
|
done;
|
||||||
()
|
()
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,10 @@
|
||||||
type stream = (bytes -> int -> int -> int) * (unit -> unit)
|
type stream = {
|
||||||
(** An input stream is a function to read bytes into a buffer,
|
is_fill_buf: 'a. (bytes -> int -> int -> 'a) -> 'a;
|
||||||
and a function to close *)
|
is_consume: int -> unit;
|
||||||
|
is_close: unit -> unit;
|
||||||
|
}
|
||||||
|
(** A buffer input stream, with a view into the current buffer (or refill if empty),
|
||||||
|
and a function to consume [n] bytes *)
|
||||||
|
|
||||||
(** {2 Tiny buffer implementation} *)
|
(** {2 Tiny buffer implementation} *)
|
||||||
module Buf_ : sig
|
module Buf_ : sig
|
||||||
|
|
@ -15,11 +19,10 @@ end
|
||||||
module Stream_ : sig
|
module Stream_ : sig
|
||||||
type t = stream
|
type t = stream
|
||||||
|
|
||||||
|
val close : t -> unit
|
||||||
val of_chan : in_channel -> t
|
val of_chan : in_channel -> t
|
||||||
val of_chan_close_noerr : in_channel -> t
|
val of_chan_close_noerr : in_channel -> t
|
||||||
val of_string : ?i:int -> ?len:int -> string -> t
|
|
||||||
val of_bytes : ?i:int -> ?len:int -> bytes -> t
|
val of_bytes : ?i:int -> ?len:int -> bytes -> t
|
||||||
val close : t -> unit
|
|
||||||
val with_file : string -> (t -> 'a) -> 'a
|
val with_file : string -> (t -> 'a) -> 'a
|
||||||
(** Open a file with given name, and obtain an input stream *)
|
(** Open a file with given name, and obtain an input stream *)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue