From aef87c148d2675fea2a70a5c9b93ff96f4828931 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 19 Feb 2015 19:50:36 +0100 Subject: [PATCH] wip: lwt_pipe --- src/lwt/lwt_pipe.ml | 76 +++++++++++++++++++++++++++++--------------- src/lwt/lwt_pipe.mli | 32 +++++++++++++++---- 2 files changed, 76 insertions(+), 32 deletions(-) diff --git a/src/lwt/lwt_pipe.ml b/src/lwt/lwt_pipe.ml index c48209c1..5fbe401b 100644 --- a/src/lwt/lwt_pipe.ml +++ b/src/lwt/lwt_pipe.ml @@ -63,12 +63,10 @@ module Pipe = struct type ('a, +'perm) t = { close : unit Lwt.u; closed : unit Lwt.t; - buf : - [`Item of 'a step - | `Block of 'a step * unit Lwt.u - ] Queue.t; (* actions queued *) + readers : 'a step Lwt.u Queue.t; (* readers *) + writers : 'a step Queue.t; + blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *) max_size : int; - box : 'a step Lwt.u Lwt_mvar.t; mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *) } constraint 'perm = [< `r | `w] @@ -77,9 +75,10 @@ module Pipe = struct { close; closed; - buf = Queue.create (); + readers = Queue.create (); + writers = Queue.create (); + blocked_writers = Queue.create (); max_size; - box=Lwt_mvar.create_empty (); keep=[]; } @@ -98,23 +97,36 @@ module Pipe = struct let wait p = Lwt.map (fun _ -> ()) p.closed - (* try to take next element from buffer *) - let try_next_buf t = - if Queue.is_empty t.buf then None - else Some (Queue.pop t.buf) + (* try to take next element from writers buffer *) + let try_read t = + if Queue.is_empty t.writers + then if Queue.is_empty t.blocked_writers + then None + else ( + assert (t.max_size = 0); + let x, signal_done = Queue.pop t.blocked_writers in + Lwt.wakeup signal_done (); + Some x + ) + else ( + let x = Queue.pop t.writers in + (* some writer may unblock *) + if not (Queue.is_empty t.blocked_writers) && Queue.length t.writers < t.max_size then ( + let y, signal_done = Queue.pop t.blocked_writers in + Queue.push y t.writers; + Lwt.wakeup signal_done (); + ); + Some x + ) (* read next one *) - let read t = - match try_next_buf t with + let read t = match try_read t with | None when is_closed t -> ret_end (* end of stream *) | None -> let fut, send = Lwt.wait () in - Lwt_mvar.put t.box send >>= fun () -> + Queue.push send t.readers; fut - | Some (`Item x) -> Lwt.return x - | Some (`Block (x, signal_done)) -> - Lwt.wakeup signal_done (); (* signal the writer it's done *) - Lwt.return x + | Some x -> Lwt.return x (* TODO: signal writers when their value has less than max_size steps before being read *) @@ -122,14 +134,21 @@ module Pipe = struct (* write a value *) let write t x = if is_closed t then Lwt.fail Closed - else if Queue.length t.buf < t.max_size - then ( - Queue.push (`Item x) t.buf; - Lwt.return_unit (* into buffer, do not wait *) - ) else ( + else if Queue.length t.readers > 0 + then ( + let send = Queue.pop t.readers in + Lwt.wakeup send x; + Lwt.return_unit + ) + else if Queue.length t.writers < t.max_size + then ( + Queue.push x t.writers; + Lwt.return_unit (* into buffer, do not wait *) + ) + else ( let is_done, signal_done = Lwt.wait () in - Queue.push (`Block (x, signal_done)) t.buf; - is_done + Queue.push (x, signal_done) t.blocked_writers; + is_done (* block *) ) let rec connect_rec r w = @@ -280,7 +299,12 @@ module Reader = struct c end -let connect = Pipe.connect +let connect ?(ownership=`None) a b = + Pipe.connect a b; + match ownership with + | `None -> () + | `InOwnsOut -> Pipe.link_close b ~after:a + | `OutOwnsIn -> Pipe.link_close a ~after:b (** {2 Conversions} *) diff --git a/src/lwt/lwt_pipe.mli b/src/lwt/lwt_pipe.mli index ae10cbd7..90b18c2b 100644 --- a/src/lwt/lwt_pipe.mli +++ b/src/lwt/lwt_pipe.mli @@ -28,11 +28,29 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. Stream processing using: - {- Pipe: a possibly buffered channel through which readers and writer communicate} - {- Reader: accepts values, produces effects} - {- Writer: yield values} + - Pipe: a possibly buffered channel through which readers and writer communicate + - Reader: accepts values, produces effects + - Writer: yield values - @since NEXT_RELEASE +Examples: +{[ +#require "containers.lwt";; + +module P = Containers_lwt.Lwt_pipe;; + +let p1 = + P.of_list CCList.(1 -- 100) + |> P.Reader.map ~f:string_of_int;; + +Lwt_io.with_file ~mode:Lwt_io.output "/tmp/foo" + (fun oc -> + let p2 = P.IO.write_lines oc in + P.connect ~ownership:`InOwnsOut p1 p2; + P.Pipe.wait p2 + );; +]} + +@since NEXT_RELEASE *) type 'a or_error = [`Ok of 'a | `Error of string] @@ -130,8 +148,10 @@ module Reader : sig val append : 'a t -> 'a t -> 'a t end -val connect : 'a Reader.t -> 'a Writer.t -> unit -(** Handy synonym to {!Pipe.connect} *) +val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] -> + 'a Reader.t -> 'a Writer.t -> unit +(** Handy synonym to {!Pipe.connect}, with additional resource management. + @param own determines which pipes owns which *) (** {2 Conversions} *)