diff --git a/src/lwt/lwt_pipe.ml b/src/lwt/lwt_pipe.ml index 5fbe401b..eb02d9c0 100644 --- a/src/lwt/lwt_pipe.ml +++ b/src/lwt/lwt_pipe.ml @@ -59,198 +59,199 @@ let ret_end = Lwt.return `End exception Closed -module Pipe = struct - type ('a, +'perm) t = { - close : unit Lwt.u; - closed : unit Lwt.t; - readers : 'a step Lwt.u Queue.t; (* readers *) - writers : 'a step Queue.t; - blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *) - max_size : int; - mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *) - } constraint 'perm = [< `r | `w] +type ('a, +'perm) t = { + close : unit Lwt.u; + closed : unit Lwt.t; + readers : 'a step Lwt.u Queue.t; (* readers *) + writers : 'a step Queue.t; + blocked_writers : ('a step * unit Lwt.u) Queue.t; (* blocked writers *) + max_size : int; + mutable keep : unit Lwt.t list; (* do not GC, and wait for completion *) +} constraint 'perm = [< `r | `w] - let create ?(max_size=0) () = - let closed, close = Lwt.wait () in - { - close; - closed; - readers = Queue.create (); - writers = Queue.create (); - blocked_writers = Queue.create (); - max_size; - keep=[]; - } +type ('a, 'perm) pipe = ('a, 'perm) t - let keep p fut = p.keep <- fut :: p.keep +let create ?(max_size=0) () = + let closed, close = Lwt.wait () in + { + close; + closed; + readers = Queue.create (); + writers = Queue.create (); + blocked_writers = Queue.create (); + max_size; + keep=[]; + } - let is_closed p = not (Lwt.is_sleeping p.closed) +let keep p fut = p.keep <- fut :: p.keep - let close p = - if is_closed p then Lwt.return_unit +let is_closed p = not (Lwt.is_sleeping p.closed) + +let close p = + if is_closed p then Lwt.return_unit + else ( + Lwt.wakeup p.close (); (* evaluate *) + Lwt.join p.keep; + ) + +let close_async p = Lwt.async (fun () -> close p) + +let wait p = Lwt.map (fun _ -> ()) p.closed + +(* try to take next element from writers buffer *) +let try_read t = + if Queue.is_empty t.writers + then if Queue.is_empty t.blocked_writers + then None else ( - Lwt.wakeup p.close (); (* evaluate *) - Lwt.join p.keep; - ) - - let close_async p = Lwt.async (fun () -> close p) - - let wait p = Lwt.map (fun _ -> ()) p.closed - - (* try to take next element from writers buffer *) - let try_read t = - if Queue.is_empty t.writers - then if Queue.is_empty t.blocked_writers - then None - else ( - assert (t.max_size = 0); - let x, signal_done = Queue.pop t.blocked_writers in - Lwt.wakeup signal_done (); - Some x - ) - else ( - let x = Queue.pop t.writers in - (* some writer may unblock *) - if not (Queue.is_empty t.blocked_writers) && Queue.length t.writers < t.max_size then ( - let y, signal_done = Queue.pop t.blocked_writers in - Queue.push y t.writers; - Lwt.wakeup signal_done (); - ); + assert (t.max_size = 0); + let x, signal_done = Queue.pop t.blocked_writers in + Lwt.wakeup signal_done (); Some x ) + else ( + let x = Queue.pop t.writers in + (* some writer may unblock *) + if not (Queue.is_empty t.blocked_writers) && Queue.length t.writers < t.max_size then ( + let y, signal_done = Queue.pop t.blocked_writers in + Queue.push y t.writers; + Lwt.wakeup signal_done (); + ); + Some x + ) - (* read next one *) - let read t = match try_read t with - | None when is_closed t -> ret_end (* end of stream *) - | None -> - let fut, send = Lwt.wait () in - Queue.push send t.readers; - fut - | Some x -> Lwt.return x +(* read next one *) +let read t = match try_read t with + | None when is_closed t -> ret_end (* end of stream *) + | None -> + let fut, send = Lwt.wait () in + Queue.push send t.readers; + fut + | Some x -> Lwt.return x - (* TODO: signal writers when their value has less than max_size - steps before being read *) - - (* write a value *) - let write t x = - if is_closed t then Lwt.fail Closed - else if Queue.length t.readers > 0 - then ( - let send = Queue.pop t.readers in - Lwt.wakeup send x; - Lwt.return_unit - ) - else if Queue.length t.writers < t.max_size - then ( - Queue.push x t.writers; - Lwt.return_unit (* into buffer, do not wait *) - ) - else ( - let is_done, signal_done = Lwt.wait () in - Queue.push (x, signal_done) t.blocked_writers; - is_done (* block *) +(* write a value *) +let write_step t x = + if is_closed t then Lwt.fail Closed + else if Queue.length t.readers > 0 + then ( + (* some reader waits, synchronize now *) + let send = Queue.pop t.readers in + Lwt.wakeup send x; + Lwt.return_unit ) + else if Queue.length t.writers < t.max_size + then ( + Queue.push x t.writers; + Lwt.return_unit (* into buffer, do not wait *) + ) + else ( + (* block until the queue isn't full anymore *) + let is_done, signal_done = Lwt.wait () in + Queue.push (x, signal_done) t.blocked_writers; + is_done (* block *) + ) - let rec connect_rec r w = - read r >>= function - | `End -> Lwt.return_unit - | `Error _ as step -> write w step - | `Ok _ as step -> - write w step >>= fun () -> - connect_rec r w +let rec connect_rec r w = + read r >>= function + | `End -> Lwt.return_unit + | `Error _ as step -> write_step w step + | `Ok _ as step -> + write_step w step >>= fun () -> + connect_rec r w - let connect a b = - let fut = connect_rec a b in - keep b fut +(* close a when b closes *) +let link_close p ~after = + Lwt.on_termination after.closed + (fun _ -> close_async p) - (* close a when b closes *) - let link_close p ~after = - Lwt.on_termination after.closed - (fun _ -> close_async p) +let connect ?(ownership=`None) a b = + let fut = connect_rec a b in + keep b fut; + match ownership with + | `None -> () + | `InOwnsOut -> link_close b ~after:a + | `OutOwnsIn -> link_close a ~after:b - (* close a when every member of after closes *) - let link_close_l p ~after = - let n = ref (List.length after) in - List.iter - (fun p' -> Lwt.on_termination p'.closed - (fun _ -> - decr n; - if !n = 0 then close_async p - ) - ) after -end +(* close a when every member of after closes *) +let link_close_l p ~after = + let n = ref (List.length after) in + List.iter + (fun p' -> Lwt.on_termination p'.closed + (fun _ -> + decr n; + if !n = 0 then close_async p + ) + ) after + +let write_error t msg = write_step t (`Error msg) + +let write t x = write_step t (`Ok x) + +let rec write_list t l = match l with + | [] -> Lwt.return_unit + | x :: tail -> + write t x >>= fun () -> write_list t tail module Writer = struct - type 'a t = ('a, [`w]) Pipe.t - - let write t x = Pipe.write t (`Ok x) - - let write_error t msg = Pipe.write t (`Error msg) - - let rec write_list t l = match l with - | [] -> Lwt.return_unit - | x :: tail -> - write t x >>= fun () -> write_list t tail + type 'a t = ('a, [`w]) pipe let map ~f a = - let b = Pipe.create() in + let b = create() in let rec fwd () = - Pipe.read b >>= function + read b >>= function | `Ok x -> write a (f x) >>= fwd - | `Error msg -> write_error a msg >>= fun _ -> Pipe.close a + | `Error msg -> write_error a msg >>= fun _ -> close a | `End -> Lwt.return_unit in - Pipe.keep b (fwd()); + keep b (fwd()); (* when a gets closed, close b too *) - Pipe.link_close b ~after:a; + link_close b ~after:a; b let send_all l = if l = [] then invalid_arg "send_all"; - let res = Pipe.create () in + let res = create () in let rec fwd () = - Pipe.read res >>= function + read res >>= function | `End -> Lwt.return_unit | `Ok x -> Lwt_list.iter_p (fun p -> write p x) l >>= fwd | `Error msg -> Lwt_list.iter_p (fun p -> write_error p msg) l >>= fwd in (* do not GC before res dies; close res when any outputx is closed *) - Pipe.keep res (fwd ()); - List.iter (fun out -> Pipe.link_close res ~after:out) l; + keep res (fwd ()); + List.iter (fun out -> link_close res ~after:out) l; res let send_both a b = send_all [a; b] end module Reader = struct - type 'a t = ('a, [`r]) Pipe.t - - let read = Pipe.read + type 'a t = ('a, [`r]) pipe let map ~f a = - let b = Pipe.create () in + let b = create () in let rec fwd () = - Pipe.read a >>= function - | `Ok x -> Pipe.write b (`Ok (f x)) >>= fwd - | (`Error _) as e -> Pipe.write b e >>= fun _ -> Pipe.close b - | `End -> Pipe.close b + read a >>= function + | `Ok x -> write_step b (`Ok (f x)) >>= fwd + | (`Error _) as e -> write_step b e >>= fun _ -> close b + | `End -> close b in - Pipe.keep b (fwd()); + keep b (fwd()); b let filter_map ~f a = - let b = Pipe.create () in + let b = create () in let rec fwd () = - Pipe.read a >>= function + read a >>= function | `Ok x -> begin match f x with | None -> fwd() - | Some y -> Pipe.write b (`Ok y) >>= fwd + | Some y -> write_step b (`Ok y) >>= fwd end - | (`Error _) as e -> Pipe.write b e >>= fun _ -> Pipe.close b - | `End -> Pipe.close b + | (`Error _) as e -> write_step b e >>= fun _ -> close b + | `End -> close b in - Pipe.keep b (fwd()); + keep b (fwd()); b let rec fold ~f ~x t = @@ -280,61 +281,54 @@ module Reader = struct let merge_all l = if l = [] then invalid_arg "merge_all"; - let res = Pipe.create () in - List.iter (fun p -> Pipe.connect p res) l; + let res = create () in + List.iter (fun p -> connect p res) l; (* connect res' input to all members of l; close res when they all close *) - Pipe.link_close_l res ~after:l; + link_close_l res ~after:l; res let merge_both a b = merge_all [a; b] let append a b = - let c = Pipe.create () in - Pipe.connect a c; - Lwt.on_success (Pipe.wait a) + let c = create () in + connect a c; + Lwt.on_success (wait a) (fun () -> - Pipe.connect b c; - Pipe.link_close c ~after:b (* once a and b finished, c is too *) + connect b c; + link_close c ~after:b (* once a and b finished, c is too *) ); c end -let connect ?(ownership=`None) a b = - Pipe.connect a b; - match ownership with - | `None -> () - | `InOwnsOut -> Pipe.link_close b ~after:a - | `OutOwnsIn -> Pipe.link_close a ~after:b - (** {2 Conversions} *) let of_list l : _ Reader.t = - let p = Pipe.create ~max_size:0 () in - Pipe.keep p (Lwt_list.iter_s (Writer.write p) l >>= fun () -> Pipe.close p); + let p = create ~max_size:0 () in + keep p (Lwt_list.iter_s (write p) l >>= fun () -> close p); p let of_array a = - let p = Pipe.create ~max_size:0 () in + let p = create ~max_size:0 () in let rec send i = - if i = Array.length a then Pipe.close p + if i = Array.length a then close p else ( - Writer.write p a.(i) >>= fun () -> + write p a.(i) >>= fun () -> send (i+1) ) in - Pipe.keep p (send 0); + keep p (send 0); p let of_string a = - let p = Pipe.create ~max_size:0 () in + let p = create ~max_size:0 () in let rec send i = - if i = String.length a then Pipe.close p + if i = String.length a then close p else ( - Writer.write p (String.get a i) >>= fun () -> + write p (String.get a i) >>= fun () -> send (i+1) ) in - Pipe.keep p (send 0); + keep p (send 0); p let to_list_rev r = @@ -348,16 +342,16 @@ let to_list_exn r = | `Ok x -> Lwt.return x let to_buffer buf = - let p = Pipe.create () in - Pipe.keep p ( + let p = create () in + keep p ( Reader.iter ~f:(fun c -> Buffer.add_char buf c) p >>= fun _ -> Lwt.return_unit ); p let to_buffer_str buf = - let p = Pipe.create () in - Pipe.keep p ( + let p = create () in + keep p ( Reader.iter ~f:(fun s -> Buffer.add_string buf s) p >>= fun _ -> Lwt.return_unit ); @@ -368,41 +362,41 @@ let to_buffer_str buf = module IO = struct let read ?(bufsize=4096) ic : _ Reader.t = let buf = Bytes.make bufsize ' ' in - let p = Pipe.create ~max_size:0 () in + let p = create ~max_size:0 () in let rec send() = Lwt_io.read_into ic buf 0 bufsize >>= fun n -> - if n = 0 then Pipe.close p + if n = 0 then close p else - Writer.write p (Bytes.sub_string buf 0 n) >>= fun () -> + write p (Bytes.sub_string buf 0 n) >>= fun () -> send () in Lwt.async send; p let read_lines ic = - let p = Pipe.create () in + let p = create () in let rec send () = Lwt_io.read_line_opt ic >>= function - | None -> Pipe.close p - | Some line -> Writer.write p line >>= fun () -> send () + | None -> close p + | Some line -> write p line >>= fun () -> send () in Lwt.async send; p let write oc = - let p = Pipe.create () in - Pipe.keep p ( + let p = create () in + keep p ( Reader.iter_s ~f:(Lwt_io.write oc) p >>= fun _ -> Lwt_io.flush oc >>= fun () -> - Pipe.close p + close p ); p let write_lines oc = - let p = Pipe.create () in - Pipe.keep p ( + let p = create () in + keep p ( Reader.iter_s ~f:(Lwt_io.write_line oc) p >>= fun _ -> Lwt_io.flush oc >>= fun () -> - Pipe.close p + close p ); p end diff --git a/src/lwt/lwt_pipe.mli b/src/lwt/lwt_pipe.mli index 90b18c2b..a24debae 100644 --- a/src/lwt/lwt_pipe.mli +++ b/src/lwt/lwt_pipe.mli @@ -46,7 +46,7 @@ Lwt_io.with_file ~mode:Lwt_io.output "/tmp/foo" (fun oc -> let p2 = P.IO.write_lines oc in P.connect ~ownership:`InOwnsOut p1 p2; - P.Pipe.wait p2 + P.wait p2 );; ]} @@ -66,49 +66,59 @@ end exception Closed -module Pipe : sig - type ('a, +'perm) t constraint 'perm = [< `r | `w] - (** A pipe between producers of values of type 'a, and consumers of values - of type 'a. *) +type ('a, +'perm) t constraint 'perm = [< `r | `w] +(** A pipe between producers of values of type 'a, and consumers of values + of type 'a. *) - val keep : _ t -> unit Lwt.t -> unit - (** [keep p fut] adds a pointer from [p] to [fut] so that [fut] is not - garbage-collected before [p] *) +type ('a, 'perm) pipe = ('a, 'perm) t - val is_closed : _ t -> bool +val keep : _ t -> unit Lwt.t -> unit +(** [keep p fut] adds a pointer from [p] to [fut] so that [fut] is not + garbage-collected before [p] *) - val close : _ t -> unit Lwt.t - (** [close p] closes [p], which will not accept input anymore. - This sends [`End] to all readers connected to [p] *) +val is_closed : _ t -> bool - val close_async : _ t -> unit - (** Same as {!close} but closes in the background *) +val close : _ t -> unit Lwt.t +(** [close p] closes [p], which will not accept input anymore. + This sends [`End] to all readers connected to [p] *) - val wait : _ t -> unit Lwt.t - (** Evaluates once the pipe closes *) +val close_async : _ t -> unit +(** Same as {!close} but closes in the background *) - val create : ?max_size:int -> unit -> ('a, 'perm) t - (** Create a new pipe. - @param max_size size of internal buffer. Default 0. *) +val wait : _ t -> unit Lwt.t +(** Evaluates once the pipe closes *) - val connect : ('a, [>`r]) t -> ('a, [>`w]) t -> unit - (** [connect p1 p2] forwards every item output by [p1] into [p2]'s input - until [p1] is closed. *) -end +val create : ?max_size:int -> unit -> ('a, 'perm) t +(** Create a new pipe. + @param max_size size of internal buffer. Default 0. *) + +val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] -> + ('a, [>`r]) t -> ('a, [>`w]) t -> unit +(** [connect p1 p2] forwards every item output by [p1] into [p2]'s input + until [p1] is closed. + @param own determines which pipes owns which (the owner, when it + closes, also closes the ownee) *) + +val link_close : _ t -> after:_ t -> unit +(** [link_close p ~after] will close [p] when [after] closes. + if [after] is closed already, closes [p] immediately *) + +val read : ('a, [>`r]) t -> 'a step Lwt.t +(** Read the next value from a Pipe *) + +val write : ('a, [>`w]) t -> 'a -> unit Lwt.t +(** @raise Pipe.Closed if the writer is closed *) + +val write_list : ('a, [>`w]) t -> 'a list -> unit Lwt.t +(** @raise Pipe.Closed if the writer is closed *) + +val write_error : (_, [>`w]) t -> string -> unit Lwt.t +(** @raise Pipe.Closed if the writer is closed *) module Writer : sig - type 'a t = ('a, [`w]) Pipe.t + type 'a t = ('a, [`w]) pipe - val write : 'a t -> 'a -> unit Lwt.t - (** @raise Pipe.Closed if the writer is closed *) - - val write_list : 'a t -> 'a list -> unit Lwt.t - (** @raise Pipe.Closed if the writer is closed *) - - val write_error : _ t -> string -> unit Lwt.t - (** @raise Pipe.Closed if the writer is closed *) - - val map : f:('a -> 'b) -> 'b t -> 'a t + val map : f:('a -> 'b) -> ('b, [>`w]) pipe -> 'a t (** Map values before writing them *) val send_both : 'a t -> 'a t -> 'a t @@ -122,11 +132,9 @@ module Writer : sig end module Reader : sig - type 'a t = ('a, [`r]) Pipe.t + type 'a t = ('a, [`r]) pipe - val read : 'a t -> 'a step Lwt.t - - val map : f:('a -> 'b) -> 'a t -> 'b t + val map : f:('a -> 'b) -> ('a, [>`r]) pipe -> 'b t val filter_map : f:('a -> 'b option) -> 'a t -> 'b t @@ -146,13 +154,10 @@ module Reader : sig @raise Invalid_argument if the list is empty *) val append : 'a t -> 'a t -> 'a t + (** [append a b] reads from [a] until [a] closes, then reads from [b] + and closes when [b] closes *) end -val connect : ?ownership:[`None | `InOwnsOut | `OutOwnsIn] -> - 'a Reader.t -> 'a Writer.t -> unit -(** Handy synonym to {!Pipe.connect}, with additional resource management. - @param own determines which pipes owns which *) - (** {2 Conversions} *) val of_list : 'a list -> 'a Reader.t