Merge pull request #68 from c-cube/wip-writer

introduce writer streams, use them in responses
This commit is contained in:
Simon Cruanes 2023-08-07 10:08:06 -04:00 committed by GitHub
commit b3b99af7ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 456 additions and 126 deletions

View file

@ -105,6 +105,32 @@ it allows downloading the files, and listing directories.
If a directory contains `index.html` then this will be served If a directory contains `index.html` then this will be served
instead of listing the content. instead of listing the content.
## Steaming response body
Tiny_httpd provides multiple ways of returning a body in a response.
The response body type is:
```ocaml
type body =
[ `String of string
| `Stream of byte_stream
| `Writer of Tiny_httpd_io.Writer.t
| `Void ]
```
The simplest way is to return, say, `` `String "hello" ``. The response
will have a set content-length header and its body is just the string.
Some responses don't have a body at all, which is where `` `Void `` is useful.
The `` `Stream _ `` case is more advanced and really only intended for experts.
The `` `Writer w `` is new, and is intended as an easy way to write the
body in a streaming fashion. See 'examples/writer.ml' to see a full example.
Typically the idea is to create the body with `Tiny_httpd_io.Writer.make ~write ()`
where `write` will be called with an output channel (the connection to the client),
and can write whatever it wants to this channel. Once the `write` function returns
the body has been fully sent and the next request can be processed.
## Socket activation ## Socket activation
Since version 0.10, socket activation is supported indirectly, by allowing a Since version 0.10, socket activation is supported indirectly, by allowing a

View file

