From fa09029e8a6f2285fe37287c721d77f8719d4592 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 26 Feb 2015 16:20:23 +0100 Subject: [PATCH] Lwt_pipe.Reader: more combinators --- src/lwt/lwt_pipe.ml | 31 +++++++++++++++++++++++++++++++ src/lwt/lwt_pipe.mli | 12 +++++++++--- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/lwt/lwt_pipe.ml b/src/lwt/lwt_pipe.ml index b7c24a17..36af2b1f 100644 --- a/src/lwt/lwt_pipe.ml +++ b/src/lwt/lwt_pipe.ml @@ -27,6 +27,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. type 'a or_error = [`Ok of 'a | `Error of string] type 'a step = ['a or_error | `End] +let (>|=) = Lwt.(>|=) let (>>=) = Lwt.(>>=) module LwtErr = struct @@ -240,6 +241,28 @@ module Reader = struct keep b (fwd()); b + let map_s ~f a = + let b = create () in + let rec fwd () = + read a >>= function + | `Ok x -> f x >>= fun y -> write_step b (`Ok y) >>= fwd + | (`Error _) as e -> write_step b e >>= fun _ -> close b + | `End -> close b + in + keep b (fwd()); + b + + let filter ~f a = + let b = create () in + let rec fwd () = + read a >>= function + | `Ok x -> if f x then write_step b (`Ok x) >>= fwd else fwd() + | (`Error _) as e -> write_step b e >>= fun _ -> close b + | `End -> close b + in + keep b (fwd()); + b + let filter_map ~f a = let b = create () in let rec fwd () = @@ -280,6 +303,14 @@ module Reader = struct | `Error msg -> LwtErr.fail msg | `Ok x -> f x >>= fun () -> iter_s ~f t + let iter_p ~f t = + let rec iter acc = + read t >>= function + | `End -> Lwt.join acc >|= fun () -> `Ok () + | `Error msg -> LwtErr.fail msg + | `Ok x -> iter (f x :: acc) + in iter [] + let merge_all l = if l = [] then invalid_arg "merge_all"; let res = create () in diff --git a/src/lwt/lwt_pipe.mli b/src/lwt/lwt_pipe.mli index efcd0bee..63bcbeb7 100644 --- a/src/lwt/lwt_pipe.mli +++ b/src/lwt/lwt_pipe.mli @@ -140,16 +140,22 @@ module Reader : sig val map : f:('a -> 'b) -> ('a, [>`r]) pipe -> 'b t - val filter_map : f:('a -> 'b option) -> 'a t -> 'b t + val map_s : f:('a -> 'b Lwt.t) -> ('a, [>`r]) pipe -> 'b t - val fold : f:('acc -> 'a -> 'acc) -> x:'acc -> 'a t -> 'acc LwtErr.t + val filter : f:('a -> bool) -> ('a, [>`r]) pipe -> 'a t - val fold_s : f:('acc -> 'a -> 'acc Lwt.t) -> x:'acc -> 'a t -> 'acc LwtErr.t + val filter_map : f:('a -> 'b option) -> ('a, [>`r]) pipe -> 'b t + + val fold : f:('acc -> 'a -> 'acc) -> x:'acc -> ('a, [>`r]) pipe -> 'acc LwtErr.t + + val fold_s : f:('acc -> 'a -> 'acc Lwt.t) -> x:'acc -> ('a, [>`r]) pipe -> 'acc LwtErr.t val iter : f:('a -> unit) -> 'a t -> unit LwtErr.t val iter_s : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t + val iter_p : f:('a -> unit Lwt.t) -> 'a t -> unit LwtErr.t + val merge_both : 'a t -> 'a t -> 'a t (** Merge the two input streams in a non-specified order *)