From be58673ba15cc11edf3b8ef983f6da00cb9265cd Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 30 Dec 2013 16:18:07 +0100 Subject: [PATCH] Automaton now with separate data types for their input and outputs --- automaton.ml | 288 ++++++++++++++++++++++++++++++-------------------- automaton.mli | 139 +++++++++++++----------- 2 files changed, 252 insertions(+), 175 deletions(-) diff --git a/automaton.ml b/automaton.ml index 0e515817..f35273b6 100644 --- a/automaton.ml +++ b/automaton.ml @@ -24,121 +24,12 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *) -(** {1 Distributed Algorithms} *) +(** {1 Automaton} *) type ('s, -'i, +'o) t = 's -> 'i -> 's * 'o list (** Transition function of an event automaton *) - type queue = (unit -> unit) Queue.t - - let create_queue () = Queue.create () - - let default_queue = create_queue () - -let _process q = - while not (Queue.is_empty q) do - let task = Queue.pop q in - task () - done - -let _schedule q task = Queue.push task q - -(* empty callback *) -let __noop s i os = true - -type ('s, 'i, 'o) instance = { - transition : ('s, 'i, 'o) t; - queue : queue; - mutable state : 's; - mutable connections : 'o connection list; - mutable n_callback : int; - mutable callback : ('s -> 'i -> 's * 'o list -> bool) array; -} - -(* connection to another automaton *) -and 'a connection = - | Conn : (_, 'a, _) instance -> 'a connection - | ConnMap : ('a -> 'b) * (_, 'b, _) instance -> 'a connection - -let instantiate ?(queue=default_queue) ~f init = { - transition = f; - queue; - state = init; - connections = []; - n_callback = 0; - callback = Array.make 3 __noop; -} - -let transition a = a.transition - -let state a = a.state - -(* register a new callback *) -let on_transition a k = - if Array.length a.callback = a.n_callback - then begin - let callback' = Array.make (2 * a.n_callback) __noop in - Array.blit a.callback 0 callback' 0 a.n_callback; - a.callback <- callback'; - end; - a.callback.(a.n_callback) <- k; - a.n_callback <- a.n_callback + 1 - -let connect left right = - left.connections <- (Conn right) :: left.connections - -let connect_map f left right = - left.connections <- (ConnMap (f, right)) :: left.connections - -(* remove i-th callback of [a] *) -let _remove_callback a i = - if i < a.n_callback - then a.callback.(i) <- a.callback.(a.n_callback - 1); - (* avoid memleak *) - a.callback.(a.n_callback - 1) <- __noop; - a.n_callback <- a.n_callback - 1 - -(* process callback at index [n] *) -let rec _call_callbacks a n s i o = - if n >= a.n_callback then () - else try - let keep = a.callback.(n) s i o in - if keep - then _call_callbacks a (n+1) s i o - else begin - _remove_callback a n; - _call_callbacks a n s i o (* same index, the callback has been removed *) - end - with _ -> _call_callbacks a (n+1) s i o - -(* send input to automaton *) -let rec send : type s i o. (s, i, o) instance -> i -> unit -= fun a i -> - let first = Queue.is_empty a.queue in - (* compute transitions *) - let s = a.state in - let s', os = a.transition a.state i in - a.state <- s'; - (* callbacks *) - _call_callbacks a 0 s i (s', os); - (* connections to other automata *) - List.iter - (fun o -> _forward_connections a.connections o) - os; - (* if no enclosing call to [send], we need to process events *) - if first then _process a.queue - -and _forward_connections : type a. a connection list -> a -> unit -= fun l o -> match l with - | [] -> () - | (Conn a') :: l' -> - _schedule a'.queue (fun () -> send a' o); - _forward_connections l' o - | (ConnMap (f, a')) :: l' -> - _schedule a'.queue (fun () -> send a' (f o)); - _forward_connections l' o - -(** {2 Helpers} *) +type ('s, 'i, 'o) automaton = ('s, 'i, 'o) t let map_i f a s i = a s (f i) @@ -146,9 +37,174 @@ let map_o f a s i = let s', os = a s i in s', List.map f os -let iter k a = - on_transition a (fun s i os -> k s i os; true) +let fmap_o f a s i = + let rec _fmap f l = match l with + | [] -> [] + | x::l' -> f x @ _fmap f l' + in + let s', os = a s i in + let os' = _fmap f os in + s', os' -let iter_state k = iter (fun s i (s',os) -> k s') -let iter_input k = iter (fun s i os -> k i) -let iter_output k = iter (fun s i (_,os) -> List.iter k os) +let filter_i p a s i = + if p i + then a s i + else s, [] + +let filter_o p a s i = + let s', os = a s i in + s', List.filter p os + +let fold f s i = + let s' = f s i in + s', [s'] + +let product f1 f2 (s1, s2) i = + let s1', os1 = f1 s1 i in + let s2', os2 = f2 s2 i in + (s1', s2'), (os1 @ os2) + +module I = struct + type 'a t = 'a -> unit + + let send x i = i x + + let comap f i x = i (f x) + + let filter f i x = if f x then i x +end + +module O = struct + type 'a t = { + mutable n : int; (* how many handlers? *) + mutable handlers : ('a -> bool) array; + mutable alive : keepalive; (* keep some signal alive *) + } (** Signal of type 'a *) + + and keepalive = + | Keep : 'a t -> keepalive + | NotAlive : keepalive + + let nop_handler x = true + + let create () = + let s = { + n = 0; + handlers = Array.create 3 nop_handler; + alive = NotAlive; + } in + s + + (* remove handler at index i *) + let remove s i = + (if i < s.n - 1 (* erase handler with the last one *) + then s.handlers.(i) <- s.handlers.(s.n - 1)); + s.handlers.(s.n - 1) <- nop_handler; (* free handler *) + s.n <- s.n - 1; + () + + let send s x = + for i = 0 to s.n - 1 do + while not (try s.handlers.(i) x with _ -> false) do + remove s i (* i-th handler is done, remove it *) + done + done + + let on s f = + (* resize handlers if needed *) + (if s.n = Array.length s.handlers + then begin + let handlers = Array.create (s.n + 4) nop_handler in + Array.blit s.handlers 0 handlers 0 s.n; + s.handlers <- handlers + end); + s.handlers.(s.n) <- f; + s.n <- s.n + 1 + + let once s f = + on s (fun x -> ignore (f x); false) + + let propagate a b = + on a (fun x -> send b x; true) + + let map f signal = + let signal' = create () in + (* weak ref *) + let r = Weak.create 1 in + Weak.set r 0 (Some signal'); + on signal (fun x -> + match Weak.get r 0 with + | None -> false + | Some signal' -> send signal' (f x); true); + signal'.alive <- Keep signal; + signal' + + let filter p signal = + let signal' = create () in + (* weak ref *) + let r = Weak.create 1 in + Weak.set r 0 (Some signal'); + on signal (fun x -> + match Weak.get r 0 with + | None -> false + | Some signal' -> (if p x then send signal' x); true); + signal'.alive <- Keep signal; + signal' +end + +let connect o i = + O.on o (fun x -> I.send x i; true) + +module Instance = struct + type ('s, 'i, 'o) t = { + transition : ('s, 'i, 'o) automaton; + mutable i : 'i I.t; + o : 'o O.t; + transitions : ('s * 'i * 's * 'o list) O.t; + mutable state : 's; + } + + let transition_function a = a.transition + + let i a = a.i + + let o a = a.o + + let state a = a.state + + let transitions a = a.transitions + + let _q = Queue.create () + + let _process q = + while not (Queue.is_empty q) do + let task = Queue.pop q in + task () + done + + let _schedule q task = Queue.push task q + + let _do_transition q a i = + let s = a.state in + let s', os = a.transition s i in + (* update state *) + a.state <- s'; + (* trigger the transitions asap *) + _schedule q (fun () -> O.send a.transitions (s, i, s', os)); + List.iter + (fun o -> _schedule q (fun () -> O.send a.o o)) + os + + let _receive a i = + let first = Queue.is_empty _q in + _do_transition _q a i; + if first then _process _q + + let create ~f init = + let o = O.create () in + let transitions = O.create () in + (* create input and automaton *) + let a = { state = init; i=Obj.magic 0; o; transition=f; transitions; } in + a.i <- _receive a; + a +end diff --git a/automaton.mli b/automaton.mli index f0236007..f633deb1 100644 --- a/automaton.mli +++ b/automaton.mli @@ -24,65 +24,14 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *) -(** {1 Distributed Algorithms} *) +(** {1 Automaton} *) type ('s, -'i, +'o) t = 's -> 'i -> 's * 'o list (** Transition function of an event automaton *) -type ('s, 'i, 'o) instance -(** Instance of an automaton, with a concrete state, and connections to other - automaton instances. *) +type ('s, 'i, 'o) automaton = ('s, 'i, 'o) t -type queue -(** Stateful value used to store the event (pending transitions) that remain - * to process, using an in-memory queue and processing pending tasks until - * none remains. A default global queue is provided, see {!default_queue}. *) - -val default_queue : queue -(** Default event queue *) - -val create_queue : unit -> queue - -val instantiate : - ?queue:queue -> - f:('s, 'i, 'o) t -> - 's -> - ('s, 'i, 'o) instance -(** [instantiate ~f init] creates an instance of [f] with initial state - [init]. - - @param queue event queue used to process transitions of the automaton - upon calls to {!send}. Default value is {!default_queue}. *) - -val transition : ('s, 'i, 'o) instance -> ('s, 'i, 'o) t -(** Transition function of this instance *) - -val state : ('s, _, _) instance -> 's -(** Current state of the automaton instance *) - -val on_transition : ('s, 'i, 'o) instance -> ('s -> 'i -> 's * 'o list -> bool) -> unit -(** [on_state_change a k] calls [k] with the previous state, input, - new state and ouputs of [a] every time [a] changes state. - The callback [k] returns a boolean to signal whether it wants to continue - being called ([true]) or stop being called ([false]). *) - -val connect : (_, _, 'a) instance -> (_, 'a, _) instance -> unit -(** [connect left right] connects the ouput of [left] to the input of [right]. - Outputs of [left] will be fed to [right]. *) - -val connect_map : ('a -> 'b) -> (_, _, 'a) instance -> (_, 'b, _) instance -> unit -(** [connect_map f left right] is a generalization of {!connect}, that - applies [f] to outputs of [left] before they are sent to [right] *) - -val send : (_, 'i, _) instance -> 'i -> unit -(** [send a i] uses [a]'s transition function to update [a] with the input - event [i]. The output of the transition function (a list of outputs) is - recursively processed. - - This may not terminate, if the automata keep on creating new outputs that - trigger other outputs forever. *) - -(** {2 Helpers} *) +(** {2 Combinators} *) val map_i : ('a -> 'b) -> ('s, 'b, 'o) t -> ('s, 'a, 'o) t (** map inputs *) @@ -90,11 +39,83 @@ val map_i : ('a -> 'b) -> ('s, 'b, 'o) t -> ('s, 'a, 'o) t val map_o : ('a -> 'b) -> ('s, 'i, 'a) t -> ('s, 'i, 'b) t (** map outputs *) -val iter : ('s -> 'i -> ('s * 'o list) -> unit) -> ('s,'i,'o) instance -> unit -(** Iterate on every transition (wrapper around {!on_transition}) *) +val fmap_o : ('a -> 'b list) -> ('s, 'i, 'a) t -> ('s, 'i, 'b) t +(** flat-map outputs *) -val iter_state : ('s -> unit) -> ('s, _, _) instance -> unit +val filter_i : ('a -> bool) -> ('s, 'a, 'o) t -> ('s, 'a, 'o) t +(** Filter inputs *) -val iter_input : ('i -> unit) -> (_, 'i, _) instance -> unit +val filter_o : ('a -> bool) -> ('s, 'i, 'a) t -> ('s, 'i, 'a) t +(** Filter outputs *) -val iter_output : ('o -> unit) -> (_, _, 'o) instance -> unit +val fold : ('a -> 'b -> 'a) -> ('a, 'b, 'a) t +(** Automaton that folds over its input using the given function *) + +val product : ('s1, 'i, 'o) t -> ('s2, 'i, 'o) t -> ('s1 * 's2, 'i, 'o) t +(** Product of transition functions and states. *) + +(** {2 Input} + +Input sink, that accepts values of a given type. Cofunctor. *) + +module I : sig + type -'a t + + val comap : ('a -> 'b) -> 'b t -> 'a t + + val filter : ('a -> bool) -> 'a t -> 'a t + + val send : 'a -> 'a t -> unit + (** [send a i] uses [a]'s transition function to update [a] with the input + event [i]. The output of the transition function (a list of outputs) is + recursively processed. + + This may not terminate, if the automata keep on creating new outputs that + trigger other outputs forever. *) +end + +(** {2 Output} + +Stream of output values. Functor. *) + +module O : sig + type 'a t + + val map : ('a -> 'b) -> 'a t -> 'b t + + val filter : ('a -> bool) -> 'a t -> 'a t + + val on : 'a t -> ('a -> bool) -> unit + + val once : 'a t -> ('a -> unit) -> unit + + val propagate : 'a t -> 'a t -> unit + (** [propagate a b] forwards all elements of [a] into [b]. As long as [a] + exists, [b] will not be GC'ed. *) +end + +val connect : 'a O.t -> 'a I.t -> unit + (** Pipe an output into an input *) + +(** {2 Instance} *) + +module Instance : sig + type ('s, 'i, 'o) t + (** Instance of an automaton, with a concrete state, and connections to other + automaton instances. *) + + val transition_function : ('s, 'i, 'o) t -> ('s, 'i, 'o) automaton + (** Transition function of this instance *) + + val i : (_, 'a, _) t -> 'a I.t + + val o : (_, _, 'a) t -> 'a O.t + + val state : ('a, _, _) t -> 'a + + val transitions : ('s, 'i, 'o) t -> ('s * 'i * 's * 'o list) O.t + + val create : f:('s, 'i, 'o) automaton -> 's -> ('s, 'i, 'o) t + (** [create ~f init] creates an instance of [f] with initial state + [init]. *) +end