Automaton now with separate data types for their input and outputs

This commit is contained in:
Simon Cruanes 2013-12-30 16:18:07 +01:00
parent e3d5b78c5e
commit be58673ba1
2 changed files with 252 additions and 175 deletions

View file

@ -24,121 +24,12 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Distributed Algorithms} *)
(** {1 Automaton} *)
type ('s, -'i, +'o) t = 's -> 'i -> 's * 'o list
(** Transition function of an event automaton *)
type queue = (unit -> unit) Queue.t
let create_queue () = Queue.create ()
let default_queue = create_queue ()
let _process q =
while not (Queue.is_empty q) do
let task = Queue.pop q in
task ()
done
let _schedule q task = Queue.push task q
(* empty callback *)
let __noop s i os = true
type ('s, 'i, 'o) instance = {
transition : ('s, 'i, 'o) t;
queue : queue;
mutable state : 's;
mutable connections : 'o connection list;
mutable n_callback : int;
mutable callback : ('s -> 'i -> 's * 'o list -> bool) array;
}
(* connection to another automaton *)
and 'a connection =
| Conn : (_, 'a, _) instance -> 'a connection
| ConnMap : ('a -> 'b) * (_, 'b, _) instance -> 'a connection
let instantiate ?(queue=default_queue) ~f init = {
transition = f;
queue;
state = init;
connections = [];
n_callback = 0;
callback = Array.make 3 __noop;
}
let transition a = a.transition
let state a = a.state
(* register a new callback *)
let on_transition a k =
if Array.length a.callback = a.n_callback
then begin
let callback' = Array.make (2 * a.n_callback) __noop in
Array.blit a.callback 0 callback' 0 a.n_callback;
a.callback <- callback';
end;
a.callback.(a.n_callback) <- k;
a.n_callback <- a.n_callback + 1
let connect left right =
left.connections <- (Conn right) :: left.connections
let connect_map f left right =
left.connections <- (ConnMap (f, right)) :: left.connections
(* remove i-th callback of [a] *)
let _remove_callback a i =
if i < a.n_callback
then a.callback.(i) <- a.callback.(a.n_callback - 1);
(* avoid memleak *)
a.callback.(a.n_callback - 1) <- __noop;
a.n_callback <- a.n_callback - 1
(* process callback at index [n] *)
let rec _call_callbacks a n s i o =
if n >= a.n_callback then ()
else try
let keep = a.callback.(n) s i o in
if keep
then _call_callbacks a (n+1) s i o
else begin
_remove_callback a n;
_call_callbacks a n s i o (* same index, the callback has been removed *)
end
with _ -> _call_callbacks a (n+1) s i o
(* send input to automaton *)
let rec send : type s i o. (s, i, o) instance -> i -> unit
= fun a i ->
let first = Queue.is_empty a.queue in
(* compute transitions *)
let s = a.state in
let s', os = a.transition a.state i in
a.state <- s';
(* callbacks *)
_call_callbacks a 0 s i (s', os);
(* connections to other automata *)
List.iter
(fun o -> _forward_connections a.connections o)
os;
(* if no enclosing call to [send], we need to process events *)
if first then _process a.queue
and _forward_connections : type a. a connection list -> a -> unit
= fun l o -> match l with
| [] -> ()
| (Conn a') :: l' ->
_schedule a'.queue (fun () -> send a' o);
_forward_connections l' o
| (ConnMap (f, a')) :: l' ->
_schedule a'.queue (fun () -> send a' (f o));
_forward_connections l' o
(** {2 Helpers} *)
type ('s, 'i, 'o) automaton = ('s, 'i, 'o) t
let map_i f a s i = a s (f i)
@ -146,9 +37,174 @@ let map_o f a s i =
let s', os = a s i in
s', List.map f os
let iter k a =
on_transition a (fun s i os -> k s i os; true)
let fmap_o f a s i =
let rec _fmap f l = match l with
| [] -> []
| x::l' -> f x @ _fmap f l'
in
let s', os = a s i in
let os' = _fmap f os in
s', os'
let iter_state k = iter (fun s i (s',os) -> k s')
let iter_input k = iter (fun s i os -> k i)
let iter_output k = iter (fun s i (_,os) -> List.iter k os)
let filter_i p a s i =
if p i
then a s i
else s, []
let filter_o p a s i =
let s', os = a s i in
s', List.filter p os
let fold f s i =
let s' = f s i in
s', [s']
let product f1 f2 (s1, s2) i =
let s1', os1 = f1 s1 i in
let s2', os2 = f2 s2 i in
(s1', s2'), (os1 @ os2)
module I = struct
type 'a t = 'a -> unit
let send x i = i x
let comap f i x = i (f x)
let filter f i x = if f x then i x
end
module O = struct
type 'a t = {
mutable n : int; (* how many handlers? *)
mutable handlers : ('a -> bool) array;
mutable alive : keepalive; (* keep some signal alive *)
} (** Signal of type 'a *)
and keepalive =
| Keep : 'a t -> keepalive
| NotAlive : keepalive
let nop_handler x = true
let create () =
let s = {
n = 0;
handlers = Array.create 3 nop_handler;
alive = NotAlive;
} in
s
(* remove handler at index i *)
let remove s i =
(if i < s.n - 1 (* erase handler with the last one *)
then s.handlers.(i) <- s.handlers.(s.n - 1));
s.handlers.(s.n - 1) <- nop_handler; (* free handler *)
s.n <- s.n - 1;
()
let send s x =
for i = 0 to s.n - 1 do
while not (try s.handlers.(i) x with _ -> false) do
remove s i (* i-th handler is done, remove it *)
done
done
let on s f =
(* resize handlers if needed *)
(if s.n = Array.length s.handlers
then begin
let handlers = Array.create (s.n + 4) nop_handler in
Array.blit s.handlers 0 handlers 0 s.n;
s.handlers <- handlers
end);
s.handlers.(s.n) <- f;
s.n <- s.n + 1
let once s f =
on s (fun x -> ignore (f x); false)
let propagate a b =
on a (fun x -> send b x; true)
let map f signal =
let signal' = create () in
(* weak ref *)
let r = Weak.create 1 in
Weak.set r 0 (Some signal');
on signal (fun x ->
match Weak.get r 0 with
| None -> false
| Some signal' -> send signal' (f x); true);
signal'.alive <- Keep signal;
signal'
let filter p signal =
let signal' = create () in
(* weak ref *)
let r = Weak.create 1 in
Weak.set r 0 (Some signal');
on signal (fun x ->
match Weak.get r 0 with
| None -> false
| Some signal' -> (if p x then send signal' x); true);
signal'.alive <- Keep signal;
signal'
end
let connect o i =
O.on o (fun x -> I.send x i; true)
module Instance = struct
type ('s, 'i, 'o) t = {
transition : ('s, 'i, 'o) automaton;
mutable i : 'i I.t;
o : 'o O.t;
transitions : ('s * 'i * 's * 'o list) O.t;
mutable state : 's;
}
let transition_function a = a.transition
let i a = a.i
let o a = a.o
let state a = a.state
let transitions a = a.transitions
let _q = Queue.create ()
let _process q =
while not (Queue.is_empty q) do
let task = Queue.pop q in
task ()
done
let _schedule q task = Queue.push task q
let _do_transition q a i =
let s = a.state in
let s', os = a.transition s i in
(* update state *)
a.state <- s';
(* trigger the transitions asap *)
_schedule q (fun () -> O.send a.transitions (s, i, s', os));
List.iter
(fun o -> _schedule q (fun () -> O.send a.o o))
os
let _receive a i =
let first = Queue.is_empty _q in
_do_transition _q a i;
if first then _process _q
let create ~f init =
let o = O.create () in
let transitions = O.create () in
(* create input and automaton *)
let a = { state = init; i=Obj.magic 0; o; transition=f; transitions; } in
a.i <- _receive a;
a
end

View file

@ -24,65 +24,14 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Distributed Algorithms} *)
(** {1 Automaton} *)
type ('s, -'i, +'o) t = 's -> 'i -> 's * 'o list
(** Transition function of an event automaton *)
type ('s, 'i, 'o) instance
(** Instance of an automaton, with a concrete state, and connections to other
automaton instances. *)
type ('s, 'i, 'o) automaton = ('s, 'i, 'o) t
type queue
(** Stateful value used to store the event (pending transitions) that remain
* to process, using an in-memory queue and processing pending tasks until
* none remains. A default global queue is provided, see {!default_queue}. *)
val default_queue : queue
(** Default event queue *)
val create_queue : unit -> queue
val instantiate :
?queue:queue ->
f:('s, 'i, 'o) t ->
's ->
('s, 'i, 'o) instance
(** [instantiate ~f init] creates an instance of [f] with initial state
[init].
@param queue event queue used to process transitions of the automaton
upon calls to {!send}. Default value is {!default_queue}. *)
val transition : ('s, 'i, 'o) instance -> ('s, 'i, 'o) t
(** Transition function of this instance *)
val state : ('s, _, _) instance -> 's
(** Current state of the automaton instance *)
val on_transition : ('s, 'i, 'o) instance -> ('s -> 'i -> 's * 'o list -> bool) -> unit
(** [on_state_change a k] calls [k] with the previous state, input,
new state and ouputs of [a] every time [a] changes state.
The callback [k] returns a boolean to signal whether it wants to continue
being called ([true]) or stop being called ([false]). *)
val connect : (_, _, 'a) instance -> (_, 'a, _) instance -> unit
(** [connect left right] connects the ouput of [left] to the input of [right].
Outputs of [left] will be fed to [right]. *)
val connect_map : ('a -> 'b) -> (_, _, 'a) instance -> (_, 'b, _) instance -> unit
(** [connect_map f left right] is a generalization of {!connect}, that
applies [f] to outputs of [left] before they are sent to [right] *)
val send : (_, 'i, _) instance -> 'i -> unit
(** [send a i] uses [a]'s transition function to update [a] with the input
event [i]. The output of the transition function (a list of outputs) is
recursively processed.
This may not terminate, if the automata keep on creating new outputs that
trigger other outputs forever. *)
(** {2 Helpers} *)
(** {2 Combinators} *)
val map_i : ('a -> 'b) -> ('s, 'b, 'o) t -> ('s, 'a, 'o) t
(** map inputs *)
@ -90,11 +39,83 @@ val map_i : ('a -> 'b) -> ('s, 'b, 'o) t -> ('s, 'a, 'o) t
val map_o : ('a -> 'b) -> ('s, 'i, 'a) t -> ('s, 'i, 'b) t
(** map outputs *)
val iter : ('s -> 'i -> ('s * 'o list) -> unit) -> ('s,'i,'o) instance -> unit
(** Iterate on every transition (wrapper around {!on_transition}) *)
val fmap_o : ('a -> 'b list) -> ('s, 'i, 'a) t -> ('s, 'i, 'b) t
(** flat-map outputs *)
val iter_state : ('s -> unit) -> ('s, _, _) instance -> unit
val filter_i : ('a -> bool) -> ('s, 'a, 'o) t -> ('s, 'a, 'o) t
(** Filter inputs *)
val iter_input : ('i -> unit) -> (_, 'i, _) instance -> unit
val filter_o : ('a -> bool) -> ('s, 'i, 'a) t -> ('s, 'i, 'a) t
(** Filter outputs *)
val iter_output : ('o -> unit) -> (_, _, 'o) instance -> unit
val fold : ('a -> 'b -> 'a) -> ('a, 'b, 'a) t
(** Automaton that folds over its input using the given function *)
val product : ('s1, 'i, 'o) t -> ('s2, 'i, 'o) t -> ('s1 * 's2, 'i, 'o) t
(** Product of transition functions and states. *)
(** {2 Input}
Input sink, that accepts values of a given type. Cofunctor. *)
module I : sig
type -'a t
val comap : ('a -> 'b) -> 'b t -> 'a t
val filter : ('a -> bool) -> 'a t -> 'a t
val send : 'a -> 'a t -> unit
(** [send a i] uses [a]'s transition function to update [a] with the input
event [i]. The output of the transition function (a list of outputs) is
recursively processed.
This may not terminate, if the automata keep on creating new outputs that
trigger other outputs forever. *)
end
(** {2 Output}
Stream of output values. Functor. *)
module O : sig
type 'a t
val map : ('a -> 'b) -> 'a t -> 'b t
val filter : ('a -> bool) -> 'a t -> 'a t
val on : 'a t -> ('a -> bool) -> unit
val once : 'a t -> ('a -> unit) -> unit
val propagate : 'a t -> 'a t -> unit
(** [propagate a b] forwards all elements of [a] into [b]. As long as [a]
exists, [b] will not be GC'ed. *)
end
val connect : 'a O.t -> 'a I.t -> unit
(** Pipe an output into an input *)
(** {2 Instance} *)
module Instance : sig
type ('s, 'i, 'o) t
(** Instance of an automaton, with a concrete state, and connections to other
automaton instances. *)
val transition_function : ('s, 'i, 'o) t -> ('s, 'i, 'o) automaton
(** Transition function of this instance *)
val i : (_, 'a, _) t -> 'a I.t
val o : (_, _, 'a) t -> 'a O.t
val state : ('a, _, _) t -> 'a
val transitions : ('s, 'i, 'o) t -> ('s * 'i * 's * 'o list) O.t
val create : f:('s, 'i, 'o) automaton -> 's -> ('s, 'i, 'o) t
(** [create ~f init] creates an instance of [f] with initial state
[init]. *)
end