From 89aded1311ba7f9e3a051d47771cb6db7a47b8bf Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 19 Feb 2015 19:35:53 +0100 Subject: [PATCH] wip: lwt_klist --- .header | 2 +- src/lwt/lwt_klist.ml | 154 ++++++++++++++++++++---------------------- src/lwt/lwt_klist.mli | 66 ++++++++++-------- 3 files changed, 114 insertions(+), 108 deletions(-) diff --git a/.header b/.header index 71e61012..d5a14c50 100644 --- a/.header +++ b/.header @@ -1,5 +1,5 @@ (* -copyright (c) 2013-2014, simon cruanes +copyright (c) 2013-2015, simon cruanes all rights reserved. redistribution and use in source and binary forms, with or without diff --git a/src/lwt/lwt_klist.ml b/src/lwt/lwt_klist.ml index 30b3154b..fa186711 100644 --- a/src/lwt/lwt_klist.ml +++ b/src/lwt/lwt_klist.ml @@ -26,7 +26,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Functional streams for Lwt} *) -type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t +type 'a t = [ `Nil | `Cons of 'a * 'a t ] Lwt.t type 'a stream = 'a t let (>>=) = Lwt.(>>=) @@ -34,126 +34,120 @@ let (>|=) = Lwt.(>|=) let empty = Lwt.return `Nil -let cons x l = Lwt.return (`Cons (x, fun () -> l)) +let cons x l = Lwt.return (`Cons (x, l)) -let rec of_list_rec l () = match l with - | [] -> empty - | x :: tl -> Lwt.return (`Cons (x, of_list_rec tl)) - -let of_list l : 'a t = of_list_rec l () - -let rec create_rec f () : 'a t = +let rec create f : 'a t = + let fut, wake = Lwt.wait () in f () >|= function | None -> `Nil - | Some x -> `Cons (x, create_rec f) - -let create f = create_rec f () + | Some x -> `Cons (x, create f) +and create_rec f () = + f () >|= function + | None -> `Nil + | Some x -> `Cons (x, create f) let next l = l >|= function | `Nil -> None - | `Cons (x, tl) -> Some (x, tl()) + | `Cons (x, tl) -> Some (x, tl) let next_exn l = l >>= function | `Nil -> Lwt.fail Not_found - | `Cons (x, tl) -> Lwt.return (x, tl ()) + | `Cons (x, tl) -> Lwt.return (x, tl) -let rec map_rec f l () = +let rec map f l = l >|= function | `Nil -> `Nil - | `Cons (x, tl) -> `Cons (f x, map_rec f (tl ())) + | `Cons (x, tl) -> `Cons (f x, map f tl) -let map f (l:'a t) : 'b t = map_rec f l () - -let rec map_s_rec (f:'a -> 'b Lwt.t) l () = +let rec map_s (f:'a -> 'b Lwt.t) l = l >>= function | `Nil -> empty | `Cons (x, tl) -> - f x >|= fun y -> `Cons (y, map_s_rec f (tl ())) + f x >|= fun y -> `Cons (y, map_s f tl) -let map_s f l = map_s_rec f l () - -let rec append_rec l1 l2 () = +let rec append l1 l2 = l1 >>= function | `Nil -> l2 - | `Cons (x, tl1) -> Lwt.return (`Cons (x, append_rec (tl1 ()) l2)) - -let append l1 l2 = append_rec l1 l2 () + | `Cons (x, tl1) -> Lwt.return (`Cons (x, append tl1 l2)) let rec flat_map f l = l >>= function | `Nil -> empty - | `Cons (x, tl) -> append (f x) (flat_map f (tl ())) + | `Cons (x, tl) -> append (f x) (flat_map f tl) + +let rec filter_map f l = + l >>= function + | `Nil -> empty + | `Cons (x, tl) -> + match f x with + | None -> filter_map f tl + | Some y -> Lwt.return (`Cons (y, filter_map f tl)) + +let rec filter_map_s f l = + l >>= function + | `Nil -> empty + | `Cons (x, tl) -> + f x >>= function + | None -> filter_map_s f tl + | Some y -> Lwt.return (`Cons (y, filter_map_s f tl)) let rec iter f l = l >>= function | `Nil -> Lwt.return_unit - | `Cons (x, tl) -> f x; iter f (tl ()) + | `Cons (x, tl) -> f x; iter f tl let rec iter_s f l = l >>= function | `Nil -> Lwt.return_unit - | `Cons (x, tl) -> f x >>= fun () -> iter_s f (tl ()) + | `Cons (x, tl) -> f x >>= fun () -> iter_s f tl -module Queue = struct - type 'a t = { - bufsize : int; - cond : unit Lwt_condition.t; - q : 'a Queue.t; - mutable str : 'a stream; - mutable closed : bool; - } +let rec fold f acc l = + l >>= function + | `Nil -> Lwt.return acc + | `Cons (x, tl) -> + let acc = f acc x in + fold f acc tl - (* function that waits for the next element, and recursively, - returning a stream of values *) - let rec make_stream_ t () : 'a stream = - if t.closed then empty - else if not (Queue.is_empty t.q) - then ( - let x = Queue.pop t.q in - Lwt_condition.signal t.cond (); - Lwt.return (`Cons (x, make_stream_ t)) - ) - else - (* wait for something to happen *) - Lwt_condition.wait t.cond >>= make_stream_ t +let rec fold_s f acc l = + l >>= function + | `Nil -> Lwt.return acc + | `Cons (x, tl) -> f acc x >>= fun acc -> fold_s f acc tl - let create ?(bufsize=128) () = - let t = { - bufsize; - q = Queue.create (); - str = empty; - cond = Lwt_condition.create (); - closed = false; - } in - t.str <- make_stream_ t (); - t +let take n l = assert false +let take_while f l = assert false +let take_while_s f l = assert false +let drop n l = assert false +let drop_while f l = assert false +let drop_while_s f l = assert false +let merge a b = assert false - exception ClosedQueue +(** {2 Conversions} *) - let close t = - if not t.closed then ( - t.closed <- true; - Lwt_condition.signal t.cond () - ) +type 'a gen = unit -> 'a option - let rec push_rec t x () = - if t.closed then raise ClosedQueue; - if Queue.length t.q = t.bufsize - then Lwt_condition.wait t.cond >>= push_rec t x - else ( - Queue.push x t.q; - Lwt.return_unit - ) +let rec of_list l = match l with + | [] -> empty + | x :: tl -> Lwt.return (`Cons (x, of_list tl)) - let push t x = push_rec t x () +let rec of_array_rec a i = + if i = Array.length a + then empty + else Lwt.return (`Cons (a.(i), of_array_rec a (i+1))) - let to_stream t = t.str +let of_array a = of_array_rec a 0 - let take t = assert false - let take_exn t = assert false - -end +let rec of_gen g = match g () with + | None -> empty + | Some x -> Lwt.return (`Cons (x, of_gen g)) +let rec of_gen_s g = match g() with + | None -> empty + | Some x -> + x >|= fun x -> `Cons (x, of_gen_s g) +let of_string s = assert false +let to_string l = assert false +let to_list l = assert false +let to_rev_list l = assert false diff --git a/src/lwt/lwt_klist.mli b/src/lwt/lwt_klist.mli index 766de3f6..4a2b6087 100644 --- a/src/lwt/lwt_klist.mli +++ b/src/lwt/lwt_klist.mli @@ -26,15 +26,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (** {1 Functional streams for Lwt} *) -type 'a t = [ `Nil | `Cons of 'a * (unit -> 'a t) ] Lwt.t +type 'a t = [ `Nil | `Cons of 'a * 'a t ] Lwt.t type 'a stream = 'a t val empty : 'a t val cons : 'a -> 'a t -> 'a t -val of_list : 'a list -> 'a t - val create : (unit -> 'a option Lwt.t) -> 'a t (** Create from a function that returns the next element *) @@ -43,46 +41,60 @@ val next : 'a t -> ('a * 'a t) option Lwt.t val next_exn : 'a t -> ('a * 'a t) Lwt.t (** Obtain the next element or fail - @raise Not_found if the stream is empty *) + @raise Not_found if the stream is empty (using {!Lwt.fail}) *) val map : ('a -> 'b) -> 'a t -> 'b t + val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t val append : 'a t -> 'a t -> 'a t +val filter_map : ('a -> 'b option) -> 'a t -> 'b t + +val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t + val flat_map : ('a -> 'b t) -> 'a t -> 'b t val iter : ('a -> unit) -> 'a t -> unit Lwt.t + val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t -(** {2 Bounded Queue} *) -module Queue : sig - type 'a t +val fold : ('a -> 'b -> 'a) -> 'a -> 'b t -> 'a Lwt.t - val create : ?bufsize:int -> unit -> 'a t - (** Create a new queue, with the given internal buffer size. - If [bufsize=0] the queue is fully blocking *) +val fold_s : ('a -> 'b -> 'a Lwt.t) -> 'a -> 'b t -> 'a Lwt.t - exception ClosedQueue +val take : int -> 'a t -> 'a t - val close : _ t -> unit - (** Close the queue. Elements remaining in the queue will be available for - consumption, say, by {!get}; pushing an element will raise {!ClosedQueue} *) +val take_while : ('a -> bool) -> 'a t -> 'a t - val push : 'a t -> 'a -> unit Lwt.t - (** Push an element at the back of the queue. Returns immediately - if the queue isn't full, blocks until an element is consumed otherwise *) +val take_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a t - val take : 'a t -> 'a option Lwt.t - (** Take the next element. May block if no element is currently available. *) +val drop : int -> 'a t -> 'a t - val take_exn : 'a t -> 'a Lwt.t - (** Same as {!get} but fails if the queue is closed. - @raise ClosedQueue if the queue gets closed before an element is pushed *) +val drop_while : ('a -> bool) -> 'a t -> 'a t - val to_stream : 'a t -> 'a stream - (** Stream of elements pushed into the queue *) +val drop_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a t + +val merge : 'a t -> 'a t -> 'a t +(** Non-deterministic merge *) + +(** {2 Conversions} *) + +type 'a gen = unit -> 'a option + +val of_list : 'a list -> 'a t + +val of_array : 'a array -> 'a t + +val of_gen : 'a gen -> 'a t + +val of_gen_s : 'a Lwt.t gen -> 'a t + +val of_string : string -> 'a t + +val to_list : 'a t -> 'a list Lwt.t + +val to_rev_list : 'a t -> 'a list Lwt.t + +val to_string : char t -> string Lwt.t - (* TODO: fix semantics; e.g. notion of "cursor" with several cursors - on one queue *) -end