add Tiny_httpd_io module, abstraction over IOs

abstract channels, and abstract TCP server.
This commit is contained in:
Simon Cruanes 2023-06-02 20:09:28 -04:00
parent ee310b5262
commit a32297ac6c
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 129 additions and 25 deletions

View file

@ -4,3 +4,4 @@ include Tiny_httpd_server
module Util = Tiny_httpd_util
module Dir = Tiny_httpd_dir
module Html = Tiny_httpd_html
module IO = Tiny_httpd_io

View file

@ -85,6 +85,10 @@ module Buf = Tiny_httpd_buf
module Byte_stream = Tiny_httpd_stream
(** {2 IO Abstraction} *)
module IO = Tiny_httpd_io
(** {2 Main Server Type} *)
(** @inline *)

85
src/Tiny_httpd_io.ml Normal file
View file

@ -0,0 +1,85 @@
(** IO abstraction.
We abstract IO so we can support classic unix blocking IOs
with threads, and modern async IO with Eio.
{b NOTE}: experimental.
@since NEXT_RELEASE
*)
module In_channel = struct
type t = {
input: bytes -> int -> int -> int;
(** Read into the slice. Returns [0] only if the
channel is closed. *)
close: unit -> unit;
}
let of_in_channel ?(close_noerr = false) (ic : in_channel) : t =
{
input = (fun buf i len -> input ic buf i len);
close =
(fun () ->
if close_noerr then
close_in_noerr ic
else
close_in ic);
}
let of_unix_fd ?(close_noerr = false) (fd : Unix.file_descr) : t =
{
input = (fun buf i len -> Unix.read fd buf i len);
close =
(fun () ->
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd);
}
let[@inline] input (self : t) buf i len = self.input buf i len
let[@inline] close self : unit = self.close ()
end
module Out_channel = struct
type t = {
output: bytes -> int -> int -> unit; (** Output slice *)
flush: unit -> unit; (** Flush underlying buffer *)
close: unit -> unit;
}
let of_out_channel ?(close_noerr = false) (oc : out_channel) : t =
{
output = (fun buf i len -> output oc buf i len);
flush = (fun () -> flush oc);
close =
(fun () ->
if close_noerr then
close_out_noerr oc
else
close_out oc);
}
let[@inline] output (self : t) buf i len : unit = self.output buf i len
let[@inline] output_string (self : t) (str : string) : unit =
self.output (Bytes.unsafe_of_string str) 0 (String.length str)
let[@inline] close self : unit = self.close ()
end
(** A TCP server abstraction *)
module TCP_server = struct
type conn_handler = {
handle: In_channel.t -> Out_channel.t -> unit;
(** Handle client connection *)
}
type t = {
listen: handle:conn_handler -> unit -> unit;
(** Blocking call to start listening for incoming connections.
Uses the connection handler to handle individual client connections. *)
endpoint: unit -> Unix.inet_addr * int; (** Endpoint we listen on *)
}
end

View file

@ -1,4 +1,5 @@
module Buf = Tiny_httpd_buf
module IO = Tiny_httpd_io
let spf = Printf.sprintf
@ -45,37 +46,32 @@ let make ?(bs = Bytes.create @@ (16 * 1024)) ?(close = ignore) ~consume ~fill ()
in
self
let of_chan_ ?(buf_size = 16 * 1024) ~close ic : t =
let of_input ?(buf_size = 16 * 1024) (ic : IO.In_channel.t) : t =
make ~bs:(Bytes.create buf_size)
~close:(fun _ -> close ic)
~close:(fun _ -> IO.In_channel.close ic)
~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~fill:(fun self ->
if self.off >= self.len then (
self.off <- 0;
self.len <- input ic self.bs 0 (Bytes.length self.bs)
self.len <- IO.In_channel.input ic self.bs 0 (Bytes.length self.bs)
))
()
let of_chan = of_chan_ ~close:close_in
let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
let of_chan_ ?buf_size ic ~close_noerr : t =
let inc = IO.In_channel.of_in_channel ~close_noerr ic in
of_input ?buf_size inc
let of_fd_ ?(buf_size = 16 * 1024) ~close ic : t =
make ~bs:(Bytes.create buf_size)
~close:(fun _ -> close ic)
~consume:(fun self n ->
self.off <- self.off + n;
self.len <- self.len - n)
~fill:(fun self ->
if self.off >= self.len then (
self.off <- 0;
self.len <- Unix.read ic self.bs 0 (Bytes.length self.bs)
))
()
let of_chan ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:false
let of_chan_close_noerr ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:true
let of_fd = of_fd_ ~close:Unix.close
let of_fd_close_noerr = of_fd_ ~close:(fun f -> try Unix.close f with _ -> ())
let of_fd_ ?buf_size ~close_noerr ic : t =
let inc = IO.In_channel.of_unix_fd ~close_noerr ic in
of_input ?buf_size inc
let of_fd ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:false fd
let of_fd_close_noerr ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:true fd
let rec iter f (self : t) : unit =
self.fill_buf ();
@ -90,6 +86,9 @@ let rec iter f (self : t) : unit =
let to_chan (oc : out_channel) (self : t) =
iter (fun s i len -> output oc s i len) self
let to_chan' (oc : IO.Out_channel.t) (self : t) =
iter (fun s i len -> IO.Out_channel.output oc s i len) self
let of_bytes ?(i = 0) ?len (bs : bytes) : t =
(* invariant: !i+!len is constant *)
let len =
@ -298,19 +297,22 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t =
refill := false)
()
(* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit =
let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit =
let continue = ref true in
while !continue do
(* next chunk *)
self.fill_buf ();
let n = self.len in
Printf.fprintf oc "%x\r\n" n;
output oc self.bs self.off n;
IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n);
IO.Out_channel.output oc self.bs self.off n;
self.consume n;
if n = 0 then continue := false;
output_string oc "\r\n"
IO.Out_channel.output_string oc "\r\n"
done;
(* write another crlf after the stream (see #56) *)
output_string oc "\r\n";
IO.Out_channel.output_string oc "\r\n";
()
(* print a stream as a series of chunks *)
let output_chunked (oc : out_channel) (self : t) : unit =
output_chunked' (IO.Out_channel.of_out_channel oc) self

View file

@ -36,6 +36,10 @@ val close : t -> unit
val empty : t
(** Stream with 0 bytes inside *)
val of_input : ?buf_size:int -> Tiny_httpd_io.In_channel.t -> t
(** Make a buffered stream from the given channel.
@since NEXT_RELEASE *)
val of_chan : ?buf_size:int -> in_channel -> t
(** Make a buffered stream from the given channel. *)
@ -62,6 +66,10 @@ val to_chan : out_channel -> t -> unit
(** Write the stream to the channel.
@since 0.3 *)
val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write to the IO channel.
@since NEXT_RELEASE *)
val make :
?bs:bytes ->
?close:(t -> unit) ->
@ -111,3 +119,7 @@ val read_exactly :
val output_chunked : out_channel -> t -> unit
(** Write the stream into the channel, using the chunked encoding. *)
val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit
(** Write the stream into the channel, using the chunked encoding.
@since NEXT_RELEASE *)