wip: add IO.Writer.t, a push based stream

This is more convenient than our existing streams when it comes to
writing a body. The user can just get an output channel and write to it,
flush it, etc. as they please. This should also simplify compression…
once it works.
This commit is contained in:
Simon Cruanes 2023-07-11 10:57:08 -04:00
parent 832c4bd4df
commit de23d9b2a3
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 173 additions and 81 deletions

View file

@ -17,6 +17,7 @@ module In_channel = struct
channel is closed. *) channel is closed. *)
close: unit -> unit; close: unit -> unit;
} }
(** An input channel, i.e an incoming stream of bytes. *)
let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = let of_in_channel ?(close_noerr = false) (ic : in_channel) : t =
{ {
@ -50,6 +51,7 @@ module Out_channel = struct
flush: unit -> unit; (** Flush underlying buffer *) flush: unit -> unit; (** Flush underlying buffer *)
close: unit -> unit; close: unit -> unit;
} }
(** An output channel, ie. a place into which we can write bytes. *)
let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = let of_out_channel ?(close_noerr = false) (oc : out_channel) : t =
{ {
@ -74,6 +76,47 @@ module Out_channel = struct
let output_buf (self : t) (buf : Buf.t) : unit = let output_buf (self : t) (buf : Buf.t) : unit =
let b = Buf.bytes_slice buf in let b = Buf.bytes_slice buf in
output self b 0 (Buf.size buf) output self b 0 (Buf.size buf)
(** [chunk_encoding oc] makes a new channel that outputs its content into [oc]
in chunk encoding form.
@param close_rec if true, closing the result will also close [oc]
*)
let chunk_encoding ~close_rec (self : t) : t =
let flush = self.flush in
let close () =
(* write another crlf after the stream (see #56) *)
output_string self "\r\n";
self.flush ();
if close_rec then self.close ()
in
let output buf i n =
if n > 0 then (
output_string self (Printf.sprintf "%x\r\n" n);
self.output buf i n
)
in
{ flush; close; output }
end
(** A writer abstraction.
A writer is a push-based stream of bytes. Give it an output channel and it will write
the bytes in it. *)
module Writer = struct
type t = { write: Out_channel.t -> unit } [@@unboxed]
(** Writer. *)
let[@inline] make ~write () : t = { write }
(** Write into the channel. *)
let[@inline] write (oc : Out_channel.t) (self : t) : unit = self.write oc
let empty : t = { write = ignore }
(** A writer that just emits the bytes from the given string. *)
let of_string (str : string) : t =
let write oc = Out_channel.output_string oc str in
{ write }
end end
(** A TCP server abstraction *) (** A TCP server abstraction *)

View file

@ -366,7 +366,12 @@ end
*) *)
module Response = struct module Response = struct
type body = [ `String of string | `Stream of byte_stream | `Void ] type body =
[ `String of string
| `Stream of byte_stream
| `Writer of IO.Writer.t
| `Void ]
type t = { code: Response_code.t; headers: Headers.t; body: body } type t = { code: Response_code.t; headers: Headers.t; body: body }
let set_body body self = { self with body } let set_body body self = { self with body }
@ -383,10 +388,13 @@ module Response = struct
{ code; headers; body = `String body } { code; headers; body = `String body }
let make_raw_stream ?(headers = []) ~code body : t = let make_raw_stream ?(headers = []) ~code body : t =
(* add content length to response *)
let headers = Headers.set "Transfer-Encoding" "chunked" headers in let headers = Headers.set "Transfer-Encoding" "chunked" headers in
{ code; headers; body = `Stream body } { code; headers; body = `Stream body }
let make_raw_writer ?(headers = []) ~code body : t =
let headers = Headers.set "Transfer-Encoding" "chunked" headers in
{ code; headers; body = `Writer body }
let make_void_force_ ?(headers = []) ~code () : t = let make_void_force_ ?(headers = []) ~code () : t =
{ code; headers; body = `Void } { code; headers; body = `Void }
@ -407,11 +415,17 @@ module Response = struct
| Ok body -> make_raw_stream ?headers ~code:200 body | Ok body -> make_raw_stream ?headers ~code:200 body
| Error (code, msg) -> make_raw ?headers ~code msg | Error (code, msg) -> make_raw ?headers ~code msg
let make_writer ?headers r : t =
match r with
| Ok body -> make_raw_writer ?headers ~code:200 body
| Error (code, msg) -> make_raw ?headers ~code msg
let make ?headers r : t = let make ?headers r : t =
match r with match r with
| Ok (`String body) -> make_raw ?headers ~code:200 body | Ok (`String body) -> make_raw ?headers ~code:200 body
| Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body | Ok (`Stream body) -> make_raw_stream ?headers ~code:200 body
| Ok `Void -> make_void ?headers ~code:200 () | Ok `Void -> make_void ?headers ~code:200 ()
| Ok (`Writer f) -> make_raw_writer ?headers ~code:200 f
| Error (code, msg) -> make_raw ?headers ~code msg | Error (code, msg) -> make_raw ?headers ~code msg
let fail ?headers ~code fmt = let fail ?headers ~code fmt =
@ -424,6 +438,7 @@ module Response = struct
let pp_body out = function let pp_body out = function
| `String s -> Format.fprintf out "%S" s | `String s -> Format.fprintf out "%S" s
| `Stream _ -> Format.pp_print_string out "<stream>" | `Stream _ -> Format.pp_print_string out "<stream>"
| `Writer _ -> Format.pp_print_string out "<writer>"
| `Void -> () | `Void -> ()
in in
Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code Format.fprintf out "{@[code=%d;@ headers=[@[%a@]];@ body=%a@]}" self.code
@ -446,9 +461,10 @@ module Response = struct
match self.body with match self.body with
| `String s when String.length s > 1024 * 500 -> | `String s when String.length s > 1024 * 500 ->
(* chunk-encode large bodies *) (* chunk-encode large bodies *)
`Stream (Byte_stream.of_string s), true `Writer (IO.Writer.of_string s), true
| `String _ as b -> b, false | `String _ as b -> b, false
| `Stream _ as b -> b, true | `Stream _ as b -> b, true
| `Writer _ as b -> b, true
| `Void as b -> b, false | `Void as b -> b, false
in in
let headers = let headers =
@ -478,6 +494,14 @@ module Response = struct
(match body with (match body with
| `String "" | `Void -> () | `String "" | `Void -> ()
| `String s -> IO.Out_channel.output_string oc s | `String s -> IO.Out_channel.output_string oc s
| `Writer w ->
let oc' = IO.Out_channel.chunk_encoding ~close_rec:false oc in
(try
IO.Writer.write oc' w;
IO.Out_channel.close oc'
with e ->
IO.Out_channel.close oc';
raise e)
| `Stream str -> | `Stream str ->
(try (try
Byte_stream.output_chunked' oc str; Byte_stream.output_chunked' oc str;

View file

@ -194,9 +194,21 @@ end
the client to answer a {!Request.t}*) the client to answer a {!Request.t}*)
module Response : sig module Response : sig
type body = [ `String of string | `Stream of byte_stream | `Void ] type body =
[ `String of string
| `Stream of byte_stream
| `Writer of Tiny_httpd_io.Writer.t
| `Void ]
(** Body of a response, either as a simple string, (** Body of a response, either as a simple string,
or a stream of bytes, or nothing (for server-sent events notably). *) or a stream of bytes, or nothing (for server-sent events notably).
- [`String str] replies with a body set to this string, and a known content-length.
- [`Stream str] replies with a body made from this string, using chunked encoding.
- [`Void] replies with no body.
- [`Writer w] replies with a body created by the writer [w], using
a chunked encoding.
It is available since NEXT_RELEASE.
*)
type t = private { type t = private {
code: Response_code.t; (** HTTP response code. See {!Response_code}. *) code: Response_code.t; (** HTTP response code. See {!Response_code}. *)
@ -251,6 +263,12 @@ module Response : sig
?headers:Headers.t -> (string, Response_code.t * string) result -> t ?headers:Headers.t -> (string, Response_code.t * string) result -> t
(** Same as {!make} but with a string body. *) (** Same as {!make} but with a string body. *)
val make_writer :
?headers:Headers.t ->
(Tiny_httpd_io.Writer.t, Response_code.t * string) result ->
t
(** Same as {!make} but with a writer body. *)
val make_stream : val make_stream :
?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t ?headers:Headers.t -> (byte_stream, Response_code.t * string) result -> t
(** Same as {!make} but with a stream body. *) (** Same as {!make} but with a stream body. *)

View file

@ -89,6 +89,9 @@ let to_chan (oc : out_channel) (self : t) =
let to_chan' (oc : IO.Out_channel.t) (self : t) = let to_chan' (oc : IO.Out_channel.t) (self : t) =
iter (fun s i len -> IO.Out_channel.output oc s i len) self iter (fun s i len -> IO.Out_channel.output oc s i len) self
let to_writer (self : t) : Tiny_httpd_io.Writer.t =
{ write = (fun oc -> to_chan' oc self) }
let of_bytes ?(i = 0) ?len (bs : bytes) : t = let of_bytes ?(i = 0) ?len (bs : bytes) : t =
(* invariant: !i+!len is constant *) (* invariant: !i+!len is constant *)
let len = let len =
@ -298,20 +301,9 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t =
() ()
let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit =
let continue = ref true in let oc' = IO.Out_channel.chunk_encoding oc ~close_rec:false in
while !continue do to_chan' oc' self;
(* next chunk *) IO.Out_channel.close oc'
self.fill_buf ();
let n = self.len in
IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n);
IO.Out_channel.output oc self.bs self.off n;
self.consume n;
if n = 0 then continue := false;
IO.Out_channel.output_string oc "\r\n"
done;
(* write another crlf after the stream (see #56) *)
IO.Out_channel.output_string oc "\r\n";
()
(* print a stream as a series of chunks *) (* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit = let output_chunked (oc : out_channel) (self : t) : unit =

View file

@ -98,6 +98,10 @@ val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write to the IO channel. (** Write to the IO channel.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
val to_writer : t -> Tiny_httpd_io.Writer.t
(** Turn this stream into a writer.
@since NEXT_RELEASE *)
val make : val make :
?bs:bytes -> ?bs:bytes ->
?close:(t -> unit) -> ?close:(t -> unit) ->

View file

@ -1,5 +1,7 @@
module S = Tiny_httpd_server module S = Tiny_httpd_server
module BS = Tiny_httpd_stream module BS = Tiny_httpd_stream
module W = Tiny_httpd_io.Writer
module Out = Tiny_httpd_io.Out_channel
let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream =
S._debug (fun k -> k "wrap stream with deflate.decode"); S._debug (fun k -> k "wrap stream with deflate.decode");
@ -40,67 +42,70 @@ let decode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream =
)) ))
() ()
let encode_deflate_stream_ ~buf_size (is : S.byte_stream) : S.byte_stream = let encode_deflate_writer_ ~buf_size (w : W.t) : W.t =
S._debug (fun k -> k "wrap stream with deflate.encode"); S._debug (fun k -> k "wrap writer with deflate.encode");
let refill = ref true in
let zlib_str = Zlib.deflate_init 4 false in let zlib_str = Zlib.deflate_init 4 false in
BS.make ~bs:(Bytes.create buf_size)
~close:(fun _self -> let o_buf = Bytes.create buf_size in
S._debug (fun k -> k "deflate: close"); let o_off = ref 0 in
Zlib.deflate_end zlib_str; let o_len = ref 0 in
BS.close is)
~consume:(fun self n -> (* write output buffer to out *)
self.off <- self.off + n; let write_out (oc : Out.t) =
self.len <- self.len - n) if !o_len > 0 then (
~fill:(fun self -> Out.output oc o_buf !o_off !o_len;
let rec loop () = o_off := 0;
S._debug (fun k -> o_len := 0
k "deflate.fill.iter out_off=%d out_len=%d" self.off self.len); )
if self.len > 0 then in
()
(* still the same slice, not consumed entirely by output *) (* Zlib.Z_NO_FLUSH *)
else if not !refill then let flush_zlib ~flush (oc : Out.t) =
() let continue = ref true in
(* empty slice, no refill *) while !continue do
else ( let finished, used_in, used_out =
(* the output was entirely consumed, we need to do more work *) Zlib.deflate zlib_str Bytes.empty 0 0 o_buf 0 (Bytes.length o_buf) flush
is.BS.fill_buf ();
if is.len > 0 then (
(* try to decompress from input buffer *)
let _finished, used_in, used_out =
Zlib.deflate zlib_str is.bs is.off is.len self.bs 0
(Bytes.length self.bs) Zlib.Z_NO_FLUSH
in
self.off <- 0;
self.len <- used_out;
is.consume used_in;
S._debug (fun k ->
k "encode %d bytes as %d bytes using deflate (finished: %b)"
used_in used_out _finished);
if _finished then (
S._debug (fun k -> k "deflate: finished");
refill := false
);
loop ()
) else (
(* [is] is done, finish sending the data in current buffer *)
let _finished, used_in, used_out =
Zlib.deflate zlib_str is.bs is.off is.len self.bs 0
(Bytes.length self.bs) Zlib.Z_FULL_FLUSH
in
assert (used_in = 0);
self.off <- 0;
self.len <- used_out;
if used_out = 0 then refill := false;
loop ()
)
)
in in
try loop () assert (used_in = 0);
with Zlib.Error (e1, e2) -> o_len := !o_len + used_out;
S.Response.fail_raise ~code:400 if finished then continue := false;
"deflate: error during compression:\n%s %s" e1 e2) write_out oc
() done
in
(* compress and consume input buffer *)
let write_zlib ~flush (oc : Out.t) buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let _finished, used_in, used_out =
Zlib.deflate zlib_str buf !i !len o_buf 0 (Bytes.length o_buf) flush
in
i := !i + used_in;
len := !len - used_in;
o_len := !o_len + used_out;
write_out oc
done
in
let write (oc : Out.t) : unit =
let output buf i len = write_zlib ~flush:Zlib.Z_NO_FLUSH oc buf i len in
let flush () =
flush_zlib oc ~flush:Zlib.Z_SYNC_FLUSH;
oc.flush ()
in
let close () =
flush_zlib oc ~flush:Zlib.Z_FULL_FLUSH;
assert (!o_len = 0);
Zlib.deflate_end zlib_str;
oc.close ()
in
(* new output channel that compresses on the fly *)
let oc' = { Out.flush; close; output } in
w.write oc'
in
W.make ~write ()
let split_on_char ?(f = fun x -> x) c s : string list = let split_on_char ?(f = fun x -> x) c s : string list =
let rec loop acc i = let rec loop acc i =
@ -161,15 +166,21 @@ let compress_resp_stream_ ~compress_above ~buf_size (req : _ S.Request.t)
S._debug (fun k -> S._debug (fun k ->
k "encode str response with deflate (size %d, threshold %d)" k "encode str response with deflate (size %d, threshold %d)"
(String.length s) compress_above); (String.length s) compress_above);
let body = encode_deflate_stream_ ~buf_size @@ BS.of_string s in let body = encode_deflate_writer_ ~buf_size @@ W.of_string s in
resp resp
|> S.Response.update_headers update_headers |> S.Response.update_headers update_headers
|> S.Response.set_body (`Stream body) |> S.Response.set_body (`Writer body)
| `Stream str -> | `Stream str ->
S._debug (fun k -> k "encode stream response with deflate"); S._debug (fun k -> k "encode stream response with deflate");
let w = BS.to_writer str in
resp resp
|> S.Response.update_headers update_headers |> S.Response.update_headers update_headers
|> S.Response.set_body (`Stream (encode_deflate_stream_ ~buf_size str)) |> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w))
| `Writer w ->
S._debug (fun k -> k "encode writer response with deflate");
resp
|> S.Response.update_headers update_headers
|> S.Response.set_body (`Writer (encode_deflate_writer_ ~buf_size w))
| `String _ | `Void -> resp | `String _ | `Void -> resp
) else ) else
resp resp