From d00e9accc72542973b5c47a94b53bf44882cdfce Mon Sep 17 00:00:00 2001
From: Simon Cruanes
Date: Tue, 8 Feb 2022 14:42:10 -0500
Subject: [PATCH] wip: add structural streams to IO

---
 src/core/CCIO.ml  | 204 ++++++++++++++++++++++++++++++++++++++++++++++
 src/core/CCIO.mli | 104 +++++++++++++++++++++++
 2 files changed, 308 insertions(+)

diff --git a/src/core/CCIO.ml b/src/core/CCIO.ml
index 36d03d7d..820bd131 100644
--- a/src/core/CCIO.ml
+++ b/src/core/CCIO.ml
@@ -433,3 +433,207 @@ module File = struct
     let name = Filename.temp_file ?temp_dir prefix suffix in
     finally_ f name ~h:remove_noerr
 end
+(* Structural streams. An [istream] is a pair (read, close); an [ostream] is a pair (write, close). NOTE(review): CCIO.mli in this patch declares [ostream] as a triple with a flush function; the two definitions must be reconciled. *)
+type istream = (bytes -> int -> int -> int) * (unit -> unit)
+type ostream = (bytes -> int -> int -> unit) * (unit -> unit)
+(* [transfer is os] reads chunks from [is] into [buf] (default: a fresh 16kiB buffer) and writes each chunk to [os], until a read returns 0. Neither stream is closed. *)
+let transfer ?(buf=Bytes.create (16 * 1024)) (is:istream) (os:ostream) : unit =
+  let continue = ref true in
+  while !continue do
+    let n = (fst is) buf 0 (Bytes.length buf) in
+    if n=0 then continue := false
+    else (
+      (fst os) buf 0 n
+    )
+  done
+(* Constructors and operations for output streams. *)
+module Ostream = struct
+  type t = ostream
+  (* [close os] runs the close function of [os] (it used to be returned without being called). *)
+  let close (_,cl) = cl()
+  (* Stream that writes to [oc] with [output]; closing it calls [close_out oc]. *)
+  let of_chan oc : t =
+    let write b i len = output oc b i len in
+    let close () = close_out oc in
+    write, close
+  (* Stream that appends to [buf] with [Buf.add_bytes]; closing it does nothing. NOTE(review): the .mli in this patch exposes [of_buffer] and [of_byte_buf] instead. *)
+  let of_buf buf : t =
+    let write b i len = Buf.add_bytes buf b i len in
+    let close() = () in
+    write, close
+end
+(* Constructors and operations for input streams. *)
+module Istream = struct
+  type t = istream
+  (* [close is] runs the close function of [is]. *)
+  let close (_,cl) = cl()
+  (* Stream whose reads always return 0; closing it does nothing. *)
+  let empty : t = (fun _ _ _ -> 0), (fun()->())
+  (* Stream that reads from [ic] with [input]; closing it calls [close ic]. *)
+  let of_chan_ ~close ic : t =
+    let read b i len = input ic b i len in
+    let close() = close ic in
+    read, close
+  (* [of_chan] closes with [close_in], [of_chan_close_noerr] with [close_in_noerr]. *)
+  let of_chan = of_chan_ ~close:close_in
+  let of_chan_close_noerr = of_chan_ ~close:close_in_noerr
+  (* NOTE(review): the disabled [iter] below uses record fields that the pair type [t] does not have. *)
+  (*
+  let rec iter f (self:t) : unit =
+    let s, i, len = self.bs_fill_buf () in
+    if len=0 then (
+      self.bs_close();
+    ) else (
+      f s i len;
+      self.bs_consume len;
+      (iter [@tailcall]) f self
+    )
+     *)
+  (* Copy all of [self] into [oc] with [transfer], using [buf] as scratch space if given. Neither [self] nor [oc] is closed. *)
+  let to_chan ?buf (oc:out_channel) (self:t) =
+    let os = Ostream.of_chan oc in
+    transfer ?buf self os
+  (* Stream over the [len] bytes of [b] starting at offset [i] (default 0); [len] defaults to everything after [i]. Raises [Invalid_argument] if [len] exceeds what is left after [i]. *)
+  let of_bytes ?(i=0) ?len b : t =
+    (* invariant: !i+!len is constant *)
+    let len =
+      ref (
+        match len with
+        | Some n ->
+          if n > Bytes.length b - i then invalid_arg "Byte_stream.of_bytes";
+          n
+        | None -> Bytes.length b - i
+      )
+    in
+    let i = ref i in
+    let close()= () in
+    let read b2 i2 n2 =
+      let n = min n2 !len in
+      Bytes.blit b !i b2 i2 n;
+      i := !i + n;
+      len := !len - n;
+      n
+    in
+    read, close
+  (* Same as [of_bytes] on the content of [s], forwarding [i] and [len] as declared in the .mli; [s] is not copied and is only read from. *)
+  let of_string ?i ?len s : t =
+    of_bytes ?i ?len (Bytes.unsafe_of_string s)
+  (* [with_file file f] opens [file], applies [f] to a stream reading from it, and closes the channel whether [f] returns or raises (the exception is re-raised). *)
+  let with_file file f =
+    let ic = open_in file in
+    try
+      let x = f (of_chan ic) in
+      close_in ic;
+      x
+    with e ->
+      close_in_noerr ic;
+      raise e
+  (* Read [self] into [buf], growing it when full, until a read returns 0; return the buffer content and call [Buf.clear buf]. [self] is not closed. *)
+  let read_all ?(buf=Buf.create()) (self:t) : string =
+    let continue = ref true in
+    while !continue do
+      if Buf.size buf = Buf.cap buf then Buf.grow_ buf;
+      let n = (fst self) buf.bytes buf.len (Buf.cap buf - buf.len) in
+      if n=0 then continue := false
+      else (
+        buf.len <- buf.len + n;
+      )
+    done;
+    let s = Buf.contents buf in
+    Buf.clear buf;
+    s
+
+  (*
+  (* put [n] bytes from the input into bytes *)
+  let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit =
+    assert (Bytes.length bytes >= n);
+    let offset = ref 0 in
+    while !offset < n do
+      let s, i, len = self.bs_fill_buf () in
+      let n_read = min len (n- !offset) in
+      Bytes.blit s i bytes !offset n_read;
+      offset := !offset + n_read;
+      self.bs_consume n_read;
+      if n_read=0 then too_short();
+    done
+     *)
+
+  (* read a line into the buffer, after clearing it; raises [End_of_file] if the input ends before any byte is read. NOTE(review): still written against the old record API ([self.bs_fill_buf], [self.bs_consume]) and [Buf_]; needs porting to the pair type [t]. *)
+  let read_line_into (self:t) ~buf : unit =
+    Buf.clear buf;
+    let continue = ref true in
+    while !continue do
+      let s, i, len = self.bs_fill_buf () in
+      if len=0 then (
+        continue := false;
+        if Buf_.size buf = 0 then raise End_of_file;
+      );
+      let j = ref i in
+      while !j < i+len && Bytes.get s !j <> '\n' do
+        incr j
+      done;
+      if !j-i < len then (
+        assert (Bytes.get s !j = '\n');
+        Buf_.add_bytes buf s i (!j-i); (* without \n *)
+        self.bs_consume (!j-i+1); (* remove \n *)
+        continue := false
+      ) else (
+        Buf_.add_bytes buf s i len;
+        self.bs_consume len;
+      )
+    done
+  (* NOTE(review): [limit_size_to] builds a record ([bs_fill_buf], [bs_close], [bs_consume]) although [t] is a pair; needs porting. *)
+  (* new stream with maximum size [max_size].
+     @param close_rec if true, closing this will also close the input stream
+     @param too_big called with read size if the max size is reached *)
+  let limit_size_to ~close_rec ~max_size ~too_big (self:t) : t =
+    let size = ref 0 in
+    let continue = ref true in
+    { bs_fill_buf =
+        (fun () ->
+           if !continue then self.bs_fill_buf() else Bytes.empty, 0, 0);
+      bs_close=(fun () ->
+          if close_rec then self.bs_close ());
+      bs_consume = (fun n ->
+          size := !size + n;
+          if !size > max_size then (
+            continue := false;
+            too_big !size
+          ) else (
+            self.bs_consume n
+          ));
+    }
+
+  (* read exactly [size] bytes from the stream; [too_short] is called with the number of missing bytes if the input ends early. NOTE(review): same record-versus-pair mismatch as [limit_size_to]. *)
+  let read_exactly ~close_rec ~size ~too_short (self:t) : t =
+    if size=0 then (
+      empty
+    ) else (
+      let size = ref size in
+      { bs_fill_buf = (fun () ->
+            (* must not block on [self] if we're done *)
+            if !size = 0 then Bytes.empty, 0, 0
+            else (
+              let buf, i, len = self.bs_fill_buf () in
+              let len = min len !size in
+              if len = 0 && !size > 0 then (
+                too_short !size;
+              );
+              buf, i, len
+            )
+          );
+        bs_close=(fun () ->
+            (* close underlying stream if [close_rec] *)
+            if close_rec then self.bs_close();
+            size := 0);
+        bs_consume = (fun n ->
+            let n = min n !size in
+            size := !size - n;
+            self.bs_consume n);
+      }
+    )
+  (* Read one line with [read_line_into] and return the content of [buf] as a string. *)
+  let read_line ?(buf=Buf.create()) self : string =
+    read_line_into self ~buf;
+    Buf.contents buf
+end
diff --git a/src/core/CCIO.mli b/src/core/CCIO.mli
index ff03520a..9509ba09 100644
--- a/src/core/CCIO.mli
+++ b/src/core/CCIO.mli
@@ -288,3 +288,107 @@ module File : sig
       See {!Filename.temp_file}.
       @since 0.17 *)
 end
