wip: Lwt_pipe

This commit is contained in:
Simon Cruanes 2015-02-19 16:41:47 +01:00
parent f6ea8b0aa2
commit 77b6197c49
3 changed files with 428 additions and 1 deletions

2
_oasis
View file

@ -132,7 +132,7 @@ Library "containers_thread"
Library "containers_lwt" Library "containers_lwt"
Path: src/lwt Path: src/lwt
Modules: Lwt_automaton, Lwt_actor, Lwt_klist Modules: Lwt_automaton, Lwt_actor, Lwt_klist, Lwt_pipe
Pack: true Pack: true
FindlibName: lwt FindlibName: lwt
FindlibParent: containers FindlibParent: containers

287
src/lwt/lwt_pipe.ml Normal file
View file

@ -0,0 +1,287 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
type 'a or_error = [`Ok of 'a | `Error of string]
type 'a step = ['a or_error | `End]
let (>>=) = Lwt.(>>=)
let (>|=) = Lwt.(>|=)
module LwtErr = struct
type 'a t = 'a or_error Lwt.t
let return x = Lwt.return (`Ok x)
let return_unit = Lwt.return (`Ok ())
let fail msg = Lwt.return (`Error msg)
let (>>=) x f =
Lwt.bind x
(function
| `Error msg -> fail msg
| `Ok y -> f y
)
let (>|=) x f =
Lwt.map
(function
| `Error _ as e -> e
| `Ok x -> `Ok (f x)
) x
end
let step_map f = function
| `Ok x -> `Ok (f x)
| (`Error _ | `End) as e -> e
let (>>|=) = LwtErr.(>|=)
let ret_end = Lwt.return `End
module Pipe = struct
type -'a writer = 'a step -> unit Lwt.t
type +'a reader = unit -> 'a step Lwt.t
(* messages given to writers through the condition *)
type 'a msg =
| Send of 'a step Lwt.u (* send directly to reader *)
| SendQueue (* push into queue *)
type 'a t = {
lock : Lwt_mutex.t;
queue : 'a step Queue.t;
max_size : int;
cond : 'a msg Lwt_condition.t;
mutable keep : unit Lwt.t list; (* do not GC *)
}
let create ?(max_size=0) () = {
queue=Queue.create();
max_size;
lock=Lwt_mutex.create();
cond=Lwt_condition.create();
keep=[];
}
let keep p fut = p.keep <- fut :: p.keep
(* read next one *)
let reader t () =
Lwt_mutex.with_lock t.lock
(fun () ->
if Queue.is_empty t.queue
then (
let fut, send = Lwt.wait () in
Lwt_condition.signal t.cond (Send send);
fut
) else (
(* direct pop *)
assert (t.max_size > 0);
let x = Queue.pop t.queue in
Lwt_condition.signal t.cond SendQueue; (* queue isn't full anymore *)
Lwt.return x
)
)
(* write a value *)
let writer t x =
let rec try_write () =
if Queue.length t.queue < t.max_size then (
Queue.push x t.queue;
Lwt.return_unit
) else (
(* wait for readers to consume the queue *)
Lwt_condition.wait ~mutex:t.lock t.cond >>= fun msg ->
match msg with
| Send s ->
Lwt.wakeup s x;
Lwt.return_unit
| SendQueue -> try_write () (* try again! *)
)
in
Lwt_mutex.with_lock t.lock try_write
let create_pair ?max_size () =
let p = create ?max_size () in
reader p, writer p
let rec connect_ (r:'a reader) (w:'a writer) =
r () >>= function
| `End -> w `End (* then stop *)
| (`Error _ | `Ok _) as step -> w step >>= fun () -> connect_ r w
let pipe_into p1 p2 =
connect_ (reader p1) (writer p2)
end
let connect r w = Pipe.connect_ r w
module Writer = struct
type -'a t = 'a Pipe.writer
let write t x = t (`Ok x)
let write_error t msg = t (`Error msg)
let write_end t = t `End
let rec write_list t l = match l with
| [] -> Lwt.return_unit
| x :: tail ->
write t x >>= fun () -> write_list t tail
let map ~f t x = t (step_map f x)
end
module Reader = struct
type +'a t = 'a Pipe.reader
let read t = t ()
let map ~f t () =
t () >|= (step_map f)
let rec filter_map ~f t () =
t () >>= function
| `Error msg -> LwtErr.fail msg
| `Ok x ->
begin match f x with
| Some y -> LwtErr.return y
| None -> filter_map ~f t ()
end
| `End -> ret_end
let rec fold ~f ~x t =
t () >>= function
| `End -> LwtErr.return x
| `Error msg -> LwtErr.fail msg
| `Ok y -> fold ~f ~x:(f x y) t
let rec fold_s ~f ~x t =
t () >>= function
| `End -> LwtErr.return x
| `Error msg -> LwtErr.fail msg
| `Ok y ->
f x y >>= fun x -> fold_s ~f ~x t
let rec iter ~f t =
t () >>= function
| `End -> LwtErr.return_unit
| `Error msg -> LwtErr.fail msg
| `Ok x -> f x; iter ~f t
let rec iter_s ~f t =
t () >>= function
| `End -> LwtErr.return_unit
| `Error msg -> LwtErr.fail msg
| `Ok x -> f x >>= fun () -> iter_s ~f t
let merge a b : _ t =
let r, w = Pipe.create_pair () in
Lwt.async (fun () -> Lwt.join [connect a w; connect b w]);
r
end
(** {2 Conversions} *)
let of_list l : _ Reader.t =
let l = ref l in
fun () -> match !l with
| [] -> ret_end
| x :: tail ->
l := tail;
Lwt.return (`Ok x)
let of_array a =
let i = ref 0 in
fun () ->
if !i = Array.length a
then ret_end
else (
let x = a.(!i) in
incr i;
Lwt.return (`Ok x)
)
let of_string s =
let i = ref 0 in
fun () ->
if !i = String.length s
then ret_end
else (
let x = String.get s !i in
incr i;
Lwt.return (`Ok x)
)
let to_rev_list w =
Reader.fold ~f:(fun acc x -> x :: acc) ~x:[] w
let to_list w = to_rev_list w >>|= List.rev
let to_list_exn w =
to_list w >>= function
| `Error msg -> Lwt.fail (Failure msg)
| `Ok x -> Lwt.return x
let to_buffer buf : _ Writer.t = function
| `Ok c ->
Buffer.add_char buf c;
Lwt.return_unit
| `Error _ | `End -> Lwt.return_unit
let to_buffer_str buf = function
| `Ok s ->
Buffer.add_string buf s;
Lwt.return_unit
| `Error _ | `End -> Lwt.return_unit
(** {2 Basic IO wrappers} *)
module IO = struct
let read ?(bufsize=4096) ic : _ Reader.t =
let buf = Bytes.make bufsize ' ' in
fun () ->
Lwt_io.read_into ic buf 0 bufsize >>= fun n ->
if n = 0 then ret_end
else
Lwt.return (`Ok (Bytes.sub_string buf 0 n))
let read_lines ic () =
Lwt_io.read_line_opt ic >>= function
| None -> ret_end
| Some line -> Lwt.return (`Ok line)
let write oc = function
| `Ok s -> Lwt_io.write oc s
| `End | `Error _ -> Lwt.return_unit
let write_lines oc = function
| `Ok l -> Lwt_io.write_line oc l
| `End | `Error _ -> Lwt.return_unit
end

140
src/lwt/lwt_pipe.mli Normal file
View file

@ -0,0 +1,140 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Pipes, Readers, Writers}
Stream processing using:
{- Pipe: a possibly buffered channel through which readers and writer communicate}
{- Reader: accepts values, produces effects}
{- Writer: yield values}
*)
type 'a or_error = [`Ok of 'a | `Error of string]
type 'a step = ['a or_error | `End]
module LwtErr : sig
type 'a t = 'a or_error Lwt.t
val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
val (>|=) : 'a t -> ('a -> 'b) -> 'b t
val return : 'a -> 'a t
val fail : string -> 'a t
end
module Writer : sig
type -'a t
val write : 'a t -> 'a -> unit Lwt.t
val write_list : 'a t -> 'a list -> unit Lwt.t
val write_error : _ t -> string -> unit Lwt.t
val write_end : _ t -> unit Lwt.t
val map : f:('a -> 'b) -> 'b t -> 'a t
end
module Reader : sig
type +'a t
val read : 'a t -> 'a step Lwt.t
val map : f:('a -> 'b) -> 'a t -> 'b t
val filter_map : f:('a -> 'b option) -> 'a t -> 'b t
val fold : f:('acc -> 'a -> 'acc) -> x:'acc -> 'a t -> 'acc LwtErr.t
val fold_s : f:('acc -> 'a -> 'acc Lwt.t) -> x:'acc -> 'a t -> 'acc LwtErr.t
val iter : f:('a -> unit) -> 'a t -> unit LwtErr.t
val iter_s : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t
val merge : 'a t -> 'a t -> 'a t
(** Merge the two input streams *)
end
module Pipe : sig
type 'a t
(** A pipe between producers of values of type 'a, and consumers of values
of type 'a. *)
val reader : 'a t -> 'a Reader.t
val writer : 'a t -> 'a Writer.t
val keep : _ t -> unit Lwt.t -> unit
(** [keep p fut] adds a pointer from [p] to [fut] so that [fut] is not
garbage-collected before [p] *)
val create : ?max_size:int -> unit -> 'a t
(** Create a new pipe.
@param max_size size of internal buffer. Default 0. *)
val create_pair : ?max_size:int -> unit -> 'a Reader.t * 'a Writer.t
(** Create a pair [r, w] connect by a pipe *)
val pipe_into : 'a t -> 'a t -> unit Lwt.t
(** [connect p1 p2] forwards every item output by [p1] into [p2]'s input
until [`End] is reached. After [`End] is sent, the process stops. *)
end
val connect : 'a Reader.t -> 'a Writer.t -> unit Lwt.t
(** [connect r w] sends every item read from [r] into [w] *)
(** {2 Conversions} *)
val of_list : 'a list -> 'a Reader.t
val of_array : 'a array -> 'a Reader.t
val of_string : string -> char Reader.t
val to_rev_list : 'a Reader.t -> 'a list LwtErr.t
val to_list : 'a Reader.t -> 'a list LwtErr.t
val to_list_exn : 'a Reader.t -> 'a list Lwt.t
(** Same as {!to_list}, but can fail with
@raise Failure if some error is met *)
val to_buffer : Buffer.t -> char Writer.t
val to_buffer_str : Buffer.t -> string Writer.t
(** {2 Basic IO wrappers} *)
module IO : sig
val read : ?bufsize:int -> Lwt_io.input_channel -> string Reader.t
val read_lines : Lwt_io.input_channel -> string Reader.t
val write : Lwt_io.output_channel -> string Writer.t
val write_lines : Lwt_io.output_channel -> string Writer.t
end