lwt/Lwt_actor stub, for erlang-style concurrency (albeit much much more naive)

This commit is contained in:
Simon Cruanes 2014-11-20 00:17:35 +01:00
parent 510f63f921
commit 9c9a78c7a6
3 changed files with 257 additions and 1 deletions

2
_oasis
View file

@ -98,7 +98,7 @@ Library "containers_thread"
Library "containers_lwt"
Path: lwt
Modules: Behavior, Lwt_automaton
Modules: Behavior, Lwt_automaton, Lwt_actor
Pack: true
FindlibName: lwt
FindlibParent: containers

181
lwt/lwt_actor.ml Normal file
View file

@ -0,0 +1,181 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Small Actor system for Lwt} *)
module ITbl = Hashtbl.Make(struct
type t = int
let equal (i:int) j = i=j
let hash i = i land max_int
end)
(** {2 Actors Basics} *)
let (>>=) = Lwt.(>>=)
type 'a t = {
mutable inbox : 'a Queue.t;
cond : unit Lwt_condition.t;
act : 'a t -> 'a -> unit Lwt.t;
setup : unit -> unit Lwt.t;
pid : int;
mutable links : any_actor list;
mutable monitors : monitor list;
mutable thread : unit Lwt.t option; (* running thread *)
}
(* invariant: thead=Some t means that t is running, and the
actor is alive *)
and any_actor =
| AnyActor : _ t -> any_actor
and monitor =
| Monitor : [> `Died of any_actor] t -> monitor
(* send message *)
let send m x =
Queue.push x m.inbox;
Lwt_condition.signal m.cond ();
Lwt.return_unit
(* [a] just died, now kill its friends *)
let propagate_dead a =
let traversed = ITbl.create 16 in
(* depth-first traversal of the clique of linked actors *)
let rec traverse stack = match stack with
| [] -> ()
| AnyActor a :: stack' when ITbl.mem traversed a.pid ->
traverse stack'
| (AnyActor a) as any_a :: stack' ->
ITbl.add traversed a.pid ();
begin match a.thread with
| None -> ()
| Some t ->
Lwt.cancel t;
a.thread <- None;
end;
(* notify monitors that [a] died *)
let monitors = a.monitors in
Lwt.async
(fun () ->
Lwt_list.iter_p
(function Monitor m -> send m (`Died any_a)
) monitors
);
(* follow links to other actors to kill *)
let stack' = List.rev_append a.links stack' in
traverse stack'
in
traverse [AnyActor a]
(* number of active actors *)
let num_active = ref 0
let on_num_active_0 = Lwt_condition.create()
let decr_num_active () =
decr num_active;
assert (!num_active >= 0);
if !num_active = 0 then Lwt_condition.broadcast on_num_active_0 ()
(* how to start an actor *)
let start_ a =
(* main loop of the actor *)
let rec loop () =
Lwt_condition.wait a.cond >>= fun () ->
let x = Queue.pop a.inbox in
a.act a x >>= fun () ->
loop ()
and exn_handler e =
Lwt_log.ign_info_f ~exn:e "error in thread %d" a.pid;
propagate_dead a;
Lwt.return_unit
in
match a.thread with
| Some _ -> failwith "start: actor already running";
| None ->
(* start the thread *)
let thread = Lwt.catch (fun () -> a.setup () >>= loop) exn_handler in
(* maintain [num_active] *)
incr num_active;
Lwt.on_termination thread decr_num_active;
a.thread <- Some thread;
()
let kill a = propagate_dead a
let no_setup_ () = Lwt.return_unit
let pid a = a.pid
let cur_pid = ref 0
let monitor m a =
a.monitors <- Monitor m :: a.monitors
let link a b =
if a.thread = None
then kill b
else if b.thread = None
then kill a;
a.links <- AnyActor b :: a.links;
b.links <- AnyActor a :: b.links;
()
let spawn ?(links=[]) ?(setup=no_setup_) act =
let pid = !cur_pid in
incr cur_pid;
let a = {
inbox=Queue.create ();
cond = Lwt_condition.create();
act;
setup;
pid;
links=[];
monitors=[];
thread=None;
} in
start_ a;
(* link now *)
List.iter (function AnyActor b -> link a b) links;
a
let cur_timeout_id = ref 0
let timeout a f =
if f <= 0. then invalid_arg "timeout";
let i = !cur_timeout_id in
incr cur_timeout_id;
let _ = Lwt_engine.on_timer f false
(fun _ -> Lwt.async (fun () -> send a (`Timeout i)))
in
i
(* wait until num_active=0 *)
let rec wait_all () =
if !num_active = 0
then Lwt.return_unit
else
Lwt_condition.wait on_num_active_0 >>= fun () ->
wait_all ()

75
lwt/lwt_actor.mli Normal file
View file

@ -0,0 +1,75 @@
(*
copyright (c) 2013-2014, simon cruanes
all rights reserved.
redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer. redistributions in binary
form must reproduce the above copyright notice, this list of conditions and the
following disclaimer in the documentation and/or other materials provided with
the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*)
(** {1 Small Actor system for Lwt}
Let's draw inspiration from Erlang. Just a tiny bit.
{b NOTE}: this module is not thread-safe at all.
*)
(** {2 Actors Basics} *)
type 'a t
(** An actor that can receive messages of type 'a. In practice, 'a will
often be a variant or a polymorphic variant. *)
type any_actor =
| AnyActor : _ t -> any_actor
val spawn : ?links:any_actor list ->
?setup:(unit -> unit Lwt.t) ->
('a t -> 'a -> unit Lwt.t) -> 'a t
(** Spawn a new actor with the given loop function. The function will
be called repeatedly with [(self, message)] where [self] is the actor
itself, and [msg] some incoming message..
@param setup function that is called when the actor (re)starts
@param links list of other actors that are linked to immediately *)
val send : 'a t -> 'a -> unit Lwt.t
(** Send a message to an actor's inbox *)
val pid : _ t -> int
(** Pid of an actor *)
val timeout : [> `Timeout of int ] t -> float -> int
(** [timeout a f] returns some unique integer ticket [i],
and, [f] seconds later, sends [`Timeout i] to [a] *)
val link : _ t -> _ t -> unit
(** [link a b] links the two actors together, so that if one dies, the
other dies too. The linking relationship is transitive and symmetric. *)
val kill : _ t -> unit
(** Kill the actor, and all its linked actors *)
val monitor : [> `Died of any_actor] t -> _ t -> unit
(** [monitor m a] adds [a] to the list of actors monitored by [m]. If [a]
dies for any reason, [m] is sent [`Died a] and can react consequently. *)
val wait_all : unit -> unit Lwt.t
(** Wait for all actors to finish. Typically used directly in {!Lwt_main.run} *)
(* TODO: some basic patterns: monitor strategies, pub/sub... *)