diff --git a/src/Tiny_httpd.ml b/src/Tiny_httpd.ml index 87e7890b..5213ef34 100644 --- a/src/Tiny_httpd.ml +++ b/src/Tiny_httpd.ml @@ -4,3 +4,4 @@ include Tiny_httpd_server module Util = Tiny_httpd_util module Dir = Tiny_httpd_dir module Html = Tiny_httpd_html +module IO = Tiny_httpd_io diff --git a/src/Tiny_httpd.mli b/src/Tiny_httpd.mli index e7385567..607d7461 100644 --- a/src/Tiny_httpd.mli +++ b/src/Tiny_httpd.mli @@ -85,6 +85,10 @@ module Buf = Tiny_httpd_buf module Byte_stream = Tiny_httpd_stream +(** {2 IO Abstraction} *) + +module IO = Tiny_httpd_io + (** {2 Main Server Type} *) (** @inline *) diff --git a/src/Tiny_httpd_io.ml b/src/Tiny_httpd_io.ml new file mode 100644 index 00000000..8d36a2a4 --- /dev/null +++ b/src/Tiny_httpd_io.ml @@ -0,0 +1,85 @@ +(** IO abstraction. + + We abstract IO so we can support classic unix blocking IOs + with threads, and modern async IO with Eio. + + {b NOTE}: experimental. + + @since NEXT_RELEASE +*) + +module In_channel = struct + type t = { + input: bytes -> int -> int -> int; + (** Read into the slice. Returns [0] only if the + channel is closed. *) + close: unit -> unit; + } + + let of_in_channel ?(close_noerr = false) (ic : in_channel) : t = + { + input = (fun buf i len -> input ic buf i len); + close = + (fun () -> + if close_noerr then + close_in_noerr ic + else + close_in ic); + } + + let of_unix_fd ?(close_noerr = false) (fd : Unix.file_descr) : t = + { + input = (fun buf i len -> Unix.read fd buf i len); + close = + (fun () -> + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd); + } + + let[@inline] input (self : t) buf i len = self.input buf i len + let[@inline] close self : unit = self.close () +end + +module Out_channel = struct + type t = { + output: bytes -> int -> int -> unit; (** Output slice *) + flush: unit -> unit; (** Flush underlying buffer *) + close: unit -> unit; + } + + let of_out_channel ?(close_noerr = false) (oc : out_channel) : t = + { + output = (fun buf i len -> output oc buf i len); + flush = (fun () -> flush oc); + close = + (fun () -> + if close_noerr then + close_out_noerr oc + else + close_out oc); + } + + let[@inline] output (self : t) buf i len : unit = self.output buf i len + + let[@inline] output_string (self : t) (str : string) : unit = + self.output (Bytes.unsafe_of_string str) 0 (String.length str) + + let[@inline] close self : unit = self.close () +end + +(** A TCP server abstraction *) +module TCP_server = struct + type conn_handler = { + handle: In_channel.t -> Out_channel.t -> unit; + (** Handle client connection *) + } + + type t = { + listen: handle:conn_handler -> unit -> unit; + (** Blocking call to start listening for incoming connections. + Uses the connection handler to handle individual client connections. *) + endpoint: unit -> Unix.inet_addr * int; (** Endpoint we listen on *) + } +end diff --git a/src/Tiny_httpd_stream.ml b/src/Tiny_httpd_stream.ml index 66ce7aa3..3a100ebf 100644 --- a/src/Tiny_httpd_stream.ml +++ b/src/Tiny_httpd_stream.ml @@ -1,4 +1,5 @@ module Buf = Tiny_httpd_buf +module IO = Tiny_httpd_io let spf = Printf.sprintf @@ -45,37 +46,32 @@ let make ?(bs = Bytes.create @@ (16 * 1024)) ?(close = ignore) ~consume ~fill () in self -let of_chan_ ?(buf_size = 16 * 1024) ~close ic : t = +let of_input ?(buf_size = 16 * 1024) (ic : IO.In_channel.t) : t = make ~bs:(Bytes.create buf_size) - ~close:(fun _ -> close ic) + ~close:(fun _ -> IO.In_channel.close ic) ~consume:(fun self n -> self.off <- self.off + n; self.len <- self.len - n) ~fill:(fun self -> if self.off >= self.len then ( self.off <- 0; - self.len <- input ic self.bs 0 (Bytes.length self.bs) + self.len <- IO.In_channel.input ic self.bs 0 (Bytes.length self.bs) )) () -let of_chan = of_chan_ ~close:close_in -let of_chan_close_noerr = of_chan_ ~close:close_in_noerr +let of_chan_ ?buf_size ic ~close_noerr : t = + let inc = IO.In_channel.of_in_channel ~close_noerr ic in + of_input ?buf_size inc -let of_fd_ ?(buf_size = 16 * 1024) ~close ic : t = - make ~bs:(Bytes.create buf_size) - ~close:(fun _ -> close ic) - ~consume:(fun self n -> - self.off <- self.off + n; - self.len <- self.len - n) - ~fill:(fun self -> - if self.off >= self.len then ( - self.off <- 0; - self.len <- Unix.read ic self.bs 0 (Bytes.length self.bs) - )) - () +let of_chan ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:false +let of_chan_close_noerr ?buf_size ic = of_chan_ ?buf_size ic ~close_noerr:true -let of_fd = of_fd_ ~close:Unix.close -let of_fd_close_noerr = of_fd_ ~close:(fun f -> try Unix.close f with _ -> ()) +let of_fd_ ?buf_size ~close_noerr ic : t = + let inc = IO.In_channel.of_unix_fd ~close_noerr ic in + of_input ?buf_size inc + +let of_fd ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:false fd +let of_fd_close_noerr ?buf_size fd : t = of_fd_ ?buf_size ~close_noerr:true fd let rec iter f (self : t) : unit = self.fill_buf (); @@ -90,6 +86,9 @@ let rec iter f (self : t) : unit = let to_chan (oc : out_channel) (self : t) = iter (fun s i len -> output oc s i len) self +let to_chan' (oc : IO.Out_channel.t) (self : t) = + iter (fun s i len -> IO.Out_channel.output oc s i len) self + let of_bytes ?(i = 0) ?len (bs : bytes) : t = (* invariant: !i+!len is constant *) let len = @@ -298,19 +297,22 @@ let read_chunked ?(buf = Buf.create ()) ~fail (bs : t) : t = refill := false) () -(* print a stream as a series of chunks *) -let output_chunked (oc : out_channel) (self : t) : unit = +let output_chunked' (oc : IO.Out_channel.t) (self : t) : unit = let continue = ref true in while !continue do (* next chunk *) self.fill_buf (); let n = self.len in - Printf.fprintf oc "%x\r\n" n; - output oc self.bs self.off n; + IO.Out_channel.output_string oc (Printf.sprintf "%x\r\n" n); + IO.Out_channel.output oc self.bs self.off n; self.consume n; if n = 0 then continue := false; - output_string oc "\r\n" + IO.Out_channel.output_string oc "\r\n" done; (* write another crlf after the stream (see #56) *) - output_string oc "\r\n"; + IO.Out_channel.output_string oc "\r\n"; () + +(* print a stream as a series of chunks *) +let output_chunked (oc : out_channel) (self : t) : unit = + output_chunked' (IO.Out_channel.of_out_channel oc) self diff --git a/src/Tiny_httpd_stream.mli b/src/Tiny_httpd_stream.mli index b2966662..4fc7dbcd 100644 --- a/src/Tiny_httpd_stream.mli +++ b/src/Tiny_httpd_stream.mli @@ -36,6 +36,10 @@ val close : t -> unit val empty : t (** Stream with 0 bytes inside *) +val of_input : ?buf_size:int -> Tiny_httpd_io.In_channel.t -> t +(** Make a buffered stream from the given channel. + @since NEXT_RELEASE *) + val of_chan : ?buf_size:int -> in_channel -> t (** Make a buffered stream from the given channel. *) @@ -62,6 +66,10 @@ val to_chan : out_channel -> t -> unit (** Write the stream to the channel. @since 0.3 *) +val to_chan' : Tiny_httpd_io.Out_channel.t -> t -> unit +(** Write to the IO channel. + @since NEXT_RELEASE *) + val make : ?bs:bytes -> ?close:(t -> unit) -> @@ -111,3 +119,7 @@ val read_exactly : val output_chunked : out_channel -> t -> unit (** Write the stream into the channel, using the chunked encoding. *) + +val output_chunked' : Tiny_httpd_io.Out_channel.t -> t -> unit +(** Write the stream into the channel, using the chunked encoding. + @since NEXT_RELEASE *)