diff --git a/_oasis b/_oasis index 30340442..7e197a54 100644 --- a/_oasis +++ b/_oasis @@ -131,7 +131,7 @@ Library "containers_thread" Library "containers_lwt" Path: src/lwt - Modules: Lwt_automaton, Lwt_actor + Modules: Lwt_automaton, Lwt_actor, Lwt_klist Pack: true FindlibName: lwt FindlibParent: containers @@ -192,7 +192,7 @@ Executable run_qtest Install: false CompiledObject: native MainIs: run_qtest.ml - Build$: flag(tests) + Build$: flag(tests) && flag(bigarray) BuildDepends: containers, containers.misc, containers.string, containers.iter, containers.io, containers.advanced, containers.sexp, containers.bigarray, diff --git a/src/lwt/lwt_klist.ml b/src/lwt/lwt_klist.ml new file mode 100644 index 00000000..30b3154b --- /dev/null +++ b/src/lwt/lwt_klist.ml @@ -0,0 +1,159 @@ + +(* +copyright (c) 2013-2014, simon cruanes +all rights reserved. + +redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. redistributions in binary +form must reproduce the above copyright notice, this list of conditions and the +following disclaimer in the documentation and/or other materials provided with +the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*) + +(** {1 Functional streams for Lwt} *) + +type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t +type 'a stream = 'a t + +let (>>=) = Lwt.(>>=) +let (>|=) = Lwt.(>|=) + +let empty = Lwt.return `Nil + +let cons x l = Lwt.return (`Cons (x, fun () -> l)) + +let rec of_list_rec l () = match l with + | [] -> empty + | x :: tl -> Lwt.return (`Cons (x, of_list_rec tl)) + +let of_list l : 'a t = of_list_rec l () + +let rec create_rec f () : 'a t = + f () >|= function + | None -> `Nil + | Some x -> `Cons (x, create_rec f) + +let create f = create_rec f () + +let next l = + l >|= function + | `Nil -> None + | `Cons (x, tl) -> Some (x, tl()) + +let next_exn l = + l >>= function + | `Nil -> Lwt.fail Not_found + | `Cons (x, tl) -> Lwt.return (x, tl ()) + +let rec map_rec f l () = + l >|= function + | `Nil -> `Nil + | `Cons (x, tl) -> `Cons (f x, map_rec f (tl ())) + +let map f (l:'a t) : 'b t = map_rec f l () + +let rec map_s_rec (f:'a -> 'b Lwt.t) l () = + l >>= function + | `Nil -> empty + | `Cons (x, tl) -> + f x >|= fun y -> `Cons (y, map_s_rec f (tl ())) + +let map_s f l = map_s_rec f l () + +let rec append_rec l1 l2 () = + l1 >>= function + | `Nil -> l2 + | `Cons (x, tl1) -> Lwt.return (`Cons (x, append_rec (tl1 ()) l2)) + +let append l1 l2 = append_rec l1 l2 () + +let rec flat_map f l = + l >>= function + | `Nil -> empty + | `Cons (x, tl) -> append (f x) (flat_map f (tl ())) + +let rec iter f l = + l >>= function + | `Nil -> Lwt.return_unit + | `Cons (x, tl) -> f x; iter f (tl ()) + +let rec iter_s f l = + l >>= function + | `Nil -> Lwt.return_unit + | `Cons (x, tl) -> f x >>= fun () -> iter_s f (tl ()) + +module Queue = struct + type 'a t = { + bufsize : int; + cond : unit Lwt_condition.t; + q : 'a Queue.t; + mutable str : 'a stream; + mutable closed : bool; + } + + (* function that waits for the next element, and recursively, + returning a stream of values *) + let rec make_stream_ t () : 'a stream = + if t.closed then empty + else if not (Queue.is_empty t.q) + then ( + let x = Queue.pop t.q in + Lwt_condition.signal t.cond (); + Lwt.return (`Cons (x, make_stream_ t)) + ) + else + (* wait for something to happen *) + Lwt_condition.wait t.cond >>= make_stream_ t + + let create ?(bufsize=128) () = + let t = { + bufsize; + q = Queue.create (); + str = empty; + cond = Lwt_condition.create (); + closed = false; + } in + t.str <- make_stream_ t (); + t + + exception ClosedQueue + + let close t = + if not t.closed then ( + t.closed <- true; + Lwt_condition.signal t.cond () + ) + + let rec push_rec t x () = + if t.closed then raise ClosedQueue; + if Queue.length t.q = t.bufsize + then Lwt_condition.wait t.cond >>= push_rec t x + else ( + Queue.push x t.q; + Lwt.return_unit + ) + + let push t x = push_rec t x () + + let to_stream t = t.str + + let take t = assert false + let take_exn t = assert false + +end + + diff --git a/src/lwt/lwt_klist.mli b/src/lwt/lwt_klist.mli new file mode 100644 index 00000000..766de3f6 --- /dev/null +++ b/src/lwt/lwt_klist.mli @@ -0,0 +1,88 @@ + +(* +copyright (c) 2013-2014, simon cruanes +all rights reserved. + +redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. redistributions in binary +form must reproduce the above copyright notice, this list of conditions and the +following disclaimer in the documentation and/or other materials provided with +the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*) + +(** {1 Functional streams for Lwt} *) + +type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t +type 'a stream = 'a t + +val empty : 'a t + +val cons : 'a -> 'a t -> 'a t + +val of_list : 'a list -> 'a t + +val create : (unit -> 'a option Lwt.t) -> 'a t +(** Create from a function that returns the next element *) + +val next : 'a t -> ('a * 'a t) option Lwt.t +(** Obtain the next element *) + +val next_exn : 'a t -> ('a * 'a t) Lwt.t +(** Obtain the next element or fail + @raise Not_found if the stream is empty *) + +val map : ('a -> 'b) -> 'a t -> 'b t +val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t + +val append : 'a t -> 'a t -> 'a t + +val flat_map : ('a -> 'b t) -> 'a t -> 'b t + +val iter : ('a -> unit) -> 'a t -> unit Lwt.t +val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + +(** {2 Bounded Queue} *) +module Queue : sig + type 'a t + + val create : ?bufsize:int -> unit -> 'a t + (** Create a new queue, with the given internal buffer size. + If [bufsize=0] the queue is fully blocking *) + + exception ClosedQueue + + val close : _ t -> unit + (** Close the queue. Elements remaining in the queue will be available for + consumption, say, by {!get}; pushing an element will raise {!ClosedQueue} *) + + val push : 'a t -> 'a -> unit Lwt.t + (** Push an element at the back of the queue. Returns immediately + if the queue isn't full, blocks until an element is consumed otherwise *) + + val take : 'a t -> 'a option Lwt.t + (** Take the next element. May block if no element is currently available. *) + + val take_exn : 'a t -> 'a Lwt.t + (** Same as {!get} but fails if the queue is closed. + @raise ClosedQueue if the queue gets closed before an element is pushed *) + + val to_stream : 'a t -> 'a stream + (** Stream of elements pushed into the queue *) + + (* TODO: fix semantics; e.g. notion of "cursor" with several cursors + on one queue *) +end