wip: lwt_klist

This commit is contained in:
Simon Cruanes 2015-02-19 19:35:53 +01:00
parent e41faaf91e
commit 89aded1311
3 changed files with 114 additions and 108 deletions

View file

@ -1,5 +1,5 @@
(* (*
copyright (c) 2013-2014, simon cruanes copyright (c) 2013-2015, simon cruanes
all rights reserved. all rights reserved.
redistribution and use in source and binary forms, with or without redistribution and use in source and binary forms, with or without

View file

@ -26,7 +26,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(** {1 Functional streams for Lwt} *) (** {1 Functional streams for Lwt} *)
type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t type 'a t = [ `Nil | `Cons of 'a * 'a t ] Lwt.t
type 'a stream = 'a t type 'a stream = 'a t
let (>>=) = Lwt.(>>=) let (>>=) = Lwt.(>>=)
@ -34,126 +34,120 @@ let (>|=) = Lwt.(>|=)
let empty = Lwt.return `Nil let empty = Lwt.return `Nil
let cons x l = Lwt.return (`Cons (x, fun () -> l)) let cons x l = Lwt.return (`Cons (x, l))
let rec of_list_rec l () = match l with let rec create f : 'a t =
| [] -> empty let fut, wake = Lwt.wait () in
| x :: tl -> Lwt.return (`Cons (x, of_list_rec tl))
let of_list l : 'a t = of_list_rec l ()
let rec create_rec f () : 'a t =
f () >|= function f () >|= function
| None -> `Nil | None -> `Nil
| Some x -> `Cons (x, create_rec f) | Some x -> `Cons (x, create f)
and create_rec f () =
let create f = create_rec f () f () >|= function
| None -> `Nil
| Some x -> `Cons (x, create f)
let next l = let next l =
l >|= function l >|= function
| `Nil -> None | `Nil -> None
| `Cons (x, tl) -> Some (x, tl()) | `Cons (x, tl) -> Some (x, tl)
let next_exn l = let next_exn l =
l >>= function l >>= function
| `Nil -> Lwt.fail Not_found | `Nil -> Lwt.fail Not_found
| `Cons (x, tl) -> Lwt.return (x, tl ()) | `Cons (x, tl) -> Lwt.return (x, tl)
let rec map_rec f l () = let rec map f l =
l >|= function l >|= function
| `Nil -> `Nil | `Nil -> `Nil
| `Cons (x, tl) -> `Cons (f x, map_rec f (tl ())) | `Cons (x, tl) -> `Cons (f x, map f tl)
let map f (l:'a t) : 'b t = map_rec f l () let rec map_s (f:'a -> 'b Lwt.t) l =
let rec map_s_rec (f:'a -> 'b Lwt.t) l () =
l >>= function l >>= function
| `Nil -> empty | `Nil -> empty
| `Cons (x, tl) -> | `Cons (x, tl) ->
f x >|= fun y -> `Cons (y, map_s_rec f (tl ())) f x >|= fun y -> `Cons (y, map_s f tl)
let map_s f l = map_s_rec f l () let rec append l1 l2 =
let rec append_rec l1 l2 () =
l1 >>= function l1 >>= function
| `Nil -> l2 | `Nil -> l2
| `Cons (x, tl1) -> Lwt.return (`Cons (x, append_rec (tl1 ()) l2)) | `Cons (x, tl1) -> Lwt.return (`Cons (x, append tl1 l2))
let append l1 l2 = append_rec l1 l2 ()
let rec flat_map f l = let rec flat_map f l =
l >>= function l >>= function
| `Nil -> empty | `Nil -> empty
| `Cons (x, tl) -> append (f x) (flat_map f (tl ())) | `Cons (x, tl) -> append (f x) (flat_map f tl)
let rec filter_map f l =
l >>= function
| `Nil -> empty
| `Cons (x, tl) ->
match f x with
| None -> filter_map f tl
| Some y -> Lwt.return (`Cons (y, filter_map f tl))
let rec filter_map_s f l =
l >>= function
| `Nil -> empty
| `Cons (x, tl) ->
f x >>= function
| None -> filter_map_s f tl
| Some y -> Lwt.return (`Cons (y, filter_map_s f tl))
let rec iter f l = let rec iter f l =
l >>= function l >>= function
| `Nil -> Lwt.return_unit | `Nil -> Lwt.return_unit
| `Cons (x, tl) -> f x; iter f (tl ()) | `Cons (x, tl) -> f x; iter f tl
let rec iter_s f l = let rec iter_s f l =
l >>= function l >>= function
| `Nil -> Lwt.return_unit | `Nil -> Lwt.return_unit
| `Cons (x, tl) -> f x >>= fun () -> iter_s f (tl ()) | `Cons (x, tl) -> f x >>= fun () -> iter_s f tl
module Queue = struct let rec fold f acc l =
type 'a t = { l >>= function
bufsize : int; | `Nil -> Lwt.return acc
cond : unit Lwt_condition.t; | `Cons (x, tl) ->
q : 'a Queue.t; let acc = f acc x in
mutable str : 'a stream; fold f acc tl
mutable closed : bool;
}
(* function that waits for the next element, and recursively, let rec fold_s f acc l =
returning a stream of values *) l >>= function
let rec make_stream_ t () : 'a stream = | `Nil -> Lwt.return acc
if t.closed then empty | `Cons (x, tl) -> f acc x >>= fun acc -> fold_s f acc tl
else if not (Queue.is_empty t.q)
then (
let x = Queue.pop t.q in
Lwt_condition.signal t.cond ();
Lwt.return (`Cons (x, make_stream_ t))
)
else
(* wait for something to happen *)
Lwt_condition.wait t.cond >>= make_stream_ t
let create ?(bufsize=128) () = let take n l = assert false
let t = { let take_while f l = assert false
bufsize; let take_while_s f l = assert false
q = Queue.create (); let drop n l = assert false
str = empty; let drop_while f l = assert false
cond = Lwt_condition.create (); let drop_while_s f l = assert false
closed = false; let merge a b = assert false
} in
t.str <- make_stream_ t ();
t
exception ClosedQueue (** {2 Conversions} *)
let close t = type 'a gen = unit -> 'a option
if not t.closed then (
t.closed <- true;
Lwt_condition.signal t.cond ()
)
let rec push_rec t x () = let rec of_list l = match l with
if t.closed then raise ClosedQueue; | [] -> empty
if Queue.length t.q = t.bufsize | x :: tl -> Lwt.return (`Cons (x, of_list tl))
then Lwt_condition.wait t.cond >>= push_rec t x
else (
Queue.push x t.q;
Lwt.return_unit
)
let push t x = push_rec t x () let rec of_array_rec a i =
if i = Array.length a
then empty
else Lwt.return (`Cons (a.(i), of_array_rec a (i+1)))
let to_stream t = t.str let of_array a = of_array_rec a 0
let take t = assert false let rec of_gen g = match g () with
let take_exn t = assert false | None -> empty
| Some x -> Lwt.return (`Cons (x, of_gen g))
end
let rec of_gen_s g = match g() with
| None -> empty
| Some x ->
x >|= fun x -> `Cons (x, of_gen_s g)
let of_string s = assert false
let to_string l = assert false
let to_list l = assert false
let to_rev_list l = assert false

View file

@ -26,15 +26,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(** {1 Functional streams for Lwt} *) (** {1 Functional streams for Lwt} *)
type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t type 'a t = [ `Nil | `Cons of 'a * 'a t ] Lwt.t
type 'a stream = 'a t type 'a stream = 'a t
val empty : 'a t val empty : 'a t
val cons : 'a -> 'a t -> 'a t val cons : 'a -> 'a t -> 'a t
val of_list : 'a list -> 'a t
val create : (unit -> 'a option Lwt.t) -> 'a t val create : (unit -> 'a option Lwt.t) -> 'a t
(** Create from a function that returns the next element *) (** Create from a function that returns the next element *)
@ -43,46 +41,60 @@ val next : 'a t -> ('a * 'a t) option Lwt.t
val next_exn : 'a t -> ('a * 'a t) Lwt.t val next_exn : 'a t -> ('a * 'a t) Lwt.t
(** Obtain the next element or fail (** Obtain the next element or fail
@raise Not_found if the stream is empty *) @raise Not_found if the stream is empty (using {!Lwt.fail}) *)
val map : ('a -> 'b) -> 'a t -> 'b t val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t
val append : 'a t -> 'a t -> 'a t val append : 'a t -> 'a t -> 'a t
val filter_map : ('a -> 'b option) -> 'a t -> 'b t
val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t
val flat_map : ('a -> 'b t) -> 'a t -> 'b t val flat_map : ('a -> 'b t) -> 'a t -> 'b t
val iter : ('a -> unit) -> 'a t -> unit Lwt.t val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
(** {2 Bounded Queue} *) val fold : ('a -> 'b -> 'a) -> 'a -> 'b t -> 'a Lwt.t
module Queue : sig
type 'a t
val create : ?bufsize:int -> unit -> 'a t val fold_s : ('a -> 'b -> 'a Lwt.t) -> 'a -> 'b t -> 'a Lwt.t
(** Create a new queue, with the given internal buffer size.
If [bufsize=0] the queue is fully blocking *)
exception ClosedQueue val take : int -> 'a t -> 'a t
val close : _ t -> unit val take_while : ('a -> bool) -> 'a t -> 'a t
(** Close the queue. Elements remaining in the queue will be available for
consumption, say, by {!get}; pushing an element will raise {!ClosedQueue} *)
val push : 'a t -> 'a -> unit Lwt.t val take_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a t
(** Push an element at the back of the queue. Returns immediately
if the queue isn't full, blocks until an element is consumed otherwise *)
val take : 'a t -> 'a option Lwt.t val drop : int -> 'a t -> 'a t
(** Take the next element. May block if no element is currently available. *)
val take_exn : 'a t -> 'a Lwt.t val drop_while : ('a -> bool) -> 'a t -> 'a t
(** Same as {!get} but fails if the queue is closed.
@raise ClosedQueue if the queue gets closed before an element is pushed *)
val to_stream : 'a t -> 'a stream val drop_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a t
(** Stream of elements pushed into the queue *)
val merge : 'a t -> 'a t -> 'a t
(** Non-deterministic merge *)
(** {2 Conversions} *)
type 'a gen = unit -> 'a option
val of_list : 'a list -> 'a t
val of_array : 'a array -> 'a t
val of_gen : 'a gen -> 'a t
val of_gen_s : 'a Lwt.t gen -> 'a t
val of_string : string -> 'a t
val to_list : 'a t -> 'a list Lwt.t
val to_rev_list : 'a t -> 'a list Lwt.t
val to_string : char t -> string Lwt.t
(* TODO: fix semantics; e.g. notion of "cursor" with several cursors
on one queue *)
end