+
+(** Generic input stream of data.
+
+    Streams are used to represent a series of bytes that can arrive progressively.
+    For example, an uploaded file will be sent as a series of chunks.
+
+    {b NOTE} experimental. Can change.
+
+    It is a pair [(read, close)]: [read buf i len] stores at most [len] bytes into [buf] starting at offset [i] and returns how many were stored ([0] means end of input); [close ()] closes the stream.
+    @since NEXT_RELEASE *)
+type istream = (bytes -> int -> int -> int) * (unit -> unit)
+
+(** An output stream, into which we can push bytes.
+
+    This is a tuple made of:
+    - a write function, to output bytes.
+    - a flush function, to flush bytes to storage or network.
+    - a close function, to flush and dispose of resources.
+
+    {b NOTE} experimental. Can change.
+
+    @since NEXT_RELEASE *)
+type ostream = (bytes -> int -> int -> unit) * (unit -> unit) * (unit -> unit)
+(* NOTE(review): CCIO.ml in this patch defines [ostream] as a pair (write, close) with no flush function; the two definitions must be reconciled. *)
+(** Input streams.
+
+    @since NEXT_RELEASE *)
+module Istream : sig
+  type t = istream
+
+  val close : t -> unit (** Close the stream by calling its close function. *)
+
+  val empty : t (** A stream with no content: every read returns [0]. *)
+
+  val of_chan : in_channel -> t
+  (** Make a buffered stream from the given channel. Closing the stream closes the channel. *)
+
+  val of_chan_close_noerr : in_channel -> t
+  (** Same as {!of_chan} but the [close] method will never fail. *)
+
+  val of_bytes : ?i:int -> ?len:int -> bytes -> t
+  (** A stream that just returns the slice of bytes starting from [i] (default [0])
+      and of length [len] (default: up to the end). @raise Invalid_argument if [len] is larger than what is available after [i]. *)
+
+  val of_string : ?i:int -> ?len:int -> string -> t (** Same as {!of_bytes}, reading from a string. *)
+
+  val read_all : ?buf:CCByte_buffer.t -> t -> string
+  (** Read the whole stream into a string.
+      @param buf a buffer that will be cleared and used to store the content. *)
+end
+
+(** Buffered input stream, with an accessible buffer.
+
+    {b NOTE} experimental. Can change.
+    @since NEXT_RELEASE *)
+module Buffered_istream : sig
+  type t
+  (* NOTE(review): no [Buffered_istream] implementation is visible in the CCIO.ml part of this patch. *)
+  val istream : t -> istream
+
+  val buf_bytes : t -> bytes
+
+  val buf_length : t -> int
+
+  val consume : t -> int -> unit
+
+  val make : ?buf_size:int -> istream -> t
+  (** Create a buffered stream. *)
+
+  val read_line : t -> string
+  (** Read one line. *)
+
+  val read_lines_gen : t -> string gen
+  (** Generator of lines. *)
+end
+
+(** Output streams.
+
+    @since NEXT_RELEASE *)
+module Ostream : sig
+  type t = ostream
+  (* NOTE(review): the CCIO.ml part of this patch only implements [close], [of_chan] and [of_buf]; the other values below have no visible implementation. *)
+  val of_chan : out_channel -> t
+
+  val of_buffer : Buffer.t -> t
+
+  val of_byte_buf : CCByte_buffer.t -> t
+
+  val write : t -> bytes -> int -> int -> unit
+
+  val write_string : t -> string -> unit
+
+  val flush : t -> unit
+
+  val close : t -> unit
+end
+
+val transfer : ?buf:bytes -> istream -> ostream -> unit
+(** [transfer is os] transfers the whole content of [is] into [os].
+    If [is] is infinite this will never return. Reading stops when [is] returns [0]; neither stream is closed.
+
+    @param buf an optional buffer to use as an intermediate storage for bytes.
+    @since NEXT_RELEASE
+*)