From de23d9b2a3ad8e9911bb93041ecfcb56738b700a Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 11 Jul 2023 10:57:08 -0400 Subject: [PATCH] wip: add `IO.Writer.t`, a push based stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is more convenient than our existing streams when it comes to writing a body. The user can just get an output channel and write to it, flush it, etc. as they please. This should also simplify compression… once it works. --- src/Tiny_httpd_io.ml | 43 ++++++++++ src/Tiny_httpd_server.ml | 30 ++++++- src/Tiny_httpd_server.mli | 22 ++++- src/Tiny_httpd_stream.ml | 20 ++--- src/Tiny_httpd_stream.mli | 4 + src/camlzip/Tiny_httpd_camlzip.ml | 135 ++++++++++++++++-------------- 6 files changed, 173 insertions(+), 81 deletions(-) diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml index 749f53d9..3aef4c84 100644 --- a/src/Tiny_httpd_io.ml +++ b/src/Tiny_httpd_io.ml @@ -17,6 +17,7 @@ module In_channel = struct channel is closed. *) close: unit -> unit; } + (** An input channel, i.e an incoming stream of bytes. *) let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = { @@ -50,6 +51,7 @@ module Out_channel = struct flush: unit -> unit; (** Flush underlying buffer *) close: unit -> unit; } + (** An output channel, ie. a place into which we can write bytes. *) let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = { @@ -74,6 +76,47 @@ module Out_channel = struct let output_buf (self : t) (buf : Buf.t) : unit = let b = Buf.bytes_slice buf in output self b 0 (Buf.size buf) + + (** [chunk_encoding oc] makes a new channel that outputs its content into [oc] + in chunk encoding form. + @param close_rec if true, closing the result will also close [oc] + *) + let chunk_encoding ~close_rec (self : t) : t = + let flush = self.flush in + let close () = + (* write another crlf after the stream (see #56) *) + output_string self "\r\n"; + self.flush (); + if close_rec then self.close () + in + let output buf i n = + if n > 0 then ( + output_string self (Printf.sprintf "%x\r\n" n); + self.output buf i n + ) + in + { flush; close; output } +end + +(** A writer abstraction. + + A writer is a push-based stream of bytes. Give it an output channel and it will write + the bytes in it. *) +module Writer = struct + type t = { write: Out_channel.t -> unit } [@@unboxed] + (** Writer. *) + + let[@inline] make ~write () : t = { write } + + (** Write into the channel. *) + let[@inline] write (oc : Out_channel.t) (self : t) : unit = self.write oc + + let empty : t = { write = ignore } + + (** A writer that just emits the bytes from the given string. *) + let of_string (str : string) : t = + let write oc = Out_channel.output_string oc str in + { write } end (** A TCP server abstraction *) diff --git a/src/Tiny_httpd_server.ml b/src/Tiny_httpd_server.ml index 3edc1068..0d0685f3 100644 --- a/src/Tiny_httpd_server.ml +++ b/src/Tiny_httpd_server.ml @@ -366,7 +366,12 @@ end *) module Response = struct - type body = [ `String of string | `Stream of byte_stream | `Void ] + type body = + [ `String of string + | `Stream of byte_stream + | `Writer of IO.Writer.t + | `Void ] + type t = { code: Response_code.t; headers: Headers.t; body: body } let set_body body self = { self with body } @@ -383,10 +388,13 @@ module Response = struct { code; headers; body = `String body } let make_raw_stream ?(headers = []) ~code body : t = - (* add content length to response *) let headers = Headers.set "Transfer-Encoding" "chunked" headers in { code; headers; body = `Stream body } + let make_raw_writer ?(headers = []) ~code body : t = + let headers = Headers.set "Transfer-Encoding" "chunked" headers in + { code; headers; body = `Writer body } + let make_void_force_ ?(headers = []) ~code () : t = { code; headers; body = `Void } @@ -407,11 +415,17 @@ module Response = struct | Ok body -> make_raw_stream ?headers ~code:200 body | Error (code, msg) -> make_raw ?headers ~code msg + let make_writer ?headers r : t = + match r with + | Ok body -> make_raw_writer ?headers ~code:200 body + | Error (code, msg) -> make_raw ?headers ~code msg + let make ?headers r : t = match r with | Ok (`String body) -> make_raw ?headers ~code:200 body | Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body | Ok `Void -> make_void ?headers ~code:200 () + | Ok (`Writer f) -> make_raw_writer ?headers ~code:200 f | Error (code, msg) -> make_raw ?headers ~code msg let fail ?headers ~code fmt = @@ -424,6 +438,7 @@ module Response = struct let pp_body out = function | `String s -> Format.fprintf out "%S" s | `Stream _ -> Format.pp_print_string out "" + | `Writer _ -> Format.pp_print_string out "" | `Void -> () in Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code @@ -446,9 +461,10 @@ module Response = struct match self.body with | `String s when String.length s > 1024 * 500 -> (* chunk-encode large bodies *) - `Stream (Byte_stream.of_string s), true + `Writer (IO.Writer.of_string s), true | `String _ as b -> b, false | `Stream _ as b -> b, true + | `Writer _ as b -> b, true | `Void as b -> b, false in let headers = @@ -478,6 +494,14 @@ module Response = struct (match body with | `String "" | `Void -> () | `String s -> IO.Out_channel.output_string oc s + | `Writer w -> + let oc' = IO.Out_channel.chunk_encoding ~close_rec:false oc in + (try + IO.Writer.write oc' w; + IO.Out_channel.close oc' + with e -> + IO.Out_channel.close oc'; + raise e) | `Stream str -> (try Byte_stream.output_chunked' oc str; diff --git a/src/Tiny_httpd_server.mli b/src/Tiny_httpd_server.mli index 1903ba18..ea391cef 100644 --- a/src/Tiny_httpd_server.mli +++ b/src/Tiny_httpd_server.mli @@ -194,9 +194,21 @@ end the client to answer a {!Request.t}*) module Response : sig - type body = [ `String of string | `Stream of byte_stream | `Void ] + type body = + [ `String of string + | `Stream of byte_stream + | `Writer of Tiny_httpd_io.Writer.t + | `Void ] (** Body of a response, either as a simple string, - or a stream of bytes, or nothing (for server-sent events notably). *) + or a stream of bytes, or nothing (for server-sent events notably). + + - [`String str] replies with a body set to this string, and a known content-length. + - [`Stream str] replies with a body made from this string, using chunked encoding. + - [`Void] replies with no body. + - [`Writer w] replies with a body created by the writer [w], using + a chunked encoding. + It is available since NEXT_RELEASE. + *) type t = private { code: Response_code.t; (** HTTP response code. See {!Response_code}. *) @@ -251,6 +263,12 @@ module Response : sig ?headers:Headers.t -> (string, Response_code.t * string) result -> t (** Same as {!make} but with a string body. *) + val make_writer : + ?headers:Headers.t -> + (Tiny_httpd_io.Writer.t, Response_code.t * string) result -> + t + (** Same as {!make} but with a writer body. *) + val make_stream : ?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t (** Same as {!make} but with a stream body. *) diff --git a/src/Tiny_httpd_stream.ml b/src/Tiny_httpd_stream.ml index 3a100ebf..d9a367a3 100644 --- a/src/Tiny_httpd_stream.ml +++ b/src/Tiny_httpd_stream.ml @@ -89,6 +89,9 @@ let to_chan (oc : out_channel) (self : t) = let to_chan' (oc : IO.Out_channel.t) (self : t) = iter (fun s i len -> IO.Out_channel.output oc s i len) self +let to_writer (self : t) : Tiny_httpd_io.Writer.t = + { write = (fun oc -> to_chan' oc self) } + let of_bytes ?(i = 0) ?len (bs : bytes) : t = (* invariant: !i+!len is constant *) let len = @@ -298,20 +301,9 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t = () let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = - let continue = ref true in - while !continue do - (* next chunk *) - self.fill_buf (); - let n = self.len in - IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n); - IO.Out_channel.output oc self.bs self.off n; - self.consume n; - if n = 0 then continue := false; - IO.Out_channel.output_string oc "\r\n" - done; - (* write another crlf after the stream (see #56) *) - IO.Out_channel.output_string oc "\r\n"; - () + let oc' = IO.Out_channel.chunk_encoding oc ~close_rec:false in + to_chan' oc' self; + IO.Out_channel.close oc' (* print a stream as a series of chunks *) let output_chunked (oc : out_channel) (self : t) : unit = diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index baa7dfb5..519128fa 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -98,6 +98,10 @@ val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit (** Write to the IO channel. @since NEXT_RELEASE *) +val to_writer : t -> Tiny_httpd_io.Writer.t +(** Turn this stream into a writer. + @since NEXT_RELEASE *) + val make : ?bs:bytes -> ?close:(t -> unit) -> diff --git a/src/camlzip/Tiny_httpd_camlzip.ml b/src/camlzip/Tiny_httpd_camlzip.ml index 6fdb501a..77993147 100644 --- a/src/camlzip/Tiny_httpd_camlzip.ml +++ b/src/camlzip/Tiny_httpd_camlzip.ml @@ -1,5 +1,7 @@ module S = Tiny_httpd_server module BS = Tiny_httpd_stream +module W = Tiny_httpd_io.Writer +module Out = Tiny_httpd_io.Out_channel let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = S._debug (fun k -> k "wrap stream with deflate.decode"); @@ -40,67 +42,70 @@ let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = )) () -let encode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = - S._debug (fun k -> k "wrap stream with deflate.encode"); - let refill = ref true in +let encode_deflate_writer_ ~buf_size (w : W.t) : W.t = + S._debug (fun k -> k "wrap writer with deflate.encode"); let zlib_str = Zlib.deflate_init 4 false in - BS.make ~bs:(Bytes.create buf_size) - ~close:(fun _self -> - S._debug (fun k -> k "deflate: close"); - Zlib.deflate_end zlib_str; - BS.close is) - ~consume:(fun self n -> - self.off <- self.off + n; - self.len <- self.len - n) - ~fill:(fun self -> - let rec loop () = - S._debug (fun k -> - k "deflate.fill.iter out_off=%d out_len=%d" self.off self.len); - if self.len > 0 then - () - (* still the same slice, not consumed entirely by output *) - else if not !refill then - () - (* empty slice, no refill *) - else ( - (* the output was entirely consumed, we need to do more work *) - is.BS.fill_buf (); - if is.len > 0 then ( - (* try to decompress from input buffer *) - let _finished, used_in, used_out = - Zlib.deflate zlib_str is.bs is.off is.len self.bs 0 - (Bytes.length self.bs) Zlib.Z_NO_FLUSH - in - self.off <- 0; - self.len <- used_out; - is.consume used_in; - S._debug (fun k -> - k "encode %d bytes as %d bytes using deflate (finished: %b)" - used_in used_out _finished); - if _finished then ( - S._debug (fun k -> k "deflate: finished"); - refill := false - ); - loop () - ) else ( - (* [is] is done, finish sending the data in current buffer *) - let _finished, used_in, used_out = - Zlib.deflate zlib_str is.bs is.off is.len self.bs 0 - (Bytes.length self.bs) Zlib.Z_FULL_FLUSH - in - assert (used_in = 0); - self.off <- 0; - self.len <- used_out; - if used_out = 0 then refill := false; - loop () - ) - ) + + let o_buf = Bytes.create buf_size in + let o_off = ref 0 in + let o_len = ref 0 in + + (* write output buffer to out *) + let write_out (oc : Out.t) = + if !o_len > 0 then ( + Out.output oc o_buf !o_off !o_len; + o_off := 0; + o_len := 0 + ) + in + + (* Zlib.Z_NO_FLUSH *) + let flush_zlib ~flush (oc : Out.t) = + let continue = ref true in + while !continue do + let finished, used_in, used_out = + Zlib.deflate zlib_str Bytes.empty 0 0 o_buf 0 (Bytes.length o_buf) flush in - try loop () - with Zlib.Error (e1, e2) -> - S.Response.fail_raise ~code:400 - "deflate: error during compression:\n%s %s" e1 e2) - () + assert (used_in = 0); + o_len := !o_len + used_out; + if finished then continue := false; + write_out oc + done + in + + (* compress and consume input buffer *) + let write_zlib ~flush (oc : Out.t) buf i len = + let i = ref i in + let len = ref len in + while !len > 0 do + let _finished, used_in, used_out = + Zlib.deflate zlib_str buf !i !len o_buf 0 (Bytes.length o_buf) flush + in + i := !i + used_in; + len := !len - used_in; + o_len := !o_len + used_out; + write_out oc + done + in + + let write (oc : Out.t) : unit = + let output buf i len = write_zlib ~flush:Zlib.Z_NO_FLUSH oc buf i len in + let flush () = + flush_zlib oc ~flush:Zlib.Z_SYNC_FLUSH; + oc.flush () + in + let close () = + flush_zlib oc ~flush:Zlib.Z_FULL_FLUSH; + assert (!o_len = 0); + Zlib.deflate_end zlib_str; + oc.close () + in + (* new output channel that compresses on the fly *) + let oc' = { Out.flush; close; output } in + w.write oc' + in + + W.make ~write () let split_on_char ?(f = fun x -> x) c s : string list = let rec loop acc i = @@ -161,15 +166,21 @@ let compress_resp_stream_ ~compress_above ~buf_size (req : _ S.Request.t) S._debug (fun k -> k "encode str response with deflate (size %d, threshold %d)" (String.length s) compress_above); - let body = encode_deflate_stream_ ~buf_size @@ BS.of_string s in + let body = encode_deflate_writer_ ~buf_size @@ W.of_string s in resp |> S.Response.update_headers update_headers - |> S.Response.set_body (`Stream body) + |> S.Response.set_body (`Writer body) | `Stream str -> S._debug (fun k -> k "encode stream response with deflate"); + let w = BS.to_writer str in resp |> S.Response.update_headers update_headers - |> S.Response.set_body (`Stream (encode_deflate_stream_ ~buf_size str)) + |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w)) + | `Writer w -> + S._debug (fun k -> k "encode writer response with deflate"); + resp + |> S.Response.update_headers update_headers + |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w)) | `String _ | `Void -> resp ) else resp