diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 0acf18fc..c29bf49d 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -1,10 +1,3 @@ -type byte_stream = { - bs_fill_buf: unit -> (bytes * int * int); - bs_consume: int -> unit; - bs_close: unit -> unit; -} -(** A buffer input stream, with a view into the current buffer (or refill if empty), - and a function to consume [n] bytes *) let _debug_on = ref ( match String.trim @@ Sys.getenv "HTTP_DBG" with @@ -18,36 +11,50 @@ let _debug k = Printf.kfprintf (fun oc -> Printf.fprintf oc "\n%!") stdout fmt) ) -module Buf_ = struct +module Buf = struct type t = { mutable bytes: bytes; - mutable i: int; + mutable len: int; } let create ?(size=4_096) () : t = - { bytes=Bytes.make size ' '; i=0 } + { bytes=Bytes.make size ' '; len=0 } + + let[@inline] size self = self.len + let[@inline] cap self = Bytes.length self.bytes + let[@inline] bytes_slice self = self.bytes - let size self = self.i - let bytes_slice self = self.bytes let clear self : unit = if Bytes.length self.bytes > 4_096 * 1_024 then ( self.bytes <- Bytes.make 4096 ' '; (* free big buffer *) ); - self.i <- 0 + self.len <- 0 - let resize self new_size : unit = - let new_buf = Bytes.make new_size ' ' in - Bytes.blit self.bytes 0 new_buf 0 self.i; + let[@inline never] grow_to_ self new_cap : unit = + let new_buf = Bytes.make new_cap ' ' in + Bytes.blit self.bytes 0 new_buf 0 self.len; self.bytes <- new_buf - let add_bytes (self:t) s i len : unit = - if self.i + len >= Bytes.length self.bytes then ( - resize self (self.i + self.i / 2 + len + 10); - ); - Bytes.blit s i self.bytes self.i len; - self.i <- self.i + len + let next_cap_ self = + let n = cap self in n + n lsr 1 + 10 - let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.i + let ensure self new_cap : unit = + if new_cap > cap self then ( + let new_cap = max new_cap (next_cap_ self) in + grow_to_ self new_cap + ) + + let[@inline never] grow_ self = + grow_to_ self (next_cap_ self) + + 
let[@inline] add_bytes (self:t) b i len : unit = + if self.len + len >= cap self then ( + grow_ self + ); + Bytes.blit b i self.bytes self.len len; + self.len <- self.len + len + + let contents (self:t) : string = Bytes.sub_string self.bytes 0 self.len let contents_and_clear (self:t) : string = let x = contents self in @@ -55,34 +62,51 @@ module Buf_ = struct x end -module Byte_stream = struct - type t = byte_stream +type istream = (bytes -> int -> int -> int) * (unit -> unit) +type ostream = (bytes -> int -> int -> unit) * (unit -> unit) - let close self = self.bs_close() +let copy_into ?(buf=Buf.create()) (is:istream) (os:ostream) = + let continue = ref true in + while !continue do + let n = (fst is) buf.bytes 0 (Bytes.length buf.bytes) in + if n=0 then continue := false + else ( + (fst os) buf.bytes 0 n + ) + done - let empty = { - bs_fill_buf=(fun () -> Bytes.empty, 0, 0); - bs_consume=(fun _ -> ()); - bs_close=(fun () -> ()); - } +module Ostream = struct + type t = ostream - let of_chan_ ?(buf_size=16 * 1024) ~close ic : t = - let i = ref 0 in - let len = ref 0 in - let buf = Bytes.make buf_size ' ' in - { bs_fill_buf=(fun () -> - if !i >= !len then ( - i := 0; - len := input ic buf 0 (Bytes.length buf); - ); - buf, !i,!len - !i); - bs_consume=(fun n -> i := !i + n); - bs_close=(fun () -> close ic) - } + let close (_,cl) = cl + + let of_chan oc : t = + let write b i len = output oc b i len in + let close () = close_out oc in + write, close + + let of_buf buf : t = + let write b i len = Buf.add_bytes buf b i len in + let close() = () in + write, close +end + +module Istream = struct + type t = istream + + let close (_,cl) = cl() + + let empty : t = (fun _ _ _ -> 0), (fun()->()) + + let of_chan_ ~close ic : t = + let read b i len = input ic b i len in + let close() = close ic in + read, close let of_chan = of_chan_ ~close:close_in let of_chan_close_noerr = of_chan_ ~close:close_in_noerr + (* let rec iter f (self:t) : unit = let s, i, len = self.bs_fill_buf () in 
if len=0 then ( @@ -92,56 +116,62 @@ module Byte_stream = struct self.bs_consume len; (iter [@tailcall]) f self ) + *) - let to_chan (oc:out_channel) (self:t) = - iter (fun s i len -> output oc s i len) self + let to_chan ?buf (oc:out_channel) (self:t) = + let os = Ostream.of_chan oc in + copy_into ?buf self os - let of_bytes ?(i=0) ?len s : t = + let of_bytes ?(i=0) ?len b : t = (* invariant: !i+!len is constant *) let len = ref ( match len with | Some n -> - if n > Bytes.length s - i then invalid_arg "Byte_stream.of_bytes"; + if n > Bytes.length b - i then invalid_arg "Byte_stream.of_bytes"; n - | None -> Bytes.length s - i + | None -> Bytes.length b - i ) in let i = ref i in - { bs_fill_buf=(fun () -> s, !i, !len); - bs_close=(fun () -> len := 0); - bs_consume=(fun n -> assert (n>=0 && n<= !len); i := !i + n; len := !len - n); - } + let close()= () in + let read b2 i2 n2 = + let n = min n2 !len in + Bytes.blit b !i b2 i2 n; + i := !i + n; + len := !len - n; + n + in + read, close let of_string s : t = of_bytes (Bytes.unsafe_of_string s) - let with_file ?buf_size file f = + let with_file file f = let ic = open_in file in try - let x = f (of_chan ?buf_size ic) in + let x = f (of_chan ic) in close_in ic; x with e -> close_in_noerr ic; raise e - let read_all ?(buf=Buf_.create()) (self:t) : string = + let read_all ?(buf=Buf.create()) (self:t) : string = let continue = ref true in while !continue do - let s, i, len = self.bs_fill_buf () in - _debug (fun k->k "read-all: got i=%d, len=%d, bufsize %d" i len (Buf_.size buf)); - if len > 0 then ( - Buf_.add_bytes buf s i len; - self.bs_consume len; - ); - assert (len >= 0); - if len = 0 then ( - continue := false + if Buf.size buf = Buf.cap buf then Buf.grow_ buf; + let n = (fst self) buf.bytes buf.len (Buf.cap buf - buf.len) in + if n=0 then continue := false + else ( + buf.len <- buf.len + n; ) done; - Buf_.contents_and_clear buf + let s = Buf.contents buf in + Buf.clear buf; + s + (* (* put [n] bytes from the input into 
bytes *) let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit = assert (Bytes.length bytes >= n); @@ -154,10 +184,11 @@ module Byte_stream = struct self.bs_consume n_read; if n_read=0 then too_short(); done + *) (* read a line into the buffer, after clearing it. *) let read_line_into (self:t) ~buf : unit = - Buf_.clear buf; + Buf.clear buf; let continue = ref true in while !continue do let s, i, len = self.bs_fill_buf () in @@ -230,9 +261,9 @@ module Byte_stream = struct } ) - let read_line ?(buf=Buf_.create()) self : string = + let read_line ?(buf=Buf.create()) self : string = read_line_into self ~buf; - Buf_.contents buf + Buf.contents buf end exception Bad_req of int * string @@ -413,7 +444,7 @@ module Request = struct self.path self.body pp_comp_ self.path_components pp_query self.query (* decode a "chunked" stream into a normal stream *) - let read_stream_chunked_ ?(buf=Buf_.create()) (bs:byte_stream) : byte_stream = + let read_stream_chunked_ ?(buf=Buf.create()) (bs:istream) : istream = _debug (fun k->k "body: start reading chunked stream..."); let first = ref true in let read_next_chunk_len () : int = @@ -591,7 +622,7 @@ end *) module Response = struct - type body = [`String of string | `Stream of byte_stream | `Void] + type body = [`String of string | `Stream of istream | `Void] type t = { code: Response_code.t; headers: Headers.t; @@ -648,7 +679,7 @@ module Response = struct self.code Headers.pp self.headers pp_body self.body (* print a stream as a series of chunks *) - let output_stream_chunked_ (oc:out_channel) (str:byte_stream) : unit = + let output_stream_chunked_ (oc:out_channel) (str:istream) : unit = let continue = ref true in while !continue do (* next chunk *) diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index 158a69fd..1f19ef03 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -75,19 +75,20 @@ echo: *) -(** {2 Tiny buffer implementation} +(** Tiny buffer implementation. 
These buffers are used to avoid allocating too many byte arrays when processing streams and parsing requests. *) - -module Buf_ : sig +module Buf : sig type t val size : t -> int val clear : t -> unit val create : ?size:int -> unit -> t val contents : t -> string + val ensure : t -> int -> unit + val bytes_slice : t -> bytes (** Access underlying slice of bytes. @since 0.5 *) @@ -101,35 +102,25 @@ module Buf_ : sig @since 0.5 *) end -(** {2 Generic stream of data} +(** Generic stream of data. Streams are used to represent a series of bytes that can arrive progressively. - For example, an uploaded file will be sent as a series of chunks. *) + For example, an uploaded file will be sent as a series of chunks. -type byte_stream = { - bs_fill_buf: unit -> (bytes * int * int); - (** See the current slice of the internal buffer as [bytes, i, len], - where the slice is [bytes[i] .. [bytes[i+len-1]]]. - Can block to refill the buffer if there is currently no content. - If [len=0] then there is no more data. *) - bs_consume: int -> unit; - (** Consume n bytes from the buffer. This should only be called with [n <= len] - after a call to [is_fill_buf] that returns a slice of length [len]. *) - bs_close: unit -> unit; - (** Close the stream. *) -} -(** A buffered stream, with a view into the current buffer (or refill if empty), - and a function to consume [n] bytes. - See {!Byte_stream} for more details. *) + It's a pair of functions: one to read data, and one to close the stream. *) +type istream = (bytes -> int -> int -> int) * (unit -> unit) -module Byte_stream : sig - type t = byte_stream +(** An output stream, into which we can push bytes. *) +type ostream = (bytes -> int -> int -> unit) * (unit -> unit) + +module Istream : sig + type t = istream val close : t -> unit val empty : t - val of_chan : ?buf_size:int -> in_channel -> t + val of_chan : in_channel -> t (** Make a buffered stream from the given channel. 
*) val of_chan_close_noerr : ?buf_size:int -> in_channel -> t @@ -141,27 +132,35 @@ module Byte_stream : sig val of_string : string -> t - val iter : (bytes -> int -> int -> unit) -> t -> unit - (** Iterate on the chunks of the stream - @since 0.3 *) - - val to_chan : out_channel -> t -> unit + val to_chan : ?buf:Buf.t -> out_channel -> t -> unit (** Write the stream to the channel. @since 0.3 *) - val with_file : ?buf_size:int -> string -> (t -> 'a) -> 'a + val with_file : string -> (t -> 'a) -> 'a (** Open a file with given name, and obtain an input stream on its content. When the function returns, the stream (and file) are closed. *) - val read_line : ?buf:Buf_.t -> t -> string + val read_line : ?buf:Buf.t -> t -> string (** Read a line from the stream. @param buf a buffer to (re)use. Its content will be cleared. *) - val read_all : ?buf:Buf_.t -> t -> string + val read_all : ?buf:Buf.t -> t -> string (** Read the whole stream into a string. @param buf a buffer to (re)use. Its content will be cleared. *) end +module Ostream : sig + type t = ostream + + val of_chan : out_channel -> t + + val of_buf : Buffer.t -> t + + val close : t -> unit +end + +val copy_into : ?buf:Buf.t -> istream -> ostream -> unit + (** {2 Methods} *) module Meth : sig @@ -289,13 +288,13 @@ module Request : sig (** time stamp (from {!Unix.gettimeofday}) after parsing the first line of the request @since 0.11 *) - val limit_body_size : max_size:int -> byte_stream t -> byte_stream t + val limit_body_size : max_size:int -> istream t -> istream t (** Limit the body size to [max_size] bytes, or return a [413] error. @since 0.3 *) - val read_body_full : ?buf_size:int -> byte_stream t -> string t + val read_body_full : ?buf_size:int -> istream t -> string t (** Read the whole body into a string. Potentially blocking. 
@param buf_size initial size of underlying buffer (since 0.11) *) @@ -303,8 +302,8 @@ module Request : sig (**/**) (* for testing purpose, do not use *) module Internal_ : sig - val parse_req_start : ?buf:Buf_.t -> get_time_s:(unit -> float) -> byte_stream -> unit t option - val parse_body : ?buf:Buf_.t -> unit t -> byte_stream -> byte_stream t + val parse_req_start : ?buf:Buf.t -> get_time_s:(unit -> float) -> istream -> unit t option + val parse_body : ?buf:Buf.t -> unit t -> istream -> istream t end (**/**) end @@ -334,7 +333,7 @@ end the client to answer a {!Request.t}*) module Response : sig - type body = [`String of string | `Stream of byte_stream | `Void] + type body = [`String of string | `Stream of istream | `Void] (** Body of a response, either as a simple string, or a stream of bytes, or nothing (for server-sent events). *) @@ -376,7 +375,7 @@ module Response : sig val make_raw_stream : ?headers:Headers.t -> code:Response_code.t -> - byte_stream -> + istream -> t (** Same as {!make_raw} but with a stream body. The body will be sent with the chunked transfer-encoding. *) @@ -398,7 +397,7 @@ module Response : sig val make_stream : ?headers:Headers.t -> - (byte_stream, Response_code.t * string) result -> t + (istream, Response_code.t * string) result -> t (** Same as {!make} but with a stream body. *) val fail : ?headers:Headers.t -> code:int -> @@ -479,7 +478,7 @@ end @since 0.11 *) module Middleware : sig - type handler = byte_stream Request.t -> resp:(Response.t -> unit) -> unit + type handler = istream Request.t -> resp:(Response.t -> unit) -> unit (** Handlers are functions returning a response to a request. The response can be delayed, hence the use of a continuation as the [resp] parameter. 
*) @@ -561,7 +560,7 @@ val active_connections : t -> int val add_decode_request_cb : t -> - (unit Request.t -> (unit Request.t * (byte_stream -> byte_stream)) option) -> unit + (unit Request.t -> (unit Request.t * (istream -> istream)) option) -> unit [@@deprecated "use add_middleware"] (** Add a callback for every request. The callback can provide a stream transformer and a new request (with @@ -634,7 +633,7 @@ val add_route_handler_stream : ?middlewares:Middleware.t list -> ?meth:Meth.t -> t -> - ('a, byte_stream Request.t -> Response.t) Route.t -> 'a -> + ('a, istream Request.t -> Response.t) Route.t -> 'a -> unit (** Similar to {!add_route_handler}, but where the body of the request is a stream of bytes that has not been read yet.