This commit is contained in:
Simon Cruanes 2022-02-16 10:20:51 -05:00
parent 4aade13cec
commit 105722f6b6
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 143 additions and 113 deletions

View file

@ -1,10 +1,3 @@
type byte_stream = {
bs_fill_buf: unit -> (bytes * int * int);
bs_consume: int -> unit;
bs_close: unit -> unit;
}
(** A buffer input stream, with a view into the current buffer (or refill if empty),
and a function to consume [n] bytes *)
let _debug_on = ref ( let _debug_on = ref (
match String.trim @@ Sys.getenv "HTTP_DBG" with match String.trim @@ Sys.getenv "HTTP_DBG" with
@ -18,36 +11,50 @@ let _debug k =
Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt) Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt)
) )
module Buf_ = struct module Buf = struct
type t = { type t = {
mutable bytes: bytes; mutable bytes: bytes;
mutable i: int; mutable len: int;
} }
let create ?(size=4_096) () : t = let create ?(size=4_096) () : t =
{ bytes=Bytes.make size ' '; i=0 } { bytes=Bytes.make size ' '; len=0 }
let[@inline] size self = self.len
let[@inline] cap self = Bytes.length self.bytes
let[@inline] bytes_slice self = self.bytes
let size self = self.i
let bytes_slice self = self.bytes
let clear self : unit = let clear self : unit =
if Bytes.length self.bytes > 4_096 * 1_024 then ( if Bytes.length self.bytes > 4_096 * 1_024 then (
self.bytes <- Bytes.make 4096 ' '; (* free big buffer *) self.bytes <- Bytes.make 4096 ' '; (* free big buffer *)
); );
self.i <- 0 self.len <- 0
let resize self new_size : unit = let[@inline never] grow_to_ self new_cap : unit =
let new_buf = Bytes.make new_size ' ' in let new_buf = Bytes.make new_cap ' ' in
Bytes.blit self.bytes 0 new_buf 0 self.i; Bytes.blit self.bytes 0 new_buf 0 self.len;
self.bytes <- new_buf self.bytes <- new_buf
let add_bytes (self:t) s i len : unit = let next_cap_ self =
if self.i + len >= Bytes.length self.bytes then ( let n = cap self in n + n lsr 1 + 10
resize self (self.i + self.i / 2 + len + 10);
);
Bytes.blit s i self.bytes self.i len;
self.i <- self.i + len
let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.i let ensure self new_cap : unit =
if new_cap > cap self then (
let new_cap = max new_cap (next_cap_ self) in
grow_to_ self new_cap
)
let[@inline never] grow_ self =
grow_to_ self (next_cap_ self)
let[@inline] add_bytes (self:t) b i len : unit =
if self.len + len >= cap self then (
grow_ self
);
Bytes.blit b i self.bytes self.len len;
self.len <- self.len + len
let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.len
let contents_and_clear (self:t) : string = let contents_and_clear (self:t) : string =
let x = contents self in let x = contents self in
@ -55,34 +62,51 @@ module Buf_ = struct
x x
end end
module Byte_stream = struct type istream = (bytes -> int -> int -> int) * (unit -> unit)
type t = byte_stream type ostream = (bytes -> int -> int -> unit) * (unit -> unit)
let close self = self.bs_close() let copy_into ?(buf=Buf.create()) (is:istream) (os:ostream) =
let continue = ref true in
while !continue do
let n = (fst is) buf.bytes 0 (Bytes.length buf.bytes) in
if n=0 then continue := false
else (
(fst os) buf.bytes 0 n
)
done
let empty = { module Ostream = struct
bs_fill_buf=(fun () -> Bytes.empty, 0, 0); type t = ostream
bs_consume=(fun _ -> ());
bs_close=(fun () -> ());
}
let of_chan_ ?(buf_size=16 * 1024) ~close ic : t = let close (_,cl) = cl
let i = ref 0 in
let len = ref 0 in let of_chan oc : t =
let buf = Bytes.make buf_size ' ' in let write b i len = output oc b i len in
{ bs_fill_buf=(fun () -> let close () = close_out oc in
if !i >= !len then ( write, close
i := 0;
len := input ic buf 0 (Bytes.length buf); let of_buf buf : t =
); let write b i len = Buf.add_bytes buf b i len in
buf, !i,!len - !i); let close() = () in
bs_consume=(fun n -> i := !i + n); write, close
bs_close=(fun () -> close ic) end
}
module Istream = struct
type t = istream
let close (_,cl) = cl()
let empty : t = (fun _ _ _ -> 0), (fun()->())
let of_chan_ ~close ic : t =
let read b i len = input ic b i len in
let close() = close ic in
read, close
let of_chan = of_chan_ ~close:close_in let of_chan = of_chan_ ~close:close_in
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
(*
let rec iter f (self:t) : unit = let rec iter f (self:t) : unit =
let s, i, len = self.bs_fill_buf () in let s, i, len = self.bs_fill_buf () in
if len=0 then ( if len=0 then (
@ -92,56 +116,62 @@ module Byte_stream = struct
self.bs_consume len; self.bs_consume len;
(iter [@tailcall]) f self (iter [@tailcall]) f self
) )
*)
let to_chan (oc:out_channel) (self:t) = let to_chan ?buf (oc:out_channel) (self:t) =
iter (fun s i len -> output oc s i len) self let os = Ostream.of_chan oc in
copy_into ?buf self os
let of_bytes ?(i=0) ?len s : t = let of_bytes ?(i=0) ?len b : t =
(* invariant: !i+!len is constant *) (* invariant: !i+!len is constant *)
let len = let len =
ref ( ref (
match len with match len with
| Some n -> | Some n ->
if n > Bytes.length s - i then invalid_arg "Byte_stream.of_bytes"; if n > Bytes.length b - i then invalid_arg "Byte_stream.of_bytes";
n n
| None -> Bytes.length s - i | None -> Bytes.length b - i
) )
in in
let i = ref i in let i = ref i in
{ bs_fill_buf=(fun () -> s, !i, !len); let close()= () in
bs_close=(fun () -> len := 0); let read b2 i2 n2 =
bs_consume=(fun n -> assert (n>=0 && n<= !len); i := !i + n; len := !len - n); let n = min n2 !len in
} Bytes.blit b !i b2 i2 n;
i := !i + n;
len := !len - n;
n
in
read, close
let of_string s : t = let of_string s : t =
of_bytes (Bytes.unsafe_of_string s) of_bytes (Bytes.unsafe_of_string s)
let with_file ?buf_size file f = let with_file file f =
let ic = open_in file in let ic = open_in file in
try try
let x = f (of_chan ?buf_size ic) in let x = f (of_chan ic) in
close_in ic; close_in ic;
x x
with e -> with e ->
close_in_noerr ic; close_in_noerr ic;
raise e raise e
let read_all ?(buf=Buf_.create()) (self:t) : string = let read_all ?(buf=Buf.create()) (self:t) : string =
let continue = ref true in let continue = ref true in
while !continue do while !continue do
let s, i, len = self.bs_fill_buf () in if Buf.size buf = Buf.cap buf then Buf.grow_ buf;
_debug (fun k->k "read-all: got i=%d, len=%d, bufsize %d" i len (Buf_.size buf)); let n = (fst self) buf.bytes buf.len (Buf.cap buf - buf.len) in
if len > 0 then ( if n=0 then continue := false
Buf_.add_bytes buf s i len; else (
self.bs_consume len; buf.len <- buf.len + n;
);
assert (len >= 0);
if len = 0 then (
continue := false
) )
done; done;
Buf_.contents_and_clear buf let s = Buf.contents buf in
Buf.clear buf;
s
(*
(* put [n] bytes from the input into bytes *) (* put [n] bytes from the input into bytes *)
let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit = let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit =
assert (Bytes.length bytes >= n); assert (Bytes.length bytes >= n);
@ -154,10 +184,11 @@ module Byte_stream = struct
self.bs_consume n_read; self.bs_consume n_read;
if n_read=0 then too_short(); if n_read=0 then too_short();
done done
*)
(* read a line into the buffer, after clearing it. *) (* read a line into the buffer, after clearing it. *)
let read_line_into (self:t) ~buf : unit = let read_line_into (self:t) ~buf : unit =
Buf_.clear buf; Buf.clear buf;
let continue = ref true in let continue = ref true in
while !continue do while !continue do
let s, i, len = self.bs_fill_buf () in let s, i, len = self.bs_fill_buf () in
@ -230,9 +261,9 @@ module Byte_stream = struct
} }
) )
let read_line ?(buf=Buf_.create()) self : string = let read_line ?(buf=Buf.create()) self : string =
read_line_into self ~buf; read_line_into self ~buf;
Buf_.contents buf Buf.contents buf
end end
exception Bad_req of int * string exception Bad_req of int * string
@ -413,7 +444,7 @@ module Request = struct
self.path self.body pp_comp_ self.path_components pp_query self.query self.path self.body pp_comp_ self.path_components pp_query self.query
(* decode a "chunked" stream into a normal stream *) (* decode a "chunked" stream into a normal stream *)
let read_stream_chunked_ ?(buf=Buf_.create()) (bs:byte_stream) : byte_stream = let read_stream_chunked_ ?(buf=Buf.create()) (bs:istream) : istream =
_debug (fun k->k "body: start reading chunked stream..."); _debug (fun k->k "body: start reading chunked stream...");
let first = ref true in let first = ref true in
let read_next_chunk_len () : int = let read_next_chunk_len () : int =
@ -591,7 +622,7 @@ end
*) *)
module Response = struct module Response = struct
type body = [`String of string | `Stream of byte_stream | `Void] type body = [`String of string | `Stream of istream | `Void]
type t = { type t = {
code: Response_code.t; code: Response_code.t;
headers: Headers.t; headers: Headers.t;
@ -648,7 +679,7 @@ module Response = struct
self.code Headers.pp self.headers pp_body self.body self.code Headers.pp self.headers pp_body self.body
(* print a stream as a series of chunks *) (* print a stream as a series of chunks *)
let output_stream_chunked_ (oc:out_channel) (str:byte_stream) : unit = let output_stream_chunked_ (oc:out_channel) (str:istream) : unit =
let continue = ref true in let continue = ref true in
while !continue do while !continue do
(* next chunk *) (* next chunk *)

View file

@ -75,19 +75,20 @@ echo:
*) *)
(** {2 Tiny buffer implementation} (** Tiny buffer implementation.
These buffers are used to avoid allocating too many byte arrays when These buffers are used to avoid allocating too many byte arrays when
processing streams and parsing requests. processing streams and parsing requests.
*) *)
module Buf : sig
module Buf_ : sig
type t type t
val size : t -> int val size : t -> int
val clear : t -> unit val clear : t -> unit
val create : ?size:int -> unit -> t val create : ?size:int -> unit -> t
val contents : t -> string val contents : t -> string
val ensure : t -> int -> unit
val bytes_slice : t -> bytes val bytes_slice : t -> bytes
(** Access underlying slice of bytes. (** Access underlying slice of bytes.
@since 0.5 *) @since 0.5 *)
@ -101,35 +102,25 @@ module Buf_ : sig
@since 0.5 *) @since 0.5 *)
end end
(** {2 Generic stream of data} (** Generic stream of data.
Streams are used to represent a series of bytes that can arrive progressively. Streams are used to represent a series of bytes that can arrive progressively.
For example, an uploaded file will be sent as a series of chunks. *) For example, an uploaded file will be sent as a series of chunks.
type byte_stream = { It's a pair of function to read data, and to close the stream. *)
bs_fill_buf: unit -> (bytes * int * int); type istream = (bytes -> int -> int -> int) * (unit -> unit)
(** See the current slice of the internal buffer as [bytes, i, len],
where the slice is [bytes[i] .. [bytes[i+len-1]]].
Can block to refill the buffer if there is currently no content.
If [len=0] then there is no more data. *)
bs_consume: int -> unit;
(** Consume n bytes from the buffer. This should only be called with [n <= len]
after a call to [is_fill_buf] that returns a slice of length [len]. *)
bs_close: unit -> unit;
(** Close the stream. *)
}
(** A buffered stream, with a view into the current buffer (or refill if empty),
and a function to consume [n] bytes.
See {!Byte_stream} for more details. *)
module Byte_stream : sig (** An output stream, into which we can push bytes. *)
type t = byte_stream type ostream = (bytes -> int -> int -> unit) * (unit -> unit)
module Istream : sig
type t = istream
val close : t -> unit val close : t -> unit
val empty : t val empty : t
val of_chan : ?buf_size:int -> in_channel -> t val of_chan : in_channel -> t
(** Make a buffered stream from the given channel. *) (** Make a buffered stream from the given channel. *)
val of_chan_close_noerr : ?buf_size:int -> in_channel -> t val of_chan_close_noerr : ?buf_size:int -> in_channel -> t
@ -141,27 +132,35 @@ module Byte_stream : sig
val of_string : string -> t val of_string : string -> t
val iter : (bytes -> int -> int -> unit) -> t -> unit val to_chan : ?buf:Buf.t -> out_channel -> t -> unit
(** Iterate on the chunks of the stream
@since 0.3 *)
val to_chan : out_channel -> t -> unit
(** Write the stream to the channel. (** Write the stream to the channel.
@since 0.3 *) @since 0.3 *)
val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a val with_file : string -> (t -> 'a) -> 'a
(** Open a file with given name, and obtain an input stream (** Open a file with given name, and obtain an input stream
on its content. When the function returns, the stream (and file) are closed. *) on its content. When the function returns, the stream (and file) are closed. *)
val read_line : ?buf:Buf_.t -> t -> string val read_line : ?buf:Buf.t -> t -> string
(** Read a line from the stream. (** Read a line from the stream.
@param buf a buffer to (re)use. Its content will be cleared. *) @param buf a buffer to (re)use. Its content will be cleared. *)
val read_all : ?buf:Buf_.t -> t -> string val read_all : ?buf:Buf.t -> t -> string
(** Read the whole stream into a string. (** Read the whole stream into a string.
@param buf a buffer to (re)use. Its content will be cleared. *) @param buf a buffer to (re)use. Its content will be cleared. *)
end end
module Ostream : sig
type t = ostream
val of_chan : out_channel -> t
val of_buf : Buffer.t -> t
val close : t -> unit
end
val copy_into : ?buf:Buf.t -> istream -> ostream -> unit
(** {2 Methods} *) (** {2 Methods} *)
module Meth : sig module Meth : sig
@ -289,13 +288,13 @@ module Request : sig
(** time stamp (from {!Unix.gettimeofday}) after parsing the first line of the request (** time stamp (from {!Unix.gettimeofday}) after parsing the first line of the request
@since 0.11 *) @since 0.11 *)
val limit_body_size : max_size:int -> byte_stream t -> byte_stream t val limit_body_size : max_size:int -> istream t -> istream t
(** Limit the body size to [max_size] bytes, or return (** Limit the body size to [max_size] bytes, or return
a [413] error. a [413] error.
@since 0.3 @since 0.3
*) *)
val read_body_full : ?buf_size:int -> byte_stream t -> string t val read_body_full : ?buf_size:int -> istream t -> string t
(** Read the whole body into a string. Potentially blocking. (** Read the whole body into a string. Potentially blocking.
@param buf_size initial size of underlying buffer (since 0.11) *) @param buf_size initial size of underlying buffer (since 0.11) *)
@ -303,8 +302,8 @@ module Request : sig
(**/**) (**/**)
(* for testing purpose, do not use *) (* for testing purpose, do not use *)
module Internal_ : sig module Internal_ : sig
val parse_req_start : ?buf:Buf_.t -> get_time_s:(unit -> float) -> byte_stream -> unit t option val parse_req_start : ?buf:Buf.t -> get_time_s:(unit -> float) -> istream -> unit t option
val parse_body : ?buf:Buf_.t -> unit t -> byte_stream -> byte_stream t val parse_body : ?buf:Buf.t -> unit t -> istream -> istream t
end end
(**/**) (**/**)
end end
@ -334,7 +333,7 @@ end
the client to answer a {!Request.t}*) the client to answer a {!Request.t}*)
module Response : sig module Response : sig
type body = [`String of string | `Stream of byte_stream | `Void] type body = [`String of string | `Stream of istream | `Void]
(** Body of a response, either as a simple string, (** Body of a response, either as a simple string,
or a stream of bytes, or nothing (for server-sent events). *) or a stream of bytes, or nothing (for server-sent events). *)
@ -376,7 +375,7 @@ module Response : sig
val make_raw_stream : val make_raw_stream :
?headers:Headers.t -> ?headers:Headers.t ->
code:Response_code.t -> code:Response_code.t ->
byte_stream -> istream ->
t t
(** Same as {!make_raw} but with a stream body. The body will be sent with (** Same as {!make_raw} but with a stream body. The body will be sent with
the chunked transfer-encoding. *) the chunked transfer-encoding. *)
@ -398,7 +397,7 @@ module Response : sig
val make_stream : val make_stream :
?headers:Headers.t -> ?headers:Headers.t ->
(byte_stream, Response_code.t * string) result -> t (istream, Response_code.t * string) result -> t
(** Same as {!make} but with a stream body. *) (** Same as {!make} but with a stream body. *)
val fail : ?headers:Headers.t -> code:int -> val fail : ?headers:Headers.t -> code:int ->
@ -479,7 +478,7 @@ end
@since 0.11 @since 0.11
*) *)
module Middleware : sig module Middleware : sig
type handler = byte_stream Request.t -> resp:(Response.t -> unit) -> unit type handler = istream Request.t -> resp:(Response.t -> unit) -> unit
(** Handlers are functions returning a response to a request. (** Handlers are functions returning a response to a request.
The response can be delayed, hence the use of a continuation The response can be delayed, hence the use of a continuation
as the [resp] parameter. *) as the [resp] parameter. *)
@ -561,7 +560,7 @@ val active_connections : t -> int
val add_decode_request_cb : val add_decode_request_cb :
t -> t ->
(unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit (unit Request.t -> (unit Request.t * (istream -> istream)) option) -> unit
[@@deprecated "use add_middleware"] [@@deprecated "use add_middleware"]
(** Add a callback for every request. (** Add a callback for every request.
The callback can provide a stream transformer and a new request (with The callback can provide a stream transformer and a new request (with
@ -634,7 +633,7 @@ val add_route_handler_stream :
?middlewares:Middleware.t list -> ?middlewares:Middleware.t list ->
?meth:Meth.t -> ?meth:Meth.t ->
t -> t ->
('a, byte_stream Request.t -> Response.t) Route.t -> 'a -> ('a, istream Request.t -> Response.t) Route.t -> 'a ->
unit unit
(** Similar to {!add_route_handler}, but where the body of the request (** Similar to {!add_route_handler}, but where the body of the request
is a stream of bytes that has not been read yet. is a stream of bytes that has not been read yet.