From 9c9a78c7a64f3af622919bff6c9962d04d42483b Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 20 Nov 2014 00:17:35 +0100 Subject: [PATCH] lwt/Lwt_actor stub, for erlang-style concurrency (albeit much much more naive) --- _oasis | 2 +- lwt/lwt_actor.ml | 181 ++++++++++++++++++++++++++++++++++++++++++++++ lwt/lwt_actor.mli | 75 +++++++++++++++++++ 3 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 lwt/lwt_actor.ml create mode 100644 lwt/lwt_actor.mli diff --git a/_oasis b/_oasis index 56f9fda5..aa4e6762 100644 --- a/_oasis +++ b/_oasis @@ -98,7 +98,7 @@ Library "containers_thread" Library "containers_lwt" Path: lwt - Modules: Behavior, Lwt_automaton + Modules: Behavior, Lwt_automaton, Lwt_actor Pack: true FindlibName: lwt FindlibParent: containers diff --git a/lwt/lwt_actor.ml b/lwt/lwt_actor.ml new file mode 100644 index 00000000..f5686b3d --- /dev/null +++ b/lwt/lwt_actor.ml @@ -0,0 +1,181 @@ + +(* +copyright (c) 2013-2014, simon cruanes +all rights reserved. + +redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. redistributions in binary +form must reproduce the above copyright notice, this list of conditions and the +following disclaimer in the documentation and/or other materials provided with +the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*) + +(** {1 Small Actor system for Lwt} *) + +module ITbl = Hashtbl.Make(struct + type t = int + let equal (i:int) j = i=j + let hash i = i land max_int +end) + +(** {2 Actors Basics} *) + +let (>>=) = Lwt.(>>=) + +type 'a t = { + mutable inbox : 'a Queue.t; + cond : unit Lwt_condition.t; + act : 'a t -> 'a -> unit Lwt.t; + setup : unit -> unit Lwt.t; + pid : int; + mutable links : any_actor list; + mutable monitors : monitor list; + mutable thread : unit Lwt.t option; (* running thread *) +} +(* invariant: thead=Some t means that t is running, and the + actor is alive *) + +and any_actor = + | AnyActor : _ t -> any_actor +and monitor = + | Monitor : [> `Died of any_actor] t -> monitor + +(* send message *) +let send m x = + Queue.push x m.inbox; + Lwt_condition.signal m.cond (); + Lwt.return_unit + +(* [a] just died, now kill its friends *) +let propagate_dead a = + let traversed = ITbl.create 16 in + (* depth-first traversal of the clique of linked actors *) + let rec traverse stack = match stack with + | [] -> () + | AnyActor a :: stack' when ITbl.mem traversed a.pid -> + traverse stack' + | (AnyActor a) as any_a :: stack' -> + ITbl.add traversed a.pid (); + begin match a.thread with + | None -> () + | Some t -> + Lwt.cancel t; + a.thread <- None; + end; + (* notify monitors that [a] died *) + let monitors = a.monitors in + Lwt.async + (fun () -> + Lwt_list.iter_p + (function Monitor m -> send m (`Died any_a) + ) monitors + ); + (* follow links to other actors to kill *) + let stack' = List.rev_append a.links stack' in + traverse stack' + in + traverse [AnyActor a] + +(* number of active actors *) +let num_active = ref 0 +let on_num_active_0 = Lwt_condition.create() + +let decr_num_active () = + decr num_active; + assert (!num_active >= 0); + if !num_active = 0 then Lwt_condition.broadcast on_num_active_0 () + +(* how to start an actor *) +let start_ a = + (* main loop of the actor *) + let rec loop () = + Lwt_condition.wait a.cond >>= fun () -> + let x = Queue.pop a.inbox in + a.act a x >>= fun () -> + loop () + and exn_handler e = + Lwt_log.ign_info_f ~exn:e "error in thread %d" a.pid; + propagate_dead a; + Lwt.return_unit + in + match a.thread with + | Some _ -> failwith "start: actor already running"; + | None -> + (* start the thread *) + let thread = Lwt.catch (fun () -> a.setup () >>= loop) exn_handler in + (* maintain [num_active] *) + incr num_active; + Lwt.on_termination thread decr_num_active; + a.thread <- Some thread; + () + +let kill a = propagate_dead a + +let no_setup_ () = Lwt.return_unit + +let pid a = a.pid + +let cur_pid = ref 0 + +let monitor m a = + a.monitors <- Monitor m :: a.monitors + +let link a b = + if a.thread = None + then kill b + else if b.thread = None + then kill a; + a.links <- AnyActor b :: a.links; + b.links <- AnyActor a :: b.links; + () + +let spawn ?(links=[]) ?(setup=no_setup_) act = + let pid = !cur_pid in + incr cur_pid; + let a = { + inbox=Queue.create (); + cond = Lwt_condition.create(); + act; + setup; + pid; + links=[]; + monitors=[]; + thread=None; + } in + start_ a; + (* link now *) + List.iter (function AnyActor b -> link a b) links; + a + +let cur_timeout_id = ref 0 + +let timeout a f = + if f <= 0. then invalid_arg "timeout"; + let i = !cur_timeout_id in + incr cur_timeout_id; + let _ = Lwt_engine.on_timer f false + (fun _ -> Lwt.async (fun () -> send a (`Timeout i))) + in + i + +(* wait until num_active=0 *) +let rec wait_all () = + if !num_active = 0 + then Lwt.return_unit + else + Lwt_condition.wait on_num_active_0 >>= fun () -> + wait_all () diff --git a/lwt/lwt_actor.mli b/lwt/lwt_actor.mli new file mode 100644 index 00000000..6eca78c8 --- /dev/null +++ b/lwt/lwt_actor.mli @@ -0,0 +1,75 @@ + +(* +copyright (c) 2013-2014, simon cruanes +all rights reserved. + +redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. redistributions in binary +form must reproduce the above copyright notice, this list of conditions and the +following disclaimer in the documentation and/or other materials provided with +the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*) + +(** {1 Small Actor system for Lwt} + +Let's draw inspiration from Erlang. Just a tiny bit. +{b NOTE}: this module is not thread-safe at all. +*) + +(** {2 Actors Basics} *) + +type 'a t +(** An actor that can receive messages of type 'a. In practice, 'a will + often be a variant or a polymorphic variant. *) + +type any_actor = + | AnyActor : _ t -> any_actor + +val spawn : ?links:any_actor list -> + ?setup:(unit -> unit Lwt.t) -> + ('a t -> 'a -> unit Lwt.t) -> 'a t +(** Spawn a new actor with the given loop function. The function will + be called repeatedly with [(self, message)] where [self] is the actor + itself, and [msg] some incoming message.. + @param setup function that is called when the actor (re)starts + @param links list of other actors that are linked to immediately *) + +val send : 'a t -> 'a -> unit Lwt.t +(** Send a message to an actor's inbox *) + +val pid : _ t -> int +(** Pid of an actor *) + +val timeout : [> `Timeout of int ] t -> float -> int +(** [timeout a f] returns some unique integer ticket [i], + and, [f] seconds later, sends [`Timeout i] to [a] *) + +val link : _ t -> _ t -> unit +(** [link a b] links the two actors together, so that if one dies, the + other dies too. The linking relationship is transitive and symmetric. *) + +val kill : _ t -> unit +(** Kill the actor, and all its linked actors *) + +val monitor : [> `Died of any_actor] t -> _ t -> unit +(** [monitor m a] adds [a] to the list of actors monitored by [m]. If [a] + dies for any reason, [m] is sent [`Died a] and can react consequently. *) + +val wait_all : unit -> unit Lwt.t +(** Wait for all actors to finish. Typically used directly in {!Lwt_main.run} *) + +(* TODO: some basic patterns: monitor strategies, pub/sub... *)