@ -21,6 +21,12 @@
(libraries tiny_httpd tiny_httpd_camlzip (libraries tiny_httpd tiny_httpd_camlzip
tiny_httpd_eio eio eio_posix)) tiny_httpd_eio eio eio_posix))
(executable
(name writer)
(flags :standard -warn-error -a+8)
(modules writer)
(libraries tiny_httpd))
(rule (rule
(targets test_output.txt) (targets test_output.txt)
(deps (deps

View file

@ -88,8 +88,8 @@ let () =
"echo [option]*"; "echo [option]*";
let server = S.create ~port:!port_ ~max_connections:!j () in let server = S.create ~port:!port_ ~max_connections:!j () in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
let m_stats, get_stats = middleware_stat () in let m_stats, get_stats = middleware_stat () in
S.add_middleware server ~stage:(`Stage 1) m_stats; S.add_middleware server ~stage:(`Stage 1) m_stats;

62
examples/writer.ml Normal file
View file

@ -0,0 +1,62 @@
module H = Tiny_httpd
let serve_zeroes server : unit =
H.add_route_handler server H.(Route.(exact "zeroes" @/ int @/ return))
@@ fun n _req ->
(* stream [n] zeroes *)
let write (oc : H.IO.Out_channel.t) : unit =
let buf = Bytes.make 1 '0' in
for _i = 1 to n do
H.IO.Out_channel.output oc buf 0 1
done
in
let writer = H.IO.Writer.make ~write () in
H.Response.make_writer @@ Ok writer
let serve_file server : unit =
H.add_route_handler server H.(Route.(exact "file" @/ string @/ return))
@@ fun file _req ->
if Sys.file_exists file then (
(* stream the content of the file *)
let write oc =
let buf = Bytes.create 4096 in
let ic = open_in file in
Fun.protect ~finally:(fun () -> close_in_noerr ic) @@ fun () ->
while
let n = input ic buf 0 (Bytes.length buf) in
if n > 0 then H.IO.Out_channel.output oc buf 0 n;
n > 0
do
()
done
in
let writer = H.IO.Writer.make ~write () in
H.Response.make_writer @@ Ok writer
) else
H.Response.fail ~code:404 "file not found"
let () =
let port = ref 8085 in
Arg.parse [ "-p", Arg.Set_int port, " port" ] ignore "";
let server = H.create ~port:!port () in
Printf.printf "listen on http://localhost:%d/\n%!" !port;
serve_file server;
serve_zeroes server;
H.add_route_handler server H.Route.return (fun _req ->
let body =
H.Html.(
div []
[
p [] [ txt "routes" ];
ul []
[
li []
[ a [ A.href "/zeroes/1000" ] [ txt "get 1000 zeroes" ] ];
li [] [ a [ A.href "/file/f_13M" ] [ txt "read file" ] ];
];
])
|> H.Html.to_string_top
in
H.Response.make_string @@ Ok body);
H.run_exn server

View file

@ -1,12 +1,19 @@
type t = { mutable bytes: bytes; mutable i: int } type t = { mutable bytes: bytes; mutable i: int; original: bytes }
let create ?(size = 4_096) () : t = { bytes = Bytes.make size ' '; i = 0 } let create ?(size = 4_096) () : t =
let size self = self.i let bytes = Bytes.make size ' ' in
let bytes_slice self = self.bytes { bytes; i = 0; original = bytes }
let[@inline] size self = self.i
let[@inline] bytes_slice self = self.bytes
let clear self : unit = let clear self : unit =
if Bytes.length self.bytes > 4_096 * 1_024 then if
self.bytes <- Bytes.make 4096 ' ' (* free big buffer *); Bytes.length self.bytes > 500 * 1_024
&& Bytes.length self.bytes > Bytes.length self.original
then
(* free big buffer *)
self.bytes <- self.original;
self.i <- 0 self.i <- 0
let resize self new_size : unit = let resize self new_size : unit =
@ -14,6 +21,12 @@ let resize self new_size : unit =
Bytes.blit self.bytes 0 new_buf 0 self.i; Bytes.blit self.bytes 0 new_buf 0 self.i;
self.bytes <- new_buf self.bytes <- new_buf
let add_char self c : unit =
if self.i + 1 >= Bytes.length self.bytes then
resize self (self.i + (self.i / 2) + 10);
Bytes.set self.bytes self.i c;
self.i <- 1 + self.i
let add_bytes (self : t) s i len : unit = let add_bytes (self : t) s i len : unit =
if self.i + len >= Bytes.length self.bytes then if self.i + len >= Bytes.length self.bytes then
resize self (self.i + (self.i / 2) + len + 10); resize self (self.i + (self.i / 2) + len + 10);

View file

@ -21,6 +21,10 @@ val contents_and_clear : t -> string
(** Get contents of the buffer and clear it. (** Get contents of the buffer and clear it.
@since 0.5 *) @since 0.5 *)
val add_char : t -> char -> unit
(** Add a single char.
@since NEXT_RELEASE *)
val add_bytes : t -> bytes -> int -> int -> unit val add_bytes : t -> bytes -> int -> int -> unit
(** Append given bytes slice to the buffer. (** Append given bytes slice to the buffer.
@since 0.5 *) @since 0.5 *)

View file

@ -6,33 +6,59 @@
@since 0.12 @since 0.12
*) *)
module IO = Tiny_httpd_io
include Tiny_httpd_html_ include Tiny_httpd_html_
(** @inline *) (** @inline *)
(** Write an HTML element to this out channel.
@param top if true, add DOCTYPE at the beginning. The top element should then
be a "html" tag.
@since NEXT_RELEASE
*)
let to_out_channel ?(top = false) (self : elt) (out : IO.Out_channel.t) : unit =
let out = Out.create_of_out out in
if top then Out.add_string out "<!DOCTYPE html>\n";
self out;
Out.add_format_nl out;
Out.flush out
(** Convert a HTML element to a string. (** Convert a HTML element to a string.
@param top if true, add DOCTYPE at the beginning. The top element should then @param top if true, add DOCTYPE at the beginning. The top element should then
be a "html" tag. *) be a "html" tag. *)
let to_string ?(top = false) (self : elt) : string = let to_string ?top (self : elt) : string =
let out = Out.create () in let buf = Buffer.create 64 in
if top then Out.add_string out "<!DOCTYPE html>\n"; let out = IO.Out_channel.of_buffer buf in
self out; to_out_channel ?top self out;
Out.to_string out Buffer.contents buf
(** Convert a list of HTML elements to a string. (** Convert a list of HTML elements to a string.
This is designed for fragments of HTML that are to be injected inside This is designed for fragments of HTML that are to be injected inside
a bigger context, as it's invalid to have multiple elements at the toplevel a bigger context, as it's invalid to have multiple elements at the toplevel
of a HTML document. *) of a HTML document. *)
let to_string_l (l : elt list) = let to_string_l (l : elt list) =
let out = Out.create () in let buf = Buffer.create 64 in
let out = Out.create_of_buffer buf in
List.iter List.iter
(fun f -> (fun f ->
f out; f out;
Out.add_format_nl out) Out.add_format_nl out)
l; l;
Out.to_string out Buffer.contents buf
let to_string_top = to_string ~top:true let to_string_top = to_string ~top:true
(** Write a toplevel element to an output channel.
@since NEXT_RELEASE *)
let to_out_channel_top = to_out_channel ~top:true
(** Produce a streaming writer from this HTML element.
@param top if true, add a DOCTYPE. See {!to_out_channel}.
@since NEXT_RELEASE *)
let to_writer ?top (self : elt) : IO.Writer.t =
let write oc = to_out_channel ?top self oc in
IO.Writer.make ~write ()
(** Convert a HTML element to a stream. This might just convert (** Convert a HTML element to a stream. This might just convert
it to a string first, do not assume it to be more efficient. *) it to a string first, do not assume it to be more efficient. *)
let to_stream (self : elt) : Tiny_httpd_stream.t = let to_stream (self : elt) : Tiny_httpd_stream.t =

View file

@ -10,13 +10,18 @@
module Buf = Tiny_httpd_buf module Buf = Tiny_httpd_buf
(** Input channel (byte source) *)
module In_channel = struct module In_channel = struct
type t = { type t = {
input: bytes -> int -> int -> int; input: bytes -> int -> int -> int;
(** Read into the slice. Returns [0] only if the (** Read into the slice. Returns [0] only if the
channel is closed. *) channel is closed. *)
close: unit -> unit; close: unit -> unit; (** Close the input. Must be idempotent. *)
} }
(** An input channel, i.e an incoming stream of bytes.
This can be a [string], an [int_channel], an [Unix.file_descr], a
decompression wrapper around another input channel, etc. *)
let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = let of_in_channel ?(close_noerr = false) (ic : in_channel) : t =
{ {
@ -40,19 +45,32 @@ module In_channel = struct
Unix.close fd); Unix.close fd);
} }
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : t) buf i len = self.input buf i len let[@inline] input (self : t) buf i len = self.input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self.close () let[@inline] close self : unit = self.close ()
end end
(** Output channel (byte sink) *)
module Out_channel = struct module Out_channel = struct
type t = { type t = {
output_char: char -> unit; (** Output a single char *)
output: bytes -> int -> int -> unit; (** Output slice *) output: bytes -> int -> int -> unit; (** Output slice *)
flush: unit -> unit; (** Flush underlying buffer *) flush: unit -> unit; (** Flush underlying buffer *)
close: unit -> unit; close: unit -> unit; (** Close the output. Must be idempotent. *)
} }
(** An output channel, ie. a place into which we can write bytes.
This can be a [Buffer.t], an [out_channel], a [Unix.file_descr], etc. *)
(** [of_out_channel oc] wraps the channel into a {!Out_channel.t}.
@param close_noerr if true, then closing the result uses [close_out_noerr]
instead of [close_out] to close [oc] *)
let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = let of_out_channel ?(close_noerr = false) (oc : out_channel) : t =
{ {
output_char = (fun c -> output_char oc c);
output = (fun buf i len -> output oc buf i len); output = (fun buf i len -> output oc buf i len);
flush = (fun () -> flush oc); flush = (fun () -> flush oc);
close = close =
@ -63,20 +81,110 @@ module Out_channel = struct
close_out oc); close_out oc);
} }
(** [of_buffer buf] is an output channel that writes directly into [buf].
[flush] and [close] have no effect. *)
let of_buffer (buf : Buffer.t) : t =
{
output_char = Buffer.add_char buf;
output = Buffer.add_subbytes buf;
flush = ignore;
close = ignore;
}
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : t) c : unit = self.output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : t) buf i len : unit = self.output buf i len let[@inline] output (self : t) buf i len : unit = self.output buf i len
let[@inline] output_string (self : t) (str : string) : unit = let[@inline] output_string (self : t) (str : string) : unit =
self.output (Bytes.unsafe_of_string str) 0 (String.length str) self.output (Bytes.unsafe_of_string str) 0 (String.length str)
(** Close the channel. *)
let[@inline] close self : unit = self.close () let[@inline] close self : unit = self.close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self.flush () let[@inline] flush self : unit = self.flush ()
let output_buf (self : t) (buf : Buf.t) : unit = let output_buf (self : t) (buf : Buf.t) : unit =
let b = Buf.bytes_slice buf in let b = Buf.bytes_slice buf in
output self b 0 (Buf.size buf) output self b 0 (Buf.size buf)
(** [chunk_encoding oc] makes a new channel that outputs its content into [oc]
in chunk encoding form.
@param close_rec if true, closing the result will also close [oc]
@param buf a buffer used to accumulate data into chunks.
Chunks are emitted when [buf]'s size gets over a certain threshold,
or when [flush] is called.
*)
let chunk_encoding ?(buf = Buf.create ()) ~close_rec (self : t) : t =
(* write content of [buf] as a chunk if it's big enough.
If [force=true] then write content of [buf] if it's simply non empty. *)
let write_buf ~force () =
let n = Buf.size buf in
if (force && n > 0) || n > 4_096 then (
output_string self (Printf.sprintf "%x\r\n" n);
self.output (Buf.bytes_slice buf) 0 n;
output_string self "\r\n";
Buf.clear buf
)
in
let flush () =
write_buf ~force:true ();
self.flush ()
in
let close () =
write_buf ~force:true ();
(* write an empty chunk to close the stream *)
output_string self "0\r\n";
(* write another crlf after the stream (see #56) *)
output_string self "\r\n";
self.flush ();
if close_rec then self.close ()
in
let output b i n =
Buf.add_bytes buf b i n;
write_buf ~force:false ()
in
let output_char c =
Buf.add_char buf c;
write_buf ~force:false ()
in
{ output_char; flush; close; output }
end end
(** A TCP server abstraction *) (** A writer abstraction. *)
module Writer = struct
type t = { write: Out_channel.t -> unit } [@@unboxed]
(** Writer.
A writer is a push-based stream of bytes.
Give it an output channel and it will write the bytes in it.
This is useful for responses: an http endpoint can return a writer
as its response's body, and output into it as if it were a regular
[out_channel], including controlling calls to [flush].
@since NEXT_RELEASE
*)
let[@inline] make ~write () : t = { write }
(** Write into the channel. *)
let[@inline] write (oc : Out_channel.t) (self : t) : unit = self.write oc
(** Empty writer, will output 0 bytes. *)
let empty : t = { write = ignore }
(** A writer that just emits the bytes from the given string. *)
let[@inline] of_string (str : string) : t =
let write oc = Out_channel.output_string oc str in
{ write }
end
(** A TCP server abstraction. *)
module TCP_server = struct module TCP_server = struct
type conn_handler = { type conn_handler = {
handle: In_channel.t -> Out_channel.t -> unit; handle: In_channel.t -> Out_channel.t -> unit;
@ -90,14 +198,27 @@ module TCP_server = struct
(** Number of connections currently active *) (** Number of connections currently active *)
running: unit -> bool; (** Is the server currently running? *) running: unit -> bool; (** Is the server currently running? *)
stop: unit -> unit; stop: unit -> unit;
(** Ask the server to stop. This might not take effect immediately. *) (** Ask the server to stop. This might not take effect immediately,
and is idempotent. After this [server.running()] must return [false]. *)
} }
(** Running server. *) (** A running TCP server.
This contains some functions that provide information about the running
server, including whether it's active (as opposed to stopped), a function
to stop it, and statistics about the number of connections. *)
type builder = { type builder = {
serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit; serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit;
(** Blocking call to listen for incoming connections and handle them. (** Blocking call to listen for incoming connections and handle them.
Uses the connection handler to handle individual client connections. *) Uses the connection handler [handle] to handle individual client
connections in individual threads/fibers/tasks.
@param after_init is called once with the server after the server
has started. *)
} }
(** A TCP server implementation. *) (** A TCP server builder implementation.
Calling [builder.serve ~after_init ~handle ()] starts a new TCP server on
an unspecified endpoint
(most likely coming from the function returning this builder)
and returns the running server. *)
end end

View file

@ -366,7 +366,12 @@ end
*) *)
module Response = struct module Response = struct
type body = [ `String of string | `Stream of byte_stream | `Void ] type body =
[ `String of string
| `Stream of byte_stream
| `Writer of IO.Writer.t
| `Void ]
type t = { code: Response_code.t; headers: Headers.t; body: body } type t = { code: Response_code.t; headers: Headers.t; body: body }
let set_body body self = { self with body } let set_body body self = { self with body }
@ -383,10 +388,13 @@ module Response = struct
{ code; headers; body = `String body } { code; headers; body = `String body }
let make_raw_stream ?(headers = []) ~code body : t = let make_raw_stream ?(headers = []) ~code body : t =
(* add content length to response *)
let headers = Headers.set "Transfer-Encoding" "chunked" headers in let headers = Headers.set "Transfer-Encoding" "chunked" headers in
{ code; headers; body = `Stream body } { code; headers; body = `Stream body }
let make_raw_writer ?(headers = []) ~code body : t =
let headers = Headers.set "Transfer-Encoding" "chunked" headers in
{ code; headers; body = `Writer body }
let make_void_force_ ?(headers = []) ~code () : t = let make_void_force_ ?(headers = []) ~code () : t =
{ code; headers; body = `Void } { code; headers; body = `Void }
@ -407,11 +415,17 @@ module Response = struct
| Ok body -> make_raw_stream ?headers ~code:200 body | Ok body -> make_raw_stream ?headers ~code:200 body
| Error (code, msg) -> make_raw ?headers ~code msg | Error (code, msg) -> make_raw ?headers ~code msg
let make_writer ?headers r : t =
match r with
| Ok body -> make_raw_writer ?headers ~code:200 body
| Error (code, msg) -> make_raw ?headers ~code msg
let make ?headers r : t = let make ?headers r : t =
match r with match r with
| Ok (`String body) -> make_raw ?headers ~code:200 body | Ok (`String body) -> make_raw ?headers ~code:200 body
| Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body | Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body
| Ok `Void -> make_void ?headers ~code:200 () | Ok `Void -> make_void ?headers ~code:200 ()
| Ok (`Writer f) -> make_raw_writer ?headers ~code:200 f
| Error (code, msg) -> make_raw ?headers ~code msg | Error (code, msg) -> make_raw ?headers ~code msg
let fail ?headers ~code fmt = let fail ?headers ~code fmt =
@ -424,6 +438,7 @@ module Response = struct
let pp_body out = function let pp_body out = function
| `String s -> Format.fprintf out "%S" s | `String s -> Format.fprintf out "%S" s
| `Stream _ -> Format.pp_print_string out "<stream>" | `Stream _ -> Format.pp_print_string out "<stream>"
| `Writer _ -> Format.pp_print_string out "<writer>"
| `Void -> () | `Void -> ()
in in
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
@ -446,9 +461,10 @@ module Response = struct
match self.body with match self.body with
| `String s when String.length s > 1024 * 500 -> | `String s when String.length s > 1024 * 500 ->
(* chunk-encode large bodies *) (* chunk-encode large bodies *)
`Stream (Byte_stream.of_string s), true `Writer (IO.Writer.of_string s), true
| `String _ as b -> b, false | `String _ as b -> b, false
| `Stream _ as b -> b, true | `Stream _ as b -> b, true
| `Writer _ as b -> b, true
| `Void as b -> b, false | `Void as b -> b, false
in in
let headers = let headers =
@ -462,9 +478,9 @@ module Response = struct
let self = { self with headers; body } in let self = { self with headers; body } in
_debug (fun k -> _debug (fun k ->
k "output response: %s" k "output response: %s"
(Format.asprintf "%a" pp { self with body = `String "<>" })); (Format.asprintf "%a" pp { self with body = `String "<...>" }));
(* write headers *) (* write headers, using [buf] to batch writes *)
List.iter List.iter
(fun (k, v) -> (fun (k, v) ->
Printf.bprintf tmp_buffer "%s: %s\r\n" k v; Printf.bprintf tmp_buffer "%s: %s\r\n" k v;
@ -474,13 +490,23 @@ module Response = struct
IO.Out_channel.output_buf oc buf; IO.Out_channel.output_buf oc buf;
IO.Out_channel.output_string oc "\r\n"; IO.Out_channel.output_string oc "\r\n";
Buf.clear buf;
(match body with (match body with
| `String "" | `Void -> () | `String "" | `Void -> ()
| `String s -> IO.Out_channel.output_string oc s | `String s -> IO.Out_channel.output_string oc s
| `Writer w ->
(* use buffer to chunk encode [w] *)
let oc' = IO.Out_channel.chunk_encoding ~buf ~close_rec:false oc in
(try
IO.Writer.write oc' w;
IO.Out_channel.close oc'
with e ->
IO.Out_channel.close oc';
raise e)
| `Stream str -> | `Stream str ->
(try (try
Byte_stream.output_chunked' oc str; Byte_stream.output_chunked' ~buf oc str;
Byte_stream.close str Byte_stream.close str
with e -> with e ->
Byte_stream.close str; Byte_stream.close str;

View file

@ -194,9 +194,21 @@ end
the client to answer a {!Request.t}*) the client to answer a {!Request.t}*)
module Response : sig module Response : sig
type body = [ `String of string | `Stream of byte_stream | `Void ] type body =
[ `String of string
| `Stream of byte_stream
| `Writer of Tiny_httpd_io.Writer.t
| `Void ]
(** Body of a response, either as a simple string, (** Body of a response, either as a simple string,
or a stream of bytes, or nothing (for server-sent events notably). *) or a stream of bytes, or nothing (for server-sent events notably).
- [`String str] replies with a body set to this string, and a known content-length.
- [`Stream str] replies with a body made from this string, using chunked encoding.
- [`Void] replies with no body.
- [`Writer w] replies with a body created by the writer [w], using
a chunked encoding.
It is available since NEXT_RELEASE.
*)
type t = private { type t = private {
code: Response_code.t; (** HTTP response code. See {!Response_code}. *) code: Response_code.t; (** HTTP response code. See {!Response_code}. *)
@ -251,6 +263,12 @@ module Response : sig
?headers:Headers.t -> (string, Response_code.t * string) result -> t ?headers:Headers.t -> (string, Response_code.t * string) result -> t
(** Same as {!make} but with a string body. *) (** Same as {!make} but with a string body. *)
val make_writer :
?headers:Headers.t ->
(Tiny_httpd_io.Writer.t, Response_code.t * string) result ->
t
(** Same as {!make} but with a writer body. *)
val make_stream : val make_stream :
?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t ?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t
(** Same as {!make} but with a stream body. *) (** Same as {!make} but with a stream body. *)

View file

@ -83,11 +83,13 @@ let rec iter f (self : t) : unit =
(iter [@tailcall]) f self (iter [@tailcall]) f self
) )
let to_chan (oc : out_channel) (self : t) = let to_chan (oc : out_channel) (self : t) = iter (output oc) self
iter (fun s i len -> output oc s i len) self
let to_chan' (oc : IO.Out_channel.t) (self : t) = let to_chan' (oc : IO.Out_channel.t) (self : t) =
iter (fun s i len -> IO.Out_channel.output oc s i len) self iter (IO.Out_channel.output oc) self
let to_writer (self : t) : Tiny_httpd_io.Writer.t =
{ write = (fun oc -> to_chan' oc self) }
let of_bytes ?(i = 0) ?len (bs : bytes) : t = let of_bytes ?(i = 0) ?len (bs : bytes) : t =
(* invariant: !i+!len is constant *) (* invariant: !i+!len is constant *)
@ -297,22 +299,11 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t =
refill := false) refill := false)
() ()
let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = let output_chunked' ?buf (oc : IO.Out_channel.t) (self : t) : unit =
let continue = ref true in let oc' = IO.Out_channel.chunk_encoding ?buf oc ~close_rec:false in
while !continue do to_chan' oc' self;
(* next chunk *) IO.Out_channel.close oc'
self.fill_buf ();
let n = self.len in
IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n);
IO.Out_channel.output oc self.bs self.off n;
self.consume n;
if n = 0 then continue := false;
IO.Out_channel.output_string oc "\r\n"
done;
(* write another crlf after the stream (see #56) *)
IO.Out_channel.output_string oc "\r\n";
()
(* print a stream as a series of chunks *) (* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit = let output_chunked ?buf (oc : out_channel) (self : t) : unit =
output_chunked' (IO.Out_channel.of_out_channel oc) self output_chunked' ?buf (IO.Out_channel.of_out_channel oc) self

View file

@ -98,6 +98,10 @@ val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write to the IO channel. (** Write to the IO channel.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
val to_writer : t -> Tiny_httpd_io.Writer.t
(** Turn this stream into a writer.
@since NEXT_RELEASE *)
val make : val make :
?bs:bytes -> ?bs:bytes ->
?close:(t -> unit) -> ?close:(t -> unit) ->
@ -145,9 +149,11 @@ val read_exactly :
@param too_short is called if [bs] closes with still [n] bytes remaining @param too_short is called if [bs] closes with still [n] bytes remaining
*) *)
val output_chunked : out_channel -> t -> unit val output_chunked : ?buf:Tiny_httpd_buf.t -> out_channel -> t -> unit
(** Write the stream into the channel, using the chunked encoding. *) (** Write the stream into the channel, using the chunked encoding.
@param buf optional buffer for chunking (since NEXT_RELEASE) *)
val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit val output_chunked' :
?buf:Tiny_httpd_buf.t -> Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write the stream into the channel, using the chunked encoding. (** Write the stream into the channel, using the chunked encoding.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)

View file

@ -1,5 +1,7 @@
module S = Tiny_httpd_server module S = Tiny_httpd_server
module BS = Tiny_httpd_stream module BS = Tiny_httpd_stream
module W = Tiny_httpd_io.Writer
module Out = Tiny_httpd_io.Out_channel
let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream =
S._debug (fun k -> k "wrap stream with deflate.decode"); S._debug (fun k -> k "wrap stream with deflate.decode");
@ -40,67 +42,77 @@ let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream =
)) ))
() ()
let encode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = let encode_deflate_writer_ ~buf_size (w : W.t) : W.t =
S._debug (fun k -> k "wrap stream with deflate.encode"); S._debug (fun k -> k "wrap writer with deflate.encode");
let refill = ref true in
let zlib_str = Zlib.deflate_init 4 false in let zlib_str = Zlib.deflate_init 4 false in
BS.make ~bs:(Bytes.create buf_size)
~close:(fun _self -> let o_buf = Bytes.create buf_size in
S._debug (fun k -> k "deflate: close"); let o_off = ref 0 in
Zlib.deflate_end zlib_str; let o_len = ref 0 in
BS.close is)
~consume:(fun self n -> (* write output buffer to out *)
self.off <- self.off + n; let write_out (oc : Out.t) =
self.len <- self.len - n) if !o_len > 0 then (
~fill:(fun self -> Out.output oc o_buf !o_off !o_len;
let rec loop () = o_off := 0;
S._debug (fun k -> o_len := 0
k "deflate.fill.iter out_off=%d out_len=%d" self.off self.len); )
if self.len > 0 then in
()
(* still the same slice, not consumed entirely by output *) let flush_zlib ~flush (oc : Out.t) =
else if not !refill then let continue = ref true in
() while !continue do
(* empty slice, no refill *) let finished, used_in, used_out =
else ( Zlib.deflate zlib_str Bytes.empty 0 0 o_buf 0 (Bytes.length o_buf) flush
(* the output was entirely consumed, we need to do more work *)
is.BS.fill_buf ();
if is.len > 0 then (
(* try to decompress from input buffer *)
let _finished, used_in, used_out =
Zlib.deflate zlib_str is.bs is.off is.len self.bs 0
(Bytes.length self.bs) Zlib.Z_NO_FLUSH
in
self.off <- 0;
self.len <- used_out;
is.consume used_in;
S._debug (fun k ->
k "encode %d bytes as %d bytes using deflate (finished: %b)"
used_in used_out _finished);
if _finished then (
S._debug (fun k -> k "deflate: finished");
refill := false
);
loop ()
) else (
(* [is] is done, finish sending the data in current buffer *)
let _finished, used_in, used_out =
Zlib.deflate zlib_str is.bs is.off is.len self.bs 0
(Bytes.length self.bs) Zlib.Z_FULL_FLUSH
in
assert (used_in = 0);
self.off <- 0;
self.len <- used_out;
if used_out = 0 then refill := false;
loop ()
)
)
in in
try loop () assert (used_in = 0);
with Zlib.Error (e1, e2) -> o_len := !o_len + used_out;
S.Response.fail_raise ~code:400 if finished then continue := false;
"deflate: error during compression:\n%s %s" e1 e2) write_out oc
() done
in
(* compress and consume input buffer *)
let write_zlib ~flush (oc : Out.t) buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let _finished, used_in, used_out =
Zlib.deflate zlib_str buf !i !len o_buf 0 (Bytes.length o_buf) flush
in
i := !i + used_in;
len := !len - used_in;
o_len := !o_len + used_out;
write_out oc
done
in
let write (oc : Out.t) : unit =
let output buf i len = write_zlib ~flush:Zlib.Z_NO_FLUSH oc buf i len in
let bchar = Bytes.create 1 in
let output_char c =
Bytes.set bchar 0 c;
output bchar 0 1
in
let flush () =
flush_zlib oc ~flush:Zlib.Z_FINISH;
assert (!o_len = 0);
oc.flush ()
in
let close () =
flush ();
Zlib.deflate_end zlib_str;
oc.close ()
in
(* new output channel that compresses on the fly *)
let oc' = { Out.flush; close; output; output_char } in
w.write oc';
oc'.close ()
in
W.make ~write ()
let split_on_char ?(f = fun x -> x) c s : string list = let split_on_char ?(f = fun x -> x) c s : string list =
let rec loop acc i = let rec loop acc i =
@ -161,15 +173,21 @@ let compress_resp_stream_ ~compress_above ~buf_size (req : _ S.Request.t)
S._debug (fun k -> S._debug (fun k ->
k "encode str response with deflate (size %d, threshold %d)" k "encode str response with deflate (size %d, threshold %d)"
(String.length s) compress_above); (String.length s) compress_above);
let body = encode_deflate_stream_ ~buf_size @@ BS.of_string s in let body = encode_deflate_writer_ ~buf_size @@ W.of_string s in
resp resp
|> S.Response.update_headers update_headers |> S.Response.update_headers update_headers
|> S.Response.set_body (`Stream body) |> S.Response.set_body (`Writer body)
| `Stream str -> | `Stream str ->
S._debug (fun k -> k "encode stream response with deflate"); S._debug (fun k -> k "encode stream response with deflate");
let w = BS.to_writer str in
resp resp
|> S.Response.update_headers update_headers |> S.Response.update_headers update_headers
|> S.Response.set_body (`Stream (encode_deflate_stream_ ~buf_size str)) |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w))
| `Writer w ->
S._debug (fun k -> k "encode writer response with deflate");
resp
|> S.Response.update_headers update_headers
|> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w))
| `String _ | `Void -> resp | `String _ | `Void -> resp
) else ) else
resp resp

View file

@ -93,8 +93,16 @@ let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) :
if !offset = Bytes.length wbuf then flush () if !offset = Bytes.length wbuf then flush ()
done done
in in
let output_char c =
if !offset = Bytes.length wbuf then flush ();
Bytes.set wbuf !offset c;
incr offset;
if !offset = Bytes.length wbuf then flush ()
in
let close () = flow#shutdown `Send in let close () = flow#shutdown `Send in
{ IO.Out_channel.close; flush; output } { IO.Out_channel.close; flush; output; output_char }
let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections
~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () : ~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () :

View file

@ -287,28 +287,31 @@ let prelude =
This output type is used to produce a string reasonably efficiently from This output type is used to produce a string reasonably efficiently from
a tree of combinators. a tree of combinators.
{b NOTE}: this is experimental and an unstable API.
@since 0.12 @since 0.12
@open *) @open *)
module Out : sig module Out : sig
type t type t
val create : unit -> t val create_of_buffer : Buffer.t -> t
val clear : t -> unit val create_of_out: Tiny_httpd_io.Out_channel.t -> t
val flush : t -> unit
val add_char : t -> char -> unit val add_char : t -> char -> unit
val add_string : t -> string -> unit val add_string : t -> string -> unit
val add_format_nl : t -> unit val add_format_nl : t -> unit
val with_no_format_nl : t -> (unit -> 'a) -> 'a val with_no_format_nl : t -> (unit -> 'a) -> 'a
val to_string : t -> string
end = struct end = struct
module IO = Tiny_httpd_io
type t = { type t = {
buf: Buffer.t; out: IO.Out_channel.t;
mutable fmt_nl: bool; (* if true, we print \b around to format the html *) mutable fmt_nl: bool; (* if true, we print [\n] around tags to format the html *)
} }
let create () = {buf=Buffer.create 256; fmt_nl=true} let create_of_out out = {out; fmt_nl=true}
let clear self = Buffer.clear self.buf; self.fmt_nl <- true let create_of_buffer buf : t = create_of_out (IO.Out_channel.of_buffer buf)
let[@inline] add_char self c = Buffer.add_char self.buf c let[@inline] flush self : unit = IO.Out_channel.flush self.out
let[@inline] add_string self s = Buffer.add_string self.buf s let[@inline] add_char self c = IO.Out_channel.output_char self.out c
let add_format_nl self = if self.fmt_nl then add_char self '\n' let[@inline] add_string self s = IO.Out_channel.output_string self.out s
let to_string self = add_format_nl self; Buffer.contents self.buf let[@inline] add_format_nl self = if self.fmt_nl then add_char self '\n'
let with_no_format_nl self f = let with_no_format_nl self f =
if self.fmt_nl then ( if self.fmt_nl then (
self.fmt_nl <- false; self.fmt_nl <- false;

2
writer.sh Executable file
View file

@ -0,0 +1,2 @@
#!/bin/sh
exec dune exec --display=quiet -- examples/writer.exe $@