lwt_pipe: remove Pipe module, put values at toplevel, put read/write directly on pipes

This commit is contained in:
Simon Cruanes 2015-02-20 15:14:45 +01:00
parent d257d91b0d
commit 244908a39e
2 changed files with 223 additions and 224 deletions

View file

@ -59,8 +59,7 @@ let ret_end = Lwt.return `End
exception Closed exception Closed
module Pipe = struct type ('a, +'perm) t = {
type ('a, +'perm) t = {
close : unit Lwt.u; close : unit Lwt.u;
closed : unit Lwt.t; closed : unit Lwt.t;
readers : 'a step Lwt.u Queue.t; (* readers *) readers : 'a step Lwt.u Queue.t; (* readers *)
@ -68,9 +67,11 @@ module Pipe = struct
blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *) blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *)
max_size : int; max_size : int;
mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *) mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *)
} constraint 'perm = [< `r | `w] } constraint 'perm = [< `r | `w]
let create ?(max_size=0) () = type ('a, 'perm) pipe = ('a, 'perm) t
let create ?(max_size=0) () =
let closed, close = Lwt.wait () in let closed, close = Lwt.wait () in
{ {
close; close;
@ -82,23 +83,23 @@ module Pipe = struct
keep=[]; keep=[];
} }
let keep p fut = p.keep <- fut :: p.keep let keep p fut = p.keep <- fut :: p.keep
let is_closed p = not (Lwt.is_sleeping p.closed) let is_closed p = not (Lwt.is_sleeping p.closed)
let close p = let close p =
if is_closed p then Lwt.return_unit if is_closed p then Lwt.return_unit
else ( else (
Lwt.wakeup p.close (); (* evaluate *) Lwt.wakeup p.close (); (* evaluate *)
Lwt.join p.keep; Lwt.join p.keep;
) )
let close_async p = Lwt.async (fun () -> close p) let close_async p = Lwt.async (fun () -> close p)
let wait p = Lwt.map (fun _ -> ()) p.closed let wait p = Lwt.map (fun _ -> ()) p.closed
(* try to take next element from writers buffer *) (* try to take next element from writers buffer *)
let try_read t = let try_read t =
if Queue.is_empty t.writers if Queue.is_empty t.writers
then if Queue.is_empty t.blocked_writers then if Queue.is_empty t.blocked_writers
then None then None
@ -119,8 +120,8 @@ module Pipe = struct
Some x Some x
) )
(* read next one *) (* read next one *)
let read t = match try_read t with let read t = match try_read t with
| None when is_closed t -> ret_end (* end of stream *) | None when is_closed t -> ret_end (* end of stream *)
| None -> | None ->
let fut, send = Lwt.wait () in let fut, send = Lwt.wait () in
@ -128,14 +129,12 @@ module Pipe = struct
fut fut
| Some x -> Lwt.return x | Some x -> Lwt.return x
(* TODO: signal writers when their value has less than max_size (* write a value *)
steps before being read *) let write_step t x =
(* write a value *)
let write t x =
if is_closed t then Lwt.fail Closed if is_closed t then Lwt.fail Closed
else if Queue.length t.readers > 0 else if Queue.length t.readers > 0
then ( then (
(* some reader waits, synchronize now *)
let send = Queue.pop t.readers in let send = Queue.pop t.readers in
Lwt.wakeup send x; Lwt.wakeup send x;
Lwt.return_unit Lwt.return_unit
@ -146,30 +145,35 @@ module Pipe = struct
Lwt.return_unit (* into buffer, do not wait *) Lwt.return_unit (* into buffer, do not wait *)
) )
else ( else (
(* block until the queue isn't full anymore *)
let is_done, signal_done = Lwt.wait () in let is_done, signal_done = Lwt.wait () in
Queue.push (x, signal_done) t.blocked_writers; Queue.push (x, signal_done) t.blocked_writers;
is_done (* block *) is_done (* block *)
) )
let rec connect_rec r w = let rec connect_rec r w =
read r >>= function read r >>= function
| `End -> Lwt.return_unit | `End -> Lwt.return_unit
| `Error _ as step -> write w step | `Error _ as step -> write_step w step
| `Ok _ as step -> | `Ok _ as step ->
write w step >>= fun () -> write_step w step >>= fun () ->
connect_rec r w connect_rec r w
let connect a b = (* close a when b closes *)
let fut = connect_rec a b in let link_close p ~after =
keep b fut
(* close a when b closes *)
let link_close p ~after =
Lwt.on_termination after.closed Lwt.on_termination after.closed
(fun _ -> close_async p) (fun _ -> close_async p)
(* close a when every member of after closes *) let connect ?(ownership=`None) a b =
let link_close_l p ~after = let fut = connect_rec a b in
keep b fut;
match ownership with
| `None -> ()
| `InOwnsOut -> link_close b ~after:a
| `OutOwnsIn -> link_close a ~after:b
(* close a when every member of after closes *)
let link_close_l p ~after =
let n = ref (List.length after) in let n = ref (List.length after) in
List.iter List.iter
(fun p' -> Lwt.on_termination p'.closed (fun p' -> Lwt.on_termination p'.closed
@ -178,79 +182,76 @@ module Pipe = struct
if !n = 0 then close_async p if !n = 0 then close_async p
) )
) after ) after
end
module Writer = struct let write_error t msg = write_step t (`Error msg)
type 'a t = ('a, [`w]) Pipe.t
let write t x = Pipe.write t (`Ok x) let write t x = write_step t (`Ok x)
let write_error t msg = Pipe.write t (`Error msg) let rec write_list t l = match l with
let rec write_list t l = match l with
| [] -> Lwt.return_unit | [] -> Lwt.return_unit
| x :: tail -> | x :: tail ->
write t x >>= fun () -> write_list t tail write t x >>= fun () -> write_list t tail
module Writer = struct
type 'a t = ('a, [`w]) pipe
let map ~f a = let map ~f a =
let b = Pipe.create() in let b = create() in
let rec fwd () = let rec fwd () =
Pipe.read b >>= function read b >>= function
| `Ok x -> write a (f x) >>= fwd | `Ok x -> write a (f x) >>= fwd
| `Error msg -> write_error a msg >>= fun _ -> Pipe.close a | `Error msg -> write_error a msg >>= fun _ -> close a
| `End -> Lwt.return_unit | `End -> Lwt.return_unit
in in
Pipe.keep b (fwd()); keep b (fwd());
(* when a gets closed, close b too *) (* when a gets closed, close b too *)
Pipe.link_close b ~after:a; link_close b ~after:a;
b b
let send_all l = let send_all l =
if l = [] then invalid_arg "send_all"; if l = [] then invalid_arg "send_all";
let res = Pipe.create () in let res = create () in
let rec fwd () = let rec fwd () =
Pipe.read res >>= function read res >>= function
| `End -> Lwt.return_unit | `End -> Lwt.return_unit
| `Ok x -> Lwt_list.iter_p (fun p -> write p x) l >>= fwd | `Ok x -> Lwt_list.iter_p (fun p -> write p x) l >>= fwd
| `Error msg -> Lwt_list.iter_p (fun p -> write_error p msg) l >>= fwd | `Error msg -> Lwt_list.iter_p (fun p -> write_error p msg) l >>= fwd
in in
(* do not GC before res dies; close res when any outputx is closed *) (* do not GC before res dies; close res when any outputx is closed *)
Pipe.keep res (fwd ()); keep res (fwd ());
List.iter (fun out -> Pipe.link_close res ~after:out) l; List.iter (fun out -> link_close res ~after:out) l;
res res
let send_both a b = send_all [a; b] let send_both a b = send_all [a; b]
end end
module Reader = struct module Reader = struct
type 'a t = ('a, [`r]) Pipe.t type 'a t = ('a, [`r]) pipe
let read = Pipe.read
let map ~f a = let map ~f a =
let b = Pipe.create () in let b = create () in
let rec fwd () = let rec fwd () =
Pipe.read a >>= function read a >>= function
| `Ok x -> Pipe.write b (`Ok (f x)) >>= fwd | `Ok x -> write_step b (`Ok (f x)) >>= fwd
| (`Error _) as e -> Pipe.write b e >>= fun _ -> Pipe.close b | (`Error _) as e -> write_step b e >>= fun _ -> close b
| `End -> Pipe.close b | `End -> close b
in in
Pipe.keep b (fwd()); keep b (fwd());
b b
let filter_map ~f a = let filter_map ~f a =
let b = Pipe.create () in let b = create () in
let rec fwd () = let rec fwd () =
Pipe.read a >>= function read a >>= function
| `Ok x -> | `Ok x ->
begin match f x with begin match f x with
| None -> fwd() | None -> fwd()
| Some y -> Pipe.write b (`Ok y) >>= fwd | Some y -> write_step b (`Ok y) >>= fwd
end end
| (`Error _) as e -> Pipe.write b e >>= fun _ -> Pipe.close b | (`Error _) as e -> write_step b e >>= fun _ -> close b
| `End -> Pipe.close b | `End -> close b
in in
Pipe.keep b (fwd()); keep b (fwd());
b b
let rec fold ~f ~x t = let rec fold ~f ~x t =
@ -280,61 +281,54 @@ module Reader = struct
let merge_all l = let merge_all l =
if l = [] then invalid_arg "merge_all"; if l = [] then invalid_arg "merge_all";
let res = Pipe.create () in let res = create () in
List.iter (fun p -> Pipe.connect p res) l; List.iter (fun p -> connect p res) l;
(* connect res' input to all members of l; close res when they all close *) (* connect res' input to all members of l; close res when they all close *)
Pipe.link_close_l res ~after:l; link_close_l res ~after:l;
res res
let merge_both a b = merge_all [a; b] let merge_both a b = merge_all [a; b]
let append a b = let append a b =
let c = Pipe.create () in let c = create () in
Pipe.connect a c; connect a c;
Lwt.on_success (Pipe.wait a) Lwt.on_success (wait a)
(fun () -> (fun () ->
Pipe.connect b c; connect b c;
Pipe.link_close c ~after:b (* once a and b finished, c is too *) link_close c ~after:b (* once a and b finished, c is too *)
); );
c c
end end
let connect ?(ownership=`None) a b =
Pipe.connect a b;
match ownership with
| `None -> ()
| `InOwnsOut -> Pipe.link_close b ~after:a
| `OutOwnsIn -> Pipe.link_close a ~after:b
(** {2 Conversions} *) (** {2 Conversions} *)
let of_list l : _ Reader.t = let of_list l : _ Reader.t =
let p = Pipe.create ~max_size:0 () in let p = create ~max_size:0 () in
Pipe.keep p (Lwt_list.iter_s (Writer.write p) l >>= fun () -> Pipe.close p); keep p (Lwt_list.iter_s (write p) l >>= fun () -> close p);
p p
let of_array a = let of_array a =
let p = Pipe.create ~max_size:0 () in let p = create ~max_size:0 () in
let rec send i = let rec send i =
if i = Array.length a then Pipe.close p if i = Array.length a then close p
else ( else (
Writer.write p a.(i) >>= fun () -> write p a.(i) >>= fun () ->
send (i+1) send (i+1)
) )
in in
Pipe.keep p (send 0); keep p (send 0);
p p
let of_string a = let of_string a =
let p = Pipe.create ~max_size:0 () in let p = create ~max_size:0 () in
let rec send i = let rec send i =
if i = String.length a then Pipe.close p if i = String.length a then close p
else ( else (
Writer.write p (String.get a i) >>= fun () -> write p (String.get a i) >>= fun () ->
send (i+1) send (i+1)
) )
in in
Pipe.keep p (send 0); keep p (send 0);
p p
let to_list_rev r = let to_list_rev r =
@ -348,16 +342,16 @@ let to_list_exn r =
| `Ok x -> Lwt.return x | `Ok x -> Lwt.return x
let to_buffer buf = let to_buffer buf =
let p = Pipe.create () in let p = create () in
Pipe.keep p ( keep p (
Reader.iter ~f:(fun c -> Buffer.add_char buf c) p >>= fun _ -> Reader.iter ~f:(fun c -> Buffer.add_char buf c) p >>= fun _ ->
Lwt.return_unit Lwt.return_unit
); );
p p
let to_buffer_str buf = let to_buffer_str buf =
let p = Pipe.create () in let p = create () in
Pipe.keep p ( keep p (
Reader.iter ~f:(fun s -> Buffer.add_string buf s) p >>= fun _ -> Reader.iter ~f:(fun s -> Buffer.add_string buf s) p >>= fun _ ->
Lwt.return_unit Lwt.return_unit
); );
@ -368,41 +362,41 @@ let to_buffer_str buf =
module IO = struct module IO = struct
let read ?(bufsize=4096) ic : _ Reader.t = let read ?(bufsize=4096) ic : _ Reader.t =
let buf = Bytes.make bufsize ' ' in let buf = Bytes.make bufsize ' ' in
let p = Pipe.create ~max_size:0 () in let p = create ~max_size:0 () in
let rec send() = let rec send() =
Lwt_io.read_into ic buf 0 bufsize >>= fun n -> Lwt_io.read_into ic buf 0 bufsize >>= fun n ->
if n = 0 then Pipe.close p if n = 0 then close p
else else
Writer.write p (Bytes.sub_string buf 0 n) >>= fun () -> write p (Bytes.sub_string buf 0 n) >>= fun () ->
send () send ()
in Lwt.async send; in Lwt.async send;
p p
let read_lines ic = let read_lines ic =
let p = Pipe.create () in let p = create () in
let rec send () = let rec send () =
Lwt_io.read_line_opt ic >>= function Lwt_io.read_line_opt ic >>= function
| None -> Pipe.close p | None -> close p
| Some line -> Writer.write p line >>= fun () -> send () | Some line -> write p line >>= fun () -> send ()
in in
Lwt.async send; Lwt.async send;
p p
let write oc = let write oc =
let p = Pipe.create () in let p = create () in
Pipe.keep p ( keep p (
Reader.iter_s ~f:(Lwt_io.write oc) p >>= fun _ -> Reader.iter_s ~f:(Lwt_io.write oc) p >>= fun _ ->
Lwt_io.flush oc >>= fun () -> Lwt_io.flush oc >>= fun () ->
Pipe.close p close p
); );
p p
let write_lines oc = let write_lines oc =
let p = Pipe.create () in let p = create () in
Pipe.keep p ( keep p (
Reader.iter_s ~f:(Lwt_io.write_line oc) p >>= fun _ -> Reader.iter_s ~f:(Lwt_io.write_line oc) p >>= fun _ ->
Lwt_io.flush oc >>= fun () -> Lwt_io.flush oc >>= fun () ->
Pipe.close p close p
); );
p p
end end

View file

@ -46,7 +46,7 @@ Lwt_io.with_file ~mode:Lwt_io.output "/tmp/foo"
(fun oc -> (fun oc ->
let p2 = P.IO.write_lines oc in let p2 = P.IO.write_lines oc in
P.connect ~ownership:`InOwnsOut p1 p2; P.connect ~ownership:`InOwnsOut p1 p2;
P.Pipe.wait p2 P.wait p2
);; );;
]} ]}
@ -66,49 +66,59 @@ end
exception Closed exception Closed
module Pipe : sig type ('a, +'perm) t constraint 'perm = [< `r | `w]
type ('a, +'perm) t constraint 'perm = [< `r | `w] (** A pipe between producers of values of type 'a, and consumers of values
(** A pipe between producers of values of type 'a, and consumers of values
of type 'a. *) of type 'a. *)
val keep : _ t -> unit Lwt.t -> unit type ('a, 'perm) pipe = ('a, 'perm) t
(** [keep p fut] adds a pointer from [p] to [fut] so that [fut] is not
val keep : _ t -> unit Lwt.t -> unit
(** [keep p fut] adds a pointer from [p] to [fut] so that [fut] is not
garbage-collected before [p] *) garbage-collected before [p] *)
val is_closed : _ t -> bool val is_closed : _ t -> bool
val close : _ t -> unit Lwt.t val close : _ t -> unit Lwt.t
(** [close p] closes [p], which will not accept input anymore. (** [close p] closes [p], which will not accept input anymore.
This sends [`End] to all readers connected to [p] *) This sends [`End] to all readers connected to [p] *)
val close_async : _ t -> unit val close_async : _ t -> unit
(** Same as {!close} but closes in the background *) (** Same as {!close} but closes in the background *)
val wait : _ t -> unit Lwt.t val wait : _ t -> unit Lwt.t
(** Evaluates once the pipe closes *) (** Evaluates once the pipe closes *)
val create : ?max_size:int -> unit -> ('a, 'perm) t val create : ?max_size:int -> unit -> ('a, 'perm) t
(** Create a new pipe. (** Create a new pipe.
@param max_size size of internal buffer. Default 0. *) @param max_size size of internal buffer. Default 0. *)
val connect : ('a, [>`r]) t -> ('a, [>`w]) t -> unit val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] ->
(** [connect p1 p2] forwards every item output by [p1] into [p2]'s input ('a, [>`r]) t -> ('a, [>`w]) t -> unit
until [p1] is closed. *) (** [connect p1 p2] forwards every item output by [p1] into [p2]'s input
end until [p1] is closed.
@param own determines which pipes owns which (the owner, when it
closes, also closes the ownee) *)
val link_close : _ t -> after:_ t -> unit
(** [link_close p ~after] will close [p] when [after] closes.
if [after] is closed already, closes [p] immediately *)
val read : ('a, [>`r]) t -> 'a step Lwt.t
(** Read the next value from a Pipe *)
val write : ('a, [>`w]) t -> 'a -> unit Lwt.t
(** @raise Pipe.Closed if the writer is closed *)
val write_list : ('a, [>`w]) t -> 'a list -> unit Lwt.t
(** @raise Pipe.Closed if the writer is closed *)
val write_error : (_, [>`w]) t -> string -> unit Lwt.t
(** @raise Pipe.Closed if the writer is closed *)
module Writer : sig module Writer : sig
type 'a t = ('a, [`w]) Pipe.t type 'a t = ('a, [`w]) pipe
val write : 'a t -> 'a -> unit Lwt.t val map : f:('a -> 'b) -> ('b, [>`w]) pipe -> 'a t
(** @raise Pipe.Closed if the writer is closed *)
val write_list : 'a t -> 'a list -> unit Lwt.t
(** @raise Pipe.Closed if the writer is closed *)
val write_error : _ t -> string -> unit Lwt.t
(** @raise Pipe.Closed if the writer is closed *)
val map : f:('a -> 'b) -> 'b t -> 'a t
(** Map values before writing them *) (** Map values before writing them *)
val send_both : 'a t -> 'a t -> 'a t val send_both : 'a t -> 'a t -> 'a t
@ -122,11 +132,9 @@ module Writer : sig
end end
module Reader : sig module Reader : sig
type 'a t = ('a, [`r]) Pipe.t type 'a t = ('a, [`r]) pipe
val read : 'a t -> 'a step Lwt.t val map : f:('a -> 'b) -> ('a, [>`r]) pipe -> 'b t
val map : f:('a -> 'b) -> 'a t -> 'b t
val filter_map : f:('a -> 'b option) -> 'a t -> 'b t val filter_map : f:('a -> 'b option) -> 'a t -> 'b t
@ -146,13 +154,10 @@ module Reader : sig
@raise Invalid_argument if the list is empty *) @raise Invalid_argument if the list is empty *)
val append : 'a t -> 'a t -> 'a t val append : 'a t -> 'a t -> 'a t
(** [append a b] reads from [a] until [a] closes, then reads from [b]
and closes when [b] closes *)
end end
val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] ->
'a Reader.t -> 'a Writer.t -> unit
(** Handy synonym to {!Pipe.connect}, with additional resource management.
@param own determines which pipes owns which *)
(** {2 Conversions} *) (** {2 Conversions} *)
val of_list : 'a list -> 'a Reader.t val of_list : 'a list -> 'a Reader.t