diff --git a/README.md b/README.md index 27e8968d..0dbc50ff 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,32 @@ it allows downloading the files, and listing directories. If a directory contains `index.html` then this will be served instead of listing the content. +## Steaming response body + +Tiny_httpd provides multiple ways of returning a body in a response. +The response body type is: + +```ocaml +type body = + [ `String of string + | `Stream of byte_stream + | `Writer of Tiny_httpd_io.Writer.t + | `Void ] +``` + +The simplest way is to return, say, `` `String "hello" ``. The response +will have a set content-length header and its body is just the string. +Some responses don't have a body at all, which is where `` `Void `` is useful. + +The `` `Stream _ `` case is more advanced and really only intended for experts. + +The `` `Writer w `` is new, and is intended as an easy way to write the +body in a streaming fashion. See 'examples/writer.ml' to see a full example. +Typically the idea is to create the body with `Tiny_httpd_io.Writer.make ~write ()` +where `write` will be called with an output channel (the connection to the client), +and can write whatever it wants to this channel. Once the `write` function returns +the body has been fully sent and the next request can be processed. + ## Socket activation Since version 0.10, socket activation is supported indirectly, by allowing a diff --git a/examples/dune b/examples/dune index 4cafb199..dd8e19a3 100644 --- a/examples/dune +++ b/examples/dune @@ -21,6 +21,12 @@ (libraries tiny_httpd tiny_httpd_camlzip tiny_httpd_eio eio eio_posix)) +(executable + (name writer) + (flags :standard -warn-error -a+8) + (modules writer) + (libraries tiny_httpd)) + (rule (targets test_output.txt) (deps diff --git a/examples/echo.ml b/examples/echo.ml index d00d468d..fb06586a 100644 --- a/examples/echo.ml +++ b/examples/echo.ml @@ -88,8 +88,8 @@ let () = "echo [option]*"; let server = S.create ~port:!port_ ~max_connections:!j () in - Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; + Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; let m_stats, get_stats = middleware_stat () in S.add_middleware server ~stage:(`Stage 1) m_stats; diff --git a/examples/writer.ml b/examples/writer.ml new file mode 100644 index 00000000..e00399f2 --- /dev/null +++ b/examples/writer.ml @@ -0,0 +1,62 @@ +module H = Tiny_httpd + +let serve_zeroes server : unit = + H.add_route_handler server H.(Route.(exact "zeroes" @/ int @/ return)) + @@ fun n _req -> + (* stream [n] zeroes *) + let write (oc : H.IO.Out_channel.t) : unit = + let buf = Bytes.make 1 '0' in + for _i = 1 to n do + H.IO.Out_channel.output oc buf 0 1 + done + in + let writer = H.IO.Writer.make ~write () in + H.Response.make_writer @@ Ok writer + +let serve_file server : unit = + H.add_route_handler server H.(Route.(exact "file" @/ string @/ return)) + @@ fun file _req -> + if Sys.file_exists file then ( + (* stream the content of the file *) + let write oc = + let buf = Bytes.create 4096 in + let ic = open_in file in + Fun.protect ~finally:(fun () -> close_in_noerr ic) @@ fun () -> + while + let n = input ic buf 0 (Bytes.length buf) in + if n > 0 then H.IO.Out_channel.output oc buf 0 n; + n > 0 + do + () + done + in + + let writer = H.IO.Writer.make ~write () in + H.Response.make_writer @@ Ok writer + ) else + H.Response.fail ~code:404 "file not found" + +let () = + let port = ref 8085 in + Arg.parse [ "-p", Arg.Set_int port, " port" ] ignore ""; + let server = H.create ~port:!port () in + Printf.printf "listen on http://localhost:%d/\n%!" !port; + serve_file server; + serve_zeroes server; + H.add_route_handler server H.Route.return (fun _req -> + let body = + H.Html.( + div [] + [ + p [] [ txt "routes" ]; + ul [] + [ + li [] + [ a [ A.href "/zeroes/1000" ] [ txt "get 1000 zeroes" ] ]; + li [] [ a [ A.href "/file/f_13M" ] [ txt "read file" ] ]; + ]; + ]) + |> H.Html.to_string_top + in + H.Response.make_string @@ Ok body); + H.run_exn server diff --git a/src/Tiny_httpd_buf.ml b/src/Tiny_httpd_buf.ml index e3e2faa2..30cc1a45 100644 --- a/src/Tiny_httpd_buf.ml +++ b/src/Tiny_httpd_buf.ml @@ -1,12 +1,19 @@ -type t = { mutable bytes: bytes; mutable i: int } +type t = { mutable bytes: bytes; mutable i: int; original: bytes } -let create ?(size = 4_096) () : t = { bytes = Bytes.make size ' '; i = 0 } -let size self = self.i -let bytes_slice self = self.bytes +let create ?(size = 4_096) () : t = + let bytes = Bytes.make size ' ' in + { bytes; i = 0; original = bytes } + +let[@inline] size self = self.i +let[@inline] bytes_slice self = self.bytes let clear self : unit = - if Bytes.length self.bytes > 4_096 * 1_024 then - self.bytes <- Bytes.make 4096 ' ' (* free big buffer *); + if + Bytes.length self.bytes > 500 * 1_024 + && Bytes.length self.bytes > Bytes.length self.original + then + (* free big buffer *) + self.bytes <- self.original; self.i <- 0 let resize self new_size : unit = @@ -14,6 +21,12 @@ let resize self new_size : unit = Bytes.blit self.bytes 0 new_buf 0 self.i; self.bytes <- new_buf +let add_char self c : unit = + if self.i + 1 >= Bytes.length self.bytes then + resize self (self.i + (self.i / 2) + 10); + Bytes.set self.bytes self.i c; + self.i <- 1 + self.i + let add_bytes (self : t) s i len : unit = if self.i + len >= Bytes.length self.bytes then resize self (self.i + (self.i / 2) + len + 10); diff --git a/src/Tiny_httpd_buf.mli b/src/Tiny_httpd_buf.mli index 2bcfe58b..702cc787 100644 --- a/src/Tiny_httpd_buf.mli +++ b/src/Tiny_httpd_buf.mli @@ -21,6 +21,10 @@ val contents_and_clear : t -> string (** Get contents of the buffer and clear it. @since 0.5 *) +val add_char : t -> char -> unit +(** Add a single char. + @since NEXT_RELEASE *) + val add_bytes : t -> bytes -> int -> int -> unit (** Append given bytes slice to the buffer. @since 0.5 *) diff --git a/src/Tiny_httpd_html.ml b/src/Tiny_httpd_html.ml index 4a6532f2..1872707d 100644 --- a/src/Tiny_httpd_html.ml +++ b/src/Tiny_httpd_html.ml @@ -6,33 +6,59 @@ @since 0.12 *) +module IO = Tiny_httpd_io + include Tiny_httpd_html_ (** @inline *) +(** Write an HTML element to this out channel. + @param top if true, add DOCTYPE at the beginning. The top element should then + be a "html" tag. + @since NEXT_RELEASE + *) +let to_out_channel ?(top = false) (self : elt) (out : IO.Out_channel.t) : unit = + let out = Out.create_of_out out in + if top then Out.add_string out "\n"; + self out; + Out.add_format_nl out; + Out.flush out + (** Convert a HTML element to a string. @param top if true, add DOCTYPE at the beginning. The top element should then be a "html" tag. *) -let to_string ?(top = false) (self : elt) : string = - let out = Out.create () in - if top then Out.add_string out "\n"; - self out; - Out.to_string out +let to_string ?top (self : elt) : string = + let buf = Buffer.create 64 in + let out = IO.Out_channel.of_buffer buf in + to_out_channel ?top self out; + Buffer.contents buf (** Convert a list of HTML elements to a string. This is designed for fragments of HTML that are to be injected inside a bigger context, as it's invalid to have multiple elements at the toplevel of a HTML document. *) let to_string_l (l : elt list) = - let out = Out.create () in + let buf = Buffer.create 64 in + let out = Out.create_of_buffer buf in List.iter (fun f -> f out; Out.add_format_nl out) l; - Out.to_string out + Buffer.contents buf let to_string_top = to_string ~top:true +(** Write a toplevel element to an output channel. + @since NEXT_RELEASE *) +let to_out_channel_top = to_out_channel ~top:true + +(** Produce a streaming writer from this HTML element. + @param top if true, add a DOCTYPE. See {!to_out_channel}. + @since NEXT_RELEASE *) +let to_writer ?top (self : elt) : IO.Writer.t = + let write oc = to_out_channel ?top self oc in + IO.Writer.make ~write () + (** Convert a HTML element to a stream. This might just convert it to a string first, do not assume it to be more efficient. *) let to_stream (self : elt) : Tiny_httpd_stream.t = diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml index 749f53d9..815bd8de 100644 --- a/src/Tiny_httpd_io.ml +++ b/src/Tiny_httpd_io.ml @@ -10,13 +10,18 @@ module Buf = Tiny_httpd_buf +(** Input channel (byte source) *) module In_channel = struct type t = { input: bytes -> int -> int -> int; (** Read into the slice. Returns [0] only if the channel is closed. *) - close: unit -> unit; + close: unit -> unit; (** Close the input. Must be idempotent. *) } + (** An input channel, i.e an incoming stream of bytes. + + This can be a [string], an [int_channel], an [Unix.file_descr], a + decompression wrapper around another input channel, etc. *) let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = { @@ -40,19 +45,32 @@ module In_channel = struct Unix.close fd); } + (** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) let[@inline] input (self : t) buf i len = self.input buf i len + + (** Close the channel. *) let[@inline] close self : unit = self.close () end +(** Output channel (byte sink) *) module Out_channel = struct type t = { + output_char: char -> unit; (** Output a single char *) output: bytes -> int -> int -> unit; (** Output slice *) flush: unit -> unit; (** Flush underlying buffer *) - close: unit -> unit; + close: unit -> unit; (** Close the output. Must be idempotent. *) } + (** An output channel, ie. a place into which we can write bytes. + This can be a [Buffer.t], an [out_channel], a [Unix.file_descr], etc. *) + + (** [of_out_channel oc] wraps the channel into a {!Out_channel.t}. + @param close_noerr if true, then closing the result uses [close_out_noerr] + instead of [close_out] to close [oc] *) let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = { + output_char = (fun c -> output_char oc c); output = (fun buf i len -> output oc buf i len); flush = (fun () -> flush oc); close = @@ -63,20 +81,110 @@ module Out_channel = struct close_out oc); } + (** [of_buffer buf] is an output channel that writes directly into [buf]. + [flush] and [close] have no effect. *) + let of_buffer (buf : Buffer.t) : t = + { + output_char = Buffer.add_char buf; + output = Buffer.add_subbytes buf; + flush = ignore; + close = ignore; + } + + (** Output the buffer slice into this channel *) + let[@inline] output_char (self : t) c : unit = self.output_char c + + (** Output the buffer slice into this channel *) let[@inline] output (self : t) buf i len : unit = self.output buf i len let[@inline] output_string (self : t) (str : string) : unit = self.output (Bytes.unsafe_of_string str) 0 (String.length str) + (** Close the channel. *) let[@inline] close self : unit = self.close () + + (** Flush (ie. force write) any buffered bytes. *) let[@inline] flush self : unit = self.flush () let output_buf (self : t) (buf : Buf.t) : unit = let b = Buf.bytes_slice buf in output self b 0 (Buf.size buf) + + (** [chunk_encoding oc] makes a new channel that outputs its content into [oc] + in chunk encoding form. + @param close_rec if true, closing the result will also close [oc] + @param buf a buffer used to accumulate data into chunks. + Chunks are emitted when [buf]'s size gets over a certain threshold, + or when [flush] is called. + *) + let chunk_encoding ?(buf = Buf.create ()) ~close_rec (self : t) : t = + (* write content of [buf] as a chunk if it's big enough. + If [force=true] then write content of [buf] if it's simply non empty. *) + let write_buf ~force () = + let n = Buf.size buf in + if (force && n > 0) || n > 4_096 then ( + output_string self (Printf.sprintf "%x\r\n" n); + self.output (Buf.bytes_slice buf) 0 n; + output_string self "\r\n"; + Buf.clear buf + ) + in + + let flush () = + write_buf ~force:true (); + self.flush () + in + + let close () = + write_buf ~force:true (); + (* write an empty chunk to close the stream *) + output_string self "0\r\n"; + (* write another crlf after the stream (see #56) *) + output_string self "\r\n"; + self.flush (); + if close_rec then self.close () + in + let output b i n = + Buf.add_bytes buf b i n; + write_buf ~force:false () + in + + let output_char c = + Buf.add_char buf c; + write_buf ~force:false () + in + { output_char; flush; close; output } end -(** A TCP server abstraction *) +(** A writer abstraction. *) +module Writer = struct + type t = { write: Out_channel.t -> unit } [@@unboxed] + (** Writer. + + A writer is a push-based stream of bytes. + Give it an output channel and it will write the bytes in it. + + This is useful for responses: an http endpoint can return a writer + as its response's body, and output into it as if it were a regular + [out_channel], including controlling calls to [flush]. + @since NEXT_RELEASE + *) + + let[@inline] make ~write () : t = { write } + + (** Write into the channel. *) + let[@inline] write (oc : Out_channel.t) (self : t) : unit = self.write oc + + (** Empty writer, will output 0 bytes. *) + let empty : t = { write = ignore } + + (** A writer that just emits the bytes from the given string. *) + let[@inline] of_string (str : string) : t = + let write oc = Out_channel.output_string oc str in + { write } +end + +(** A TCP server abstraction. *) module TCP_server = struct type conn_handler = { handle: In_channel.t -> Out_channel.t -> unit; @@ -90,14 +198,27 @@ module TCP_server = struct (** Number of connections currently active *) running: unit -> bool; (** Is the server currently running? *) stop: unit -> unit; - (** Ask the server to stop. This might not take effect immediately. *) + (** Ask the server to stop. This might not take effect immediately, + and is idempotent. After this [server.running()] must return [false]. *) } - (** Running server. *) + (** A running TCP server. + + This contains some functions that provide information about the running + server, including whether it's active (as opposed to stopped), a function + to stop it, and statistics about the number of connections. *) type builder = { serve: after_init:(t -> unit) -> handle:conn_handler -> unit -> unit; (** Blocking call to listen for incoming connections and handle them. - Uses the connection handler to handle individual client connections. *) + Uses the connection handler [handle] to handle individual client + connections in individual threads/fibers/tasks. + @param after_init is called once with the server after the server + has started. *) } - (** A TCP server implementation. *) + (** A TCP server builder implementation. + + Calling [builder.serve ~after_init ~handle ()] starts a new TCP server on + an unspecified endpoint + (most likely coming from the function returning this builder) + and returns the running server. *) end diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index dc9c25eb..f1879bde 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -366,7 +366,12 @@ end *) module Response = struct - type body = [ `String of string | `Stream of byte_stream | `Void ] + type body = + [ `String of string + | `Stream of byte_stream + | `Writer of IO.Writer.t + | `Void ] + type t = { code: Response_code.t; headers: Headers.t; body: body } let set_body body self = { self with body } @@ -383,10 +388,13 @@ module Response = struct { code; headers; body = `String body } let make_raw_stream ?(headers = []) ~code body : t = - (* add content length to response *) let headers = Headers.set "Transfer-Encoding" "chunked" headers in { code; headers; body = `Stream body } + let make_raw_writer ?(headers = []) ~code body : t = + let headers = Headers.set "Transfer-Encoding" "chunked" headers in + { code; headers; body = `Writer body } + let make_void_force_ ?(headers = []) ~code () : t = { code; headers; body = `Void } @@ -407,11 +415,17 @@ module Response = struct | Ok body -> make_raw_stream ?headers ~code:200 body | Error (code, msg) -> make_raw ?headers ~code msg + let make_writer ?headers r : t = + match r with + | Ok body -> make_raw_writer ?headers ~code:200 body + | Error (code, msg) -> make_raw ?headers ~code msg + let make ?headers r : t = match r with | Ok (`String body) -> make_raw ?headers ~code:200 body | Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body | Ok `Void -> make_void ?headers ~code:200 () + | Ok (`Writer f) -> make_raw_writer ?headers ~code:200 f | Error (code, msg) -> make_raw ?headers ~code msg let fail ?headers ~code fmt = @@ -424,6 +438,7 @@ module Response = struct let pp_body out = function | `String s -> Format.fprintf out "%S" s | `Stream _ -> Format.pp_print_string out "" + | `Writer _ -> Format.pp_print_string out "" | `Void -> () in Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code @@ -446,9 +461,10 @@ module Response = struct match self.body with | `String s when String.length s > 1024 * 500 -> (* chunk-encode large bodies *) - `Stream (Byte_stream.of_string s), true + `Writer (IO.Writer.of_string s), true | `String _ as b -> b, false | `Stream _ as b -> b, true + | `Writer _ as b -> b, true | `Void as b -> b, false in let headers = @@ -462,9 +478,9 @@ module Response = struct let self = { self with headers; body } in _debug (fun k -> k "output response: %s" - (Format.asprintf "%a" pp { self with body = `String "<…>" })); + (Format.asprintf "%a" pp { self with body = `String "<...>" })); - (* write headers *) + (* write headers, using [buf] to batch writes *) List.iter (fun (k, v) -> Printf.bprintf tmp_buffer "%s: %s\r\n" k v; @@ -474,13 +490,23 @@ module Response = struct IO.Out_channel.output_buf oc buf; IO.Out_channel.output_string oc "\r\n"; + Buf.clear buf; (match body with | `String "" | `Void -> () | `String s -> IO.Out_channel.output_string oc s + | `Writer w -> + (* use buffer to chunk encode [w] *) + let oc' = IO.Out_channel.chunk_encoding ~buf ~close_rec:false oc in + (try + IO.Writer.write oc' w; + IO.Out_channel.close oc' + with e -> + IO.Out_channel.close oc'; + raise e) | `Stream str -> (try - Byte_stream.output_chunked' oc str; + Byte_stream.output_chunked' ~buf oc str; Byte_stream.close str with e -> Byte_stream.close str; diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 1903ba18..ea391cef 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -194,9 +194,21 @@ end the client to answer a {!Request.t}*) module Response : sig - type body = [ `String of string | `Stream of byte_stream | `Void ] + type body = + [ `String of string + | `Stream of byte_stream + | `Writer of Tiny_httpd_io.Writer.t + | `Void ] (** Body of a response, either as a simple string, - or a stream of bytes, or nothing (for server-sent events notably). *) + or a stream of bytes, or nothing (for server-sent events notably). + + - [`String str] replies with a body set to this string, and a known content-length. + - [`Stream str] replies with a body made from this string, using chunked encoding. + - [`Void] replies with no body. + - [`Writer w] replies with a body created by the writer [w], using + a chunked encoding. + It is available since NEXT_RELEASE. + *) type t = private { code: Response_code.t; (** HTTP response code. See {!Response_code}. *) @@ -251,6 +263,12 @@ module Response : sig ?headers:Headers.t -> (string, Response_code.t * string) result -> t (** Same as {!make} but with a string body. *) + val make_writer : + ?headers:Headers.t -> + (Tiny_httpd_io.Writer.t, Response_code.t * string) result -> + t + (** Same as {!make} but with a writer body. *) + val make_stream : ?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t (** Same as {!make} but with a stream body. *) diff --git a/src/Tiny_httpd_stream.ml b/src/Tiny_httpd_stream.ml index 3a100ebf..cdaad2e2 100644 --- a/src/Tiny_httpd_stream.ml +++ b/src/Tiny_httpd_stream.ml @@ -83,11 +83,13 @@ let rec iter f (self : t) : unit = (iter [@tailcall]) f self ) -let to_chan (oc : out_channel) (self : t) = - iter (fun s i len -> output oc s i len) self +let to_chan (oc : out_channel) (self : t) = iter (output oc) self let to_chan' (oc : IO.Out_channel.t) (self : t) = - iter (fun s i len -> IO.Out_channel.output oc s i len) self + iter (IO.Out_channel.output oc) self + +let to_writer (self : t) : Tiny_httpd_io.Writer.t = + { write = (fun oc -> to_chan' oc self) } let of_bytes ?(i = 0) ?len (bs : bytes) : t = (* invariant: !i+!len is constant *) @@ -297,22 +299,11 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t = refill := false) () -let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = - let continue = ref true in - while !continue do - (* next chunk *) - self.fill_buf (); - let n = self.len in - IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n); - IO.Out_channel.output oc self.bs self.off n; - self.consume n; - if n = 0 then continue := false; - IO.Out_channel.output_string oc "\r\n" - done; - (* write another crlf after the stream (see #56) *) - IO.Out_channel.output_string oc "\r\n"; - () +let output_chunked' ?buf (oc : IO.Out_channel.t) (self : t) : unit = + let oc' = IO.Out_channel.chunk_encoding ?buf oc ~close_rec:false in + to_chan' oc' self; + IO.Out_channel.close oc' (* print a stream as a series of chunks *) -let output_chunked (oc : out_channel) (self : t) : unit = - output_chunked' (IO.Out_channel.of_out_channel oc) self +let output_chunked ?buf (oc : out_channel) (self : t) : unit = + output_chunked' ?buf (IO.Out_channel.of_out_channel oc) self diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index baa7dfb5..7f8b38b8 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -98,6 +98,10 @@ val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit (** Write to the IO channel. @since NEXT_RELEASE *) +val to_writer : t -> Tiny_httpd_io.Writer.t +(** Turn this stream into a writer. + @since NEXT_RELEASE *) + val make : ?bs:bytes -> ?close:(t -> unit) -> @@ -145,9 +149,11 @@ val read_exactly : @param too_short is called if [bs] closes with still [n] bytes remaining *) -val output_chunked : out_channel -> t -> unit -(** Write the stream into the channel, using the chunked encoding. *) +val output_chunked : ?buf:Tiny_httpd_buf.t -> out_channel -> t -> unit +(** Write the stream into the channel, using the chunked encoding. + @param buf optional buffer for chunking (since NEXT_RELEASE) *) -val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit +val output_chunked' : + ?buf:Tiny_httpd_buf.t -> Tiny_httpd_io.Out_channel.t -> t -> unit (** Write the stream into the channel, using the chunked encoding. @since NEXT_RELEASE *) diff --git a/src/camlzip/Tiny_httpd_camlzip.ml b/src/camlzip/Tiny_httpd_camlzip.ml index 6fdb501a..5d3cbe6c 100644 --- a/src/camlzip/Tiny_httpd_camlzip.ml +++ b/src/camlzip/Tiny_httpd_camlzip.ml @@ -1,5 +1,7 @@ module S = Tiny_httpd_server module BS = Tiny_httpd_stream +module W = Tiny_httpd_io.Writer +module Out = Tiny_httpd_io.Out_channel let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = S._debug (fun k -> k "wrap stream with deflate.decode"); @@ -40,67 +42,77 @@ let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = )) () -let encode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = - S._debug (fun k -> k "wrap stream with deflate.encode"); - let refill = ref true in +let encode_deflate_writer_ ~buf_size (w : W.t) : W.t = + S._debug (fun k -> k "wrap writer with deflate.encode"); let zlib_str = Zlib.deflate_init 4 false in - BS.make ~bs:(Bytes.create buf_size) - ~close:(fun _self -> - S._debug (fun k -> k "deflate: close"); - Zlib.deflate_end zlib_str; - BS.close is) - ~consume:(fun self n -> - self.off <- self.off + n; - self.len <- self.len - n) - ~fill:(fun self -> - let rec loop () = - S._debug (fun k -> - k "deflate.fill.iter out_off=%d out_len=%d" self.off self.len); - if self.len > 0 then - () - (* still the same slice, not consumed entirely by output *) - else if not !refill then - () - (* empty slice, no refill *) - else ( - (* the output was entirely consumed, we need to do more work *) - is.BS.fill_buf (); - if is.len > 0 then ( - (* try to decompress from input buffer *) - let _finished, used_in, used_out = - Zlib.deflate zlib_str is.bs is.off is.len self.bs 0 - (Bytes.length self.bs) Zlib.Z_NO_FLUSH - in - self.off <- 0; - self.len <- used_out; - is.consume used_in; - S._debug (fun k -> - k "encode %d bytes as %d bytes using deflate (finished: %b)" - used_in used_out _finished); - if _finished then ( - S._debug (fun k -> k "deflate: finished"); - refill := false - ); - loop () - ) else ( - (* [is] is done, finish sending the data in current buffer *) - let _finished, used_in, used_out = - Zlib.deflate zlib_str is.bs is.off is.len self.bs 0 - (Bytes.length self.bs) Zlib.Z_FULL_FLUSH - in - assert (used_in = 0); - self.off <- 0; - self.len <- used_out; - if used_out = 0 then refill := false; - loop () - ) - ) + + let o_buf = Bytes.create buf_size in + let o_off = ref 0 in + let o_len = ref 0 in + + (* write output buffer to out *) + let write_out (oc : Out.t) = + if !o_len > 0 then ( + Out.output oc o_buf !o_off !o_len; + o_off := 0; + o_len := 0 + ) + in + + let flush_zlib ~flush (oc : Out.t) = + let continue = ref true in + while !continue do + let finished, used_in, used_out = + Zlib.deflate zlib_str Bytes.empty 0 0 o_buf 0 (Bytes.length o_buf) flush in - try loop () - with Zlib.Error (e1, e2) -> - S.Response.fail_raise ~code:400 - "deflate: error during compression:\n%s %s" e1 e2) - () + assert (used_in = 0); + o_len := !o_len + used_out; + if finished then continue := false; + write_out oc + done + in + + (* compress and consume input buffer *) + let write_zlib ~flush (oc : Out.t) buf i len = + let i = ref i in + let len = ref len in + while !len > 0 do + let _finished, used_in, used_out = + Zlib.deflate zlib_str buf !i !len o_buf 0 (Bytes.length o_buf) flush + in + i := !i + used_in; + len := !len - used_in; + o_len := !o_len + used_out; + write_out oc + done + in + + let write (oc : Out.t) : unit = + let output buf i len = write_zlib ~flush:Zlib.Z_NO_FLUSH oc buf i len in + + let bchar = Bytes.create 1 in + let output_char c = + Bytes.set bchar 0 c; + output bchar 0 1 + in + + let flush () = + flush_zlib oc ~flush:Zlib.Z_FINISH; + assert (!o_len = 0); + oc.flush () + in + let close () = + flush (); + Zlib.deflate_end zlib_str; + oc.close () + in + (* new output channel that compresses on the fly *) + let oc' = { Out.flush; close; output; output_char } in + w.write oc'; + oc'.close () + in + + W.make ~write () let split_on_char ?(f = fun x -> x) c s : string list = let rec loop acc i = @@ -161,15 +173,21 @@ let compress_resp_stream_ ~compress_above ~buf_size (req : _ S.Request.t) S._debug (fun k -> k "encode str response with deflate (size %d, threshold %d)" (String.length s) compress_above); - let body = encode_deflate_stream_ ~buf_size @@ BS.of_string s in + let body = encode_deflate_writer_ ~buf_size @@ W.of_string s in resp |> S.Response.update_headers update_headers - |> S.Response.set_body (`Stream body) + |> S.Response.set_body (`Writer body) | `Stream str -> S._debug (fun k -> k "encode stream response with deflate"); + let w = BS.to_writer str in resp |> S.Response.update_headers update_headers - |> S.Response.set_body (`Stream (encode_deflate_stream_ ~buf_size str)) + |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w)) + | `Writer w -> + S._debug (fun k -> k "encode writer response with deflate"); + resp + |> S.Response.update_headers update_headers + |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w)) | `String _ | `Void -> resp ) else resp diff --git a/src/eio/tiny_httpd_eio.ml b/src/eio/tiny_httpd_eio.ml index d8c89008..47bf3727 100644 --- a/src/eio/tiny_httpd_eio.ml +++ b/src/eio/tiny_httpd_eio.ml @@ -93,8 +93,16 @@ let oc_of_flow ~buf_pool:oc_pool (flow : Eio.Net.stream_socket) : if !offset = Bytes.length wbuf then flush () done in + + let output_char c = + if !offset = Bytes.length wbuf then flush (); + Bytes.set wbuf !offset c; + incr offset; + if !offset = Bytes.length wbuf then flush () + in + let close () = flow#shutdown `Send in - { IO.Out_channel.close; flush; output } + { IO.Out_channel.close; flush; output; output_char } let io_backend ?(addr = "127.0.0.1") ?(port = 8080) ?max_connections ~(stdenv : Eio_unix.Stdenv.base) ~(sw : Eio.Switch.t) () : diff --git a/src/gen/gentags.ml b/src/gen/gentags.ml index 09ead697..d956f14d 100644 --- a/src/gen/gentags.ml +++ b/src/gen/gentags.ml @@ -287,28 +287,31 @@ let prelude = This output type is used to produce a string reasonably efficiently from a tree of combinators. + {b NOTE}: this is experimental and an unstable API. + @since 0.12 @open *) module Out : sig type t - val create : unit -> t - val clear : t -> unit + val create_of_buffer : Buffer.t -> t + val create_of_out: Tiny_httpd_io.Out_channel.t -> t + val flush : t -> unit val add_char : t -> char -> unit val add_string : t -> string -> unit val add_format_nl : t -> unit val with_no_format_nl : t -> (unit -> 'a) -> 'a - val to_string : t -> string end = struct + module IO = Tiny_httpd_io type t = { - buf: Buffer.t; - mutable fmt_nl: bool; (* if true, we print \b around to format the html *) + out: IO.Out_channel.t; + mutable fmt_nl: bool; (* if true, we print [\n] around tags to format the html *) } - let create () = {buf=Buffer.create 256; fmt_nl=true} - let clear self = Buffer.clear self.buf; self.fmt_nl <- true - let[@inline] add_char self c = Buffer.add_char self.buf c - let[@inline] add_string self s = Buffer.add_string self.buf s - let add_format_nl self = if self.fmt_nl then add_char self '\n' - let to_string self = add_format_nl self; Buffer.contents self.buf + let create_of_out out = {out; fmt_nl=true} + let create_of_buffer buf : t = create_of_out (IO.Out_channel.of_buffer buf) + let[@inline] flush self : unit = IO.Out_channel.flush self.out + let[@inline] add_char self c = IO.Out_channel.output_char self.out c + let[@inline] add_string self s = IO.Out_channel.output_string self.out s + let[@inline] add_format_nl self = if self.fmt_nl then add_char self '\n' let with_no_format_nl self f = if self.fmt_nl then ( self.fmt_nl <- false; diff --git a/writer.sh b/writer.sh new file mode 100755 index 00000000..d8722cd8 --- /dev/null +++ b/writer.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec dune exec --display=quiet -- examples/writer.exe $@