wip: add structural streams to IO

This commit is contained in:
Simon Cruanes 2022-02-08 14:42:10 -05:00
parent c5d435848b
commit d00e9accc7
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 308 additions and 0 deletions

View file

@ -433,3 +433,207 @@ module File = struct
let name = Filename.temp_file ?temp_dir prefix suffix in
finally_ f name ~h:remove_noerr
end
(* Input stream: a pair [(read, close)].
   [read buf i len] is called by the consumers in this file to fill [buf],
   starting at offset [i], with at most [len] bytes; it returns the number of
   bytes stored, and a result of 0 is treated as end of input (see [transfer]
   and [Istream.read_all]).
   [close ()] is the function run by [Istream.close]. *)
type istream = (bytes -> int -> int -> int) * (unit -> unit)
(* Output stream: a pair [(write, close)].
   [write buf i len] is called with a slice of [buf] (offset [i], length [len])
   to push into the stream.
   NOTE(review): the interface declares [ostream] as a triple
   (write, flush, close); this definition has no flush component, and
   [transfer] / [Ostream] below rely on it being a pair. The two declarations
   must be reconciled before this can build. *)
type ostream = (bytes -> int -> int -> unit) * (unit -> unit)
(* [transfer ?buf is os] repeatedly reads a chunk from [is] into [buf] and
   writes that chunk to [os], stopping at the first read that returns 0.
   Neither stream is closed.
   @param buf scratch space for the chunks (default: fresh 16kB of bytes). *)
let transfer ?(buf=Bytes.create (16 * 1024)) (is:istream) (os:ostream) : unit =
  let capacity = Bytes.length buf in
  let rec pump () =
    match (fst is) buf 0 capacity with
    | 0 -> ()
    | n ->
      (fst os) buf 0 n;
      pump ()
  in
  pump ()
(* Output streams: constructors and operations on [ostream] values, which are
   pairs [(write, close)] in this file. *)
module Ostream = struct
  type t = ostream

  (* [close os] runs the close function of [os].
     The close function must be applied to [()]: returning it unapplied would
     leave the underlying resource open. *)
  let close ((_, cl) : t) : unit = cl ()

  (* [of_chan oc] is a stream whose writes go to [oc] through [output] and
     whose close function calls [close_out oc]. *)
  let of_chan oc : t =
    let write b i len = output oc b i len in
    let close () = close_out oc in
    write, close

  (* [of_buf buf] is a stream whose writes are appended to [buf] with
     [Buf.add_bytes]; closing does nothing.
     NOTE(review): [Buf] is not defined in the visible part of this file;
     presumably an alias for the byte buffer module named in the interface --
     confirm it is in scope. *)
  let of_buf buf : t =
    let write b i len = Buf.add_bytes buf b i len in
    let close () = () in
    write, close
end
(* Input streams: constructors and consumers for [istream] values, which are
   pairs [(read, close)]. *)
module Istream = struct
  type t = istream

  (* [close is] runs the close function of [is]. *)
  let close (_,cl) = cl()

  (* Stream without content: every read returns 0 and closing does nothing. *)
  let empty : t = (fun _ _ _ -> 0), (fun()->())

  (* [of_chan_ ~close ic] reads from [ic] with [input]; closing the stream
     applies the given [close] function to [ic]. *)
  let of_chan_ ~close ic : t =
    let read b i len = input ic b i len in
    let close() = close ic in
    read, close

  (* Stream over a channel; closing it calls [close_in]. *)
  let of_chan = of_chan_ ~close:close_in

  (* Stream over a channel; closing it calls [close_in_noerr]. *)
  let of_chan_close_noerr = of_chan_ ~close:close_in_noerr

  (* Disabled code: it is written against record fields ([bs_fill_buf],
     [bs_consume], [bs_close]) that the tuple type [t] does not have. *)
  (*
  let rec iter f (self:t) : unit =
  let s, i, len = self.bs_fill_buf () in
  if len=0 then (
  self.bs_close();
  ) else (
  f s i len;
  self.bs_consume len;
  (iter [@tailcall]) f self
  )
  *)

  (* [to_chan ?buf oc self] copies everything readable from [self] into [oc],
     by wrapping [oc] with [Ostream.of_chan] and delegating to [transfer].
     Neither [self] nor [oc] is closed.
     @param buf optional scratch bytes handed to [transfer]. *)
  let to_chan ?buf (oc:out_channel) (self:t) =
    let os = Ostream.of_chan oc in
    transfer ?buf self os

  (* [of_bytes ?i ?len b] is a stream over the slice of [b] that starts at
     offset [i] (default 0) and has length [len] (default: up to the end of
     [b]). Closing does nothing.
     @raise Invalid_argument if the slice does not lie inside [b]. *)
  let of_bytes ?(i=0) ?len b : t =
    let total = Bytes.length b in
    (* reject a bad offset here rather than failing later inside [Bytes.blit] *)
    if i < 0 || i > total then invalid_arg "Byte_stream.of_bytes";
    (* invariant: !i+!len is constant *)
    let len =
      ref (
        match len with
        | Some n ->
          if n < 0 || n > total - i then invalid_arg "Byte_stream.of_bytes";
          n
        | None -> total - i
      )
    in
    let i = ref i in
    let close () = () in
    let read b2 i2 n2 =
      (* hand out at most what is left in the slice *)
      let n = min n2 !len in
      Bytes.blit b !i b2 i2 n;
      i := !i + n;
      len := !len - n;
      n
    in
    read, close

  (* [of_string ?i ?len s] is [of_bytes ?i ?len] over the content of [s].
     [Bytes.unsafe_of_string] is acceptable here because [of_bytes] only uses
     its argument as the source of [Bytes.blit] and never writes to it. *)
  let of_string ?i ?len s : t =
    of_bytes ?i ?len (Bytes.unsafe_of_string s)

  (* [with_file file f] opens [file], applies [f] to a stream over it and
     returns the result. The channel is closed with [close_in] on success; if
     anything raises, it is closed with [close_in_noerr] and the exception is
     re-raised. *)
  let with_file file f =
    let ic = open_in file in
    try
      let x = f (of_chan ic) in
      close_in ic;
      x
    with e ->
      close_in_noerr ic;
      raise e

  (* [read_all ?buf self] reads from [self] until a read returns 0, growing
     [buf] whenever it is full, and returns the content of [buf] as a string.
     [buf] is cleared before returning; the stream is not closed.
     NOTE(review): relies on [Buf] exposing mutable fields [bytes] and [len]
     as well as [size], [cap], [grow_], [contents] and [clear]; [Buf] is not
     defined in the visible part of this file -- confirm it is in scope. *)
  let read_all ?(buf=Buf.create()) (self:t) : string =
    let continue = ref true in
    while !continue do
      if Buf.size buf = Buf.cap buf then Buf.grow_ buf;
      let n = (fst self) buf.bytes buf.len (Buf.cap buf - buf.len) in
      if n=0 then continue := false
      else (
        buf.len <- buf.len + n;
      )
    done;
    let s = Buf.contents buf in
    Buf.clear buf;
    s

  (*
  (* put [n] bytes from the input into bytes *)
  let read_exactly_ ~too_short (self:t) (bytes:bytes) (n:int) : unit =
  assert (Bytes.length bytes >= n);
  let offset = ref 0 in
  while !offset < n do
  let s, i, len = self.bs_fill_buf () in
  let n_read = min len (n- !offset) in
  Bytes.blit s i bytes !offset n_read;
  offset := !offset + n_read;
  self.bs_consume n_read;
  if n_read=0 then too_short();
  done
  *)

  (* read a line into the buffer, after clearing it.
     Raises [End_of_file] when the input ends before any byte was stored.
     The line terminator is consumed but not stored.
     NOTE(review): this function uses [self.bs_fill_buf] / [self.bs_consume],
     which are record fields, while [t] is a pair of functions; it also mixes
     [Buf] and [Buf_]. It has to be ported (presumably to the buffered stream
     module declared in the interface) before it can type-check. *)
  let read_line_into (self:t) ~buf : unit =
    Buf.clear buf;
    let continue = ref true in
    while !continue do
      let s, i, len = self.bs_fill_buf () in
      if len=0 then (
        continue := false;
        if Buf_.size buf = 0 then raise End_of_file;
      );
      let j = ref i in
      while !j < i+len && Bytes.get s !j <> '\n' do
        incr j
      done;
      if !j-i < len then (
        assert (Bytes.get s !j = '\n');
        Buf_.add_bytes buf s i (!j-i); (* without \n *)
        self.bs_consume (!j-i+1); (* remove \n *)
        continue := false
      ) else (
        Buf_.add_bytes buf s i len;
        self.bs_consume len;
      )
    done

  (* new stream with maximum size [max_size].
     @param close_rec if true, closing this will also close the input stream
     @param too_big called with read size if the max size is reached
     NOTE(review): builds and reads a record with fields [bs_fill_buf],
     [bs_close] and [bs_consume], which does not match the pair type [t];
     needs porting before it can type-check. *)
  let limit_size_to ~close_rec ~max_size ~too_big (self:t) : t =
    let size = ref 0 in
    let continue = ref true in
    { bs_fill_buf =
        (fun () ->
           if !continue then self.bs_fill_buf() else Bytes.empty, 0, 0);
      bs_close=(fun () ->
          if close_rec then self.bs_close ());
      bs_consume = (fun n ->
          size := !size + n;
          if !size > max_size then (
            continue := false;
            too_big !size
          ) else (
            self.bs_consume n
          ));
    }

  (* read exactly [size] bytes from the stream
     @param close_rec if true, closing the result also closes [self]
     @param too_short called with the number of missing bytes when [self]
     runs out of data early
     NOTE(review): same record-vs-pair mismatch as [limit_size_to]; needs
     porting before it can type-check. *)
  let read_exactly ~close_rec ~size ~too_short (self:t) : t =
    if size=0 then (
      empty
    ) else (
      let size = ref size in
      { bs_fill_buf = (fun () ->
            (* must not block on [self] if we're done *)
            if !size = 0 then Bytes.empty, 0, 0
            else (
              let buf, i, len = self.bs_fill_buf () in
              let len = min len !size in
              if len = 0 && !size > 0 then (
                too_short !size;
              );
              buf, i, len
            )
          );
        bs_close=(fun () ->
            (* close underlying stream if [close_rec] *)
            if close_rec then self.bs_close();
            size := 0);
        bs_consume = (fun n ->
            let n = min n !size in
            size := !size - n;
            self.bs_consume n);
      }
    )

  (* [read_line ?buf self] reads one line with [read_line_into] and returns
     the content of [buf]. *)
  let read_line ?(buf=Buf.create()) self : string =
    read_line_into self ~buf;
    Buf.contents buf
end

View file

@ -288,3 +288,107 @@ module File : sig
See {!Filename.temp_file}.
@since 0.17 *)
end
(** Generic input stream of data.
Streams are used to represent a series of bytes that can arrive progressively.
For example, an uploaded file will be sent as a series of chunks.
{b NOTE} experimental. Can change.
It is a pair [(read, close)]:
- [read buf i len] stores at most [len] bytes into [buf], starting at
offset [i], and returns the number of bytes stored; 0 means the end of
the stream.
- [close ()] closes the stream.
@since NEXT_RELEASE *)
type istream = (bytes -> int -> int -> int) * (unit -> unit)
(* NOTE(review): the implementation in this change defines [ostream] as a
pair (write, close) without the flush function; it must be aligned with the
triple declared here. *)
(** An output stream, into which we can push bytes.
This is a tuple made of:
- a write function, to output bytes.
- a flush function, to flush bytes to storage or network.
- a close function, to flush and dispose of resources.
{b NOTE} experimental. Can change.
@since NEXT_RELEASE *)
type ostream = (bytes -> int -> int -> unit) * (unit -> unit) * (unit -> unit)
(** Input streams.
@since NEXT_RELEASE *)
module Istream : sig
type t = istream
val close : t -> unit
(** Close the stream by calling its close function. *)
val empty : t
(** The stream with no content: reading from it always returns 0. *)
val of_chan : in_channel -> t
(** Make a stream that reads from the given channel.
Closing the stream closes the channel with [close_in]. *)
val of_chan_close_noerr : in_channel -> t
(** Same as {!of_chan} but the [close] method will never fail: the channel
is closed with [close_in_noerr]. *)
val of_bytes : ?i:int -> ?len:int -> bytes -> t
(** A stream that just returns the slice of bytes starting from [i]
and of length [len].
@param i start offset of the slice (default 0).
@param len length of the slice (default: up to the end of the bytes).
@raise Invalid_argument if [len] exceeds the bytes available after [i]. *)
val of_string : ?i:int -> ?len:int -> string -> t
(** Same as {!of_bytes}, but the content comes from a string. *)
val read_all : ?buf:CCByte_buffer.t -> t -> string
(** Read the whole stream into a string. The stream is not closed.
@param buf a buffer used to accumulate the content; it is cleared before
the function returns. *)
end
(* NOTE(review): no implementation of this module is visible in this change.
[istream], [buf_bytes], [buf_length] and [consume] are undocumented;
presumably they expose the internal buffer and let the caller discard bytes
from its front -- TODO document them once the implementation exists. *)
(** Buffered input stream, with an accessible buffer.
{b NOTE} experimental. Can change.
@since NEXT_RELEASE *)
module Buffered_istream : sig
type t
val istream : t -> istream
val buf_bytes : t -> bytes
val buf_length : t -> int
val consume : t -> int -> unit
val make : ?buf_size:int -> istream -> t
(** Create a buffered stream on top of the given input stream.
@param buf_size presumably the size of the internal buffer (TODO confirm). *)
val read_line : t -> string
(** Read one line. *)
val read_lines_gen : t -> string gen
(** Generator of lines. *)
end
(* NOTE(review): the implementation of [Ostream] visible in this change only
provides [close], [of_chan] and [of_buf]; [of_buffer], [of_byte_buf],
[write], [write_string] and [flush] declared here have no visible definition
yet. *)
(** Output streams.
@since NEXT_RELEASE *)
module Ostream : sig
type t = ostream
val of_chan : out_channel -> t
(** Stream that writes to the given channel; closing the stream closes
the channel. *)
val of_buffer : Buffer.t -> t
(** Stream that writes into the given [Buffer.t]. *)
val of_byte_buf : CCByte_buffer.t -> t
(** Stream that writes into the given byte buffer. *)
val write : t -> bytes -> int -> int -> unit
(** [write os b i len] pushes the slice of [b] of length [len] starting at
offset [i] into [os], using the stream's write function. *)
val write_string : t -> string -> unit
(** Presumably the same as {!write}, for a whole string (TODO confirm). *)
val flush : t -> unit
(** Call the stream's flush function, to flush bytes to storage or network. *)
val close : t -> unit
(** Call the stream's close function, to flush and dispose of resources. *)
end
val transfer : ?buf:bytes -> istream -> ostream -> unit
(** [transfer is os] transfers the whole content of [is] into [os]:
it reads chunks from [is] until a read returns 0, writing each chunk to [os].
If [is] is infinite this will never return.
The current implementation closes neither [is] nor [os].
@param buf an optional buffer to use as an intermediate storage for bytes.
@since NEXT_RELEASE
*)