wip: lwt_pipe

This commit is contained in:
Simon Cruanes 2015-02-19 19:50:36 +01:00
parent 89aded1311
commit aef87c148d
2 changed files with 76 additions and 32 deletions

View file

@ -63,12 +63,10 @@ module Pipe = struct
type ('a, +'perm) t = { type ('a, +'perm) t = {
close : unit Lwt.u; close : unit Lwt.u;
closed : unit Lwt.t; closed : unit Lwt.t;
buf : readers : 'a step Lwt.u Queue.t; (* readers *)
[`Item of 'a step writers : 'a step Queue.t;
| `Block of 'a step * unit Lwt.u blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *)
] Queue.t; (* actions queued *)
max_size : int; max_size : int;
box : 'a step Lwt.u Lwt_mvar.t;
mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *) mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *)
} constraint 'perm = [< `r | `w] } constraint 'perm = [< `r | `w]
@ -77,9 +75,10 @@ module Pipe = struct
{ {
close; close;
closed; closed;
buf = Queue.create (); readers = Queue.create ();
writers = Queue.create ();
blocked_writers = Queue.create ();
max_size; max_size;
box=Lwt_mvar.create_empty ();
keep=[]; keep=[];
} }
@ -98,23 +97,36 @@ module Pipe = struct
let wait p = Lwt.map (fun _ -> ()) p.closed let wait p = Lwt.map (fun _ -> ()) p.closed
(* try to take next element from buffer *) (* try to take next element from writers buffer *)
let try_next_buf t = let try_read t =
if Queue.is_empty t.buf then None if Queue.is_empty t.writers
else Some (Queue.pop t.buf) then if Queue.is_empty t.blocked_writers
then None
else (
assert (t.max_size = 0);
let x, signal_done = Queue.pop t.blocked_writers in
Lwt.wakeup signal_done ();
Some x
)
else (
let x = Queue.pop t.writers in
(* some writer may unblock *)
if not (Queue.is_empty t.blocked_writers) && Queue.length t.writers < t.max_size then (
let y, signal_done = Queue.pop t.blocked_writers in
Queue.push y t.writers;
Lwt.wakeup signal_done ();
);
Some x
)
(* read next one *) (* read next one *)
let read t = let read t = match try_read t with
match try_next_buf t with
| None when is_closed t -> ret_end (* end of stream *) | None when is_closed t -> ret_end (* end of stream *)
| None -> | None ->
let fut, send = Lwt.wait () in let fut, send = Lwt.wait () in
Lwt_mvar.put t.box send >>= fun () -> Queue.push send t.readers;
fut fut
| Some (`Item x) -> Lwt.return x | Some x -> Lwt.return x
| Some (`Block (x, signal_done)) ->
Lwt.wakeup signal_done (); (* signal the writer it's done *)
Lwt.return x
(* TODO: signal writers when their value has less than max_size (* TODO: signal writers when their value has less than max_size
steps before being read *) steps before being read *)
@ -122,14 +134,21 @@ module Pipe = struct
(* write a value *) (* write a value *)
let write t x = let write t x =
if is_closed t then Lwt.fail Closed if is_closed t then Lwt.fail Closed
else if Queue.length t.buf < t.max_size else if Queue.length t.readers > 0
then ( then (
Queue.push (`Item x) t.buf; let send = Queue.pop t.readers in
Lwt.return_unit (* into buffer, do not wait *) Lwt.wakeup send x;
) else ( Lwt.return_unit
)
else if Queue.length t.writers < t.max_size
then (
Queue.push x t.writers;
Lwt.return_unit (* into buffer, do not wait *)
)
else (
let is_done, signal_done = Lwt.wait () in let is_done, signal_done = Lwt.wait () in
Queue.push (`Block (x, signal_done)) t.buf; Queue.push (x, signal_done) t.blocked_writers;
is_done is_done (* block *)
) )
let rec connect_rec r w = let rec connect_rec r w =
@ -280,7 +299,12 @@ module Reader = struct
c c
end end
let connect = Pipe.connect let connect ?(ownership=`None) a b =
Pipe.connect a b;
match ownership with
| `None -> ()
| `InOwnsOut -> Pipe.link_close b ~after:a
| `OutOwnsIn -> Pipe.link_close a ~after:b
(** {2 Conversions} *) (** {2 Conversions} *)

View file

@ -28,11 +28,29 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Stream processing using: Stream processing using:
{- Pipe: a possibly buffered channel through which readers and writer communicate} - Pipe: a possibly buffered channel through which readers and writer communicate
{- Reader: accepts values, produces effects} - Reader: accepts values, produces effects
{- Writer: yield values} - Writer: yield values
@since NEXT_RELEASE Examples:
{[
#require "containers.lwt";;
module P = Containers_lwt.Lwt_pipe;;
let p1 =
P.of_list CCList.(1 -- 100)
|> P.Reader.map ~f:string_of_int;;
Lwt_io.with_file ~mode:Lwt_io.output "/tmp/foo"
(fun oc ->
let p2 = P.IO.write_lines oc in
P.connect ~ownership:`InOwnsOut p1 p2;
P.Pipe.wait p2
);;
]}
@since NEXT_RELEASE
*) *)
type 'a or_error = [`Ok of 'a | `Error of string] type 'a or_error = [`Ok of 'a | `Error of string]
@ -130,8 +148,10 @@ module Reader : sig
val append : 'a t -> 'a t -> 'a t val append : 'a t -> 'a t -> 'a t
end end
val connect : 'a Reader.t -> 'a Writer.t -> unit val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] ->
(** Handy synonym to {!Pipe.connect} *) 'a Reader.t -> 'a Writer.t -> unit
(** Handy synonym to {!Pipe.connect}, with additional resource management.
@param own determines which pipes owns which *)
(** {2 Conversions} *) (** {2 Conversions} *)