Lwt_pipe.Reader: more combinators

This commit is contained in:
Simon Cruanes 2015-02-26 16:20:23 +01:00
parent cf6d730998
commit fa09029e8a
2 changed files with 40 additions and 3 deletions

View file

@ -27,6 +27,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
type 'a or_error = [`Ok of 'a | `Error of string] type 'a or_error = [`Ok of 'a | `Error of string]
type 'a step = ['a or_error | `End] type 'a step = ['a or_error | `End]
let (>|=) = Lwt.(>|=)
let (>>=) = Lwt.(>>=) let (>>=) = Lwt.(>>=)
module LwtErr = struct module LwtErr = struct
@ -240,6 +241,28 @@ module Reader = struct
keep b (fwd()); keep b (fwd());
b b
let map_s ~f a =
let b = create () in
let rec fwd () =
read a >>= function
| `Ok x -> f x >>= fun y -> write_step b (`Ok y) >>= fwd
| (`Error _) as e -> write_step b e >>= fun _ -> close b
| `End -> close b
in
keep b (fwd());
b
let filter ~f a =
let b = create () in
let rec fwd () =
read a >>= function
| `Ok x -> if f x then write_step b (`Ok x) >>= fwd else fwd()
| (`Error _) as e -> write_step b e >>= fun _ -> close b
| `End -> close b
in
keep b (fwd());
b
let filter_map ~f a = let filter_map ~f a =
let b = create () in let b = create () in
let rec fwd () = let rec fwd () =
@ -280,6 +303,14 @@ module Reader = struct
| `Error msg -> LwtErr.fail msg | `Error msg -> LwtErr.fail msg
| `Ok x -> f x >>= fun () -> iter_s ~f t | `Ok x -> f x >>= fun () -> iter_s ~f t
let iter_p ~f t =
let rec iter acc =
read t >>= function
| `End -> Lwt.join acc >|= fun () -> `Ok ()
| `Error msg -> LwtErr.fail msg
| `Ok x -> iter (f x :: acc)
in iter []
let merge_all l = let merge_all l =
if l = [] then invalid_arg "merge_all"; if l = [] then invalid_arg "merge_all";
let res = create () in let res = create () in

View file

@ -140,16 +140,22 @@ module Reader : sig
val map : f:('a -> 'b) -> ('a, [>`r]) pipe -> 'b t val map : f:('a -> 'b) -> ('a, [>`r]) pipe -> 'b t
val filter_map : f:('a -> 'b option) -> 'a t -> 'b t val map_s : f:('a -> 'b Lwt.t) -> ('a, [>`r]) pipe -> 'b t
val fold : f:('acc -> 'a -> 'acc) -> x:'acc -> 'a t -> 'acc LwtErr.t val filter : f:('a -> bool) -> ('a, [>`r]) pipe -> 'a t
val fold_s : f:('acc -> 'a -> 'acc Lwt.t) -> x:'acc -> 'a t -> 'acc LwtErr.t val filter_map : f:('a -> 'b option) -> ('a, [>`r]) pipe -> 'b t
val fold : f:('acc -> 'a -> 'acc) -> x:'acc -> ('a, [>`r]) pipe -> 'acc LwtErr.t
val fold_s : f:('acc -> 'a -> 'acc Lwt.t) -> x:'acc -> ('a, [>`r]) pipe -> 'acc LwtErr.t
val iter : f:('a -> unit) -> 'a t -> unit LwtErr.t val iter : f:('a -> unit) -> 'a t -> unit LwtErr.t
val iter_s : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t val iter_s : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t
val iter_p : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t
val merge_both : 'a t -> 'a t -> 'a t val merge_both : 'a t -> 'a t -> 'a t
(** Merge the two input streams in a non-specified order *) (** Merge the two input streams in a non-specified order *)