wip: Lwt_klist, a functional stream for Lwt

This commit is contained in:
Simon Cruanes 2015-01-30 19:11:05 +01:00
parent a1a8a8252c
commit 21c90a209d
3 changed files with 249 additions and 2 deletions

4
_oasis
View file

@ -131,7 +131,7 @@ Library "containers_thread"
Library "containers_lwt" Library "containers_lwt"
Path: src/lwt Path: src/lwt
Modules: Lwt_automaton, Lwt_actor Modules: Lwt_automaton, Lwt_actor, Lwt_klist
Pack: true Pack: true
FindlibName: lwt FindlibName: lwt
FindlibParent: containers FindlibParent: containers
@ -192,7 +192,7 @@ Executable run_qtest
Install: false Install: false
CompiledObject: native CompiledObject: native
MainIs: run_qtest.ml MainIs: run_qtest.ml
Build$: flag(tests) Build$: flag(tests) && flag(bigarray)
BuildDepends: containers, containers.misc, containers.string, containers.iter, BuildDepends: containers, containers.misc, containers.string, containers.iter,
containers.io, containers.advanced, containers.sexp, containers.io, containers.advanced, containers.sexp,
containers.bigarray, containers.bigarray,

159
src/lwt/lwt_klist.ml Normal file
View file

@ -0,0 +1,159 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Functional streams for Lwt} *)
type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t
type 'a stream = 'a t
let (>>=) = Lwt.(>>=)
let (>|=) = Lwt.(>|=)
let empty = Lwt.return `Nil
let cons x l = Lwt.return (`Cons (x, fun () -> l))
let rec of_list_rec l () = match l with
| [] -> empty
| x :: tl -> Lwt.return (`Cons (x, of_list_rec tl))
let of_list l : 'a t = of_list_rec l ()
let rec create_rec f () : 'a t =
f () >|= function
| None -> `Nil
| Some x -> `Cons (x, create_rec f)
let create f = create_rec f ()
let next l =
l >|= function
| `Nil -> None
| `Cons (x, tl) -> Some (x, tl())
let next_exn l =
l >>= function
| `Nil -> Lwt.fail Not_found
| `Cons (x, tl) -> Lwt.return (x, tl ())
let rec map_rec f l () =
l >|= function
| `Nil -> `Nil
| `Cons (x, tl) -> `Cons (f x, map_rec f (tl ()))
let map f (l:'a t) : 'b t = map_rec f l ()
let rec map_s_rec (f:'a -> 'b Lwt.t) l () =
l >>= function
| `Nil -> empty
| `Cons (x, tl) ->
f x >|= fun y -> `Cons (y, map_s_rec f (tl ()))
let map_s f l = map_s_rec f l ()
let rec append_rec l1 l2 () =
l1 >>= function
| `Nil -> l2
| `Cons (x, tl1) -> Lwt.return (`Cons (x, append_rec (tl1 ()) l2))
let append l1 l2 = append_rec l1 l2 ()
let rec flat_map f l =
l >>= function
| `Nil -> empty
| `Cons (x, tl) -> append (f x) (flat_map f (tl ()))
let rec iter f l =
l >>= function
| `Nil -> Lwt.return_unit
| `Cons (x, tl) -> f x; iter f (tl ())
let rec iter_s f l =
l >>= function
| `Nil -> Lwt.return_unit
| `Cons (x, tl) -> f x >>= fun () -> iter_s f (tl ())
module Queue = struct
type 'a t = {
bufsize : int;
cond : unit Lwt_condition.t;
q : 'a Queue.t;
mutable str : 'a stream;
mutable closed : bool;
}
(* function that waits for the next element, and recursively,
returning a stream of values *)
let rec make_stream_ t () : 'a stream =
if t.closed then empty
else if not (Queue.is_empty t.q)
then (
let x = Queue.pop t.q in
Lwt_condition.signal t.cond ();
Lwt.return (`Cons (x, make_stream_ t))
)
else
(* wait for something to happen *)
Lwt_condition.wait t.cond >>= make_stream_ t
let create ?(bufsize=128) () =
let t = {
bufsize;
q = Queue.create ();
str = empty;
cond = Lwt_condition.create ();
closed = false;
} in
t.str <- make_stream_ t ();
t
exception ClosedQueue
let close t =
if not t.closed then (
t.closed <- true;
Lwt_condition.signal t.cond ()
)
let rec push_rec t x () =
if t.closed then raise ClosedQueue;
if Queue.length t.q = t.bufsize
then Lwt_condition.wait t.cond >>= push_rec t x
else (
Queue.push x t.q;
Lwt.return_unit
)
let push t x = push_rec t x ()
let to_stream t = t.str
let take t = assert false
let take_exn t = assert false
end

88
src/lwt/lwt_klist.mli Normal file
View file

@ -0,0 +1,88 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Functional streams for Lwt} *)
type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t
type 'a stream = 'a t
val empty : 'a t
val cons : 'a -> 'a t -> 'a t
val of_list : 'a list -> 'a t
val create : (unit -> 'a option Lwt.t) -> 'a t
(** Create from a function that returns the next element *)
val next : 'a t -> ('a * 'a t) option Lwt.t
(** Obtain the next element *)
val next_exn : 'a t -> ('a * 'a t) Lwt.t
(** Obtain the next element or fail
@raise Not_found if the stream is empty *)
val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t
val append : 'a t -> 'a t -> 'a t
val flat_map : ('a -> 'b t) -> 'a t -> 'b t
val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
(** {2 Bounded Queue} *)
module Queue : sig
type 'a t
val create : ?bufsize:int -> unit -> 'a t
(** Create a new queue, with the given internal buffer size.
If [bufsize=0] the queue is fully blocking *)
exception ClosedQueue
val close : _ t -> unit
(** Close the queue. Elements remaining in the queue will be available for
consumption, say, by {!get}; pushing an element will raise {!ClosedQueue} *)
val push : 'a t -> 'a -> unit Lwt.t
(** Push an element at the back of the queue. Returns immediately
if the queue isn't full, blocks until an element is consumed otherwise *)
val take : 'a t -> 'a option Lwt.t
(** Take the next element. May block if no element is currently available. *)
val take_exn : 'a t -> 'a Lwt.t
(** Same as {!get} but fails if the queue is closed.
@raise ClosedQueue if the queue gets closed before an element is pushed *)
val to_stream : 'a t -> 'a stream
(** Stream of elements pushed into the queue *)
(* TODO: fix semantics; e.g. notion of "cursor" with several cursors
on one queue *)
end