Implementing weak references for EventBus actors?

Is it correct to say that actors which are no longer referenced remain subscribed to the event stream? At least, that is what I get from experimenting with Akka ...

I am trying to implement weak references for actors in an EventBus scenario. In such cases the event listeners/actors typically come and go, unlike independent actors, which should be present all the time. Explicit unsubscribing does work, of course, but I cannot always determine the right moment to do it.

Does Akka provide for such a use case?

// Demonstrates that an actor subscribed to the system event stream keeps
// receiving events even after the only user-held reference to it is dropped.
val as = ActorSystem("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor, classOf[Exception])

// First publication: the event is published and received.
as.eventStream.publish(new KnownProblem)

// The session expires (or anything else happens that makes the actor
// redundant), so the local reference is dropped and GC is requested repeatedly.
actor = null
for (_ <- 1 to 30) System.gc()

// Second publication: the event is STILL received.
as.eventStream.publish(new KnownProblem)
+6
source share
1 answer

Well, I could not fully implement it, but I did get the actor to stop on GC. Tested with Scala 2.9.2 (REPL) + Akka 2.0.3.

An EventBus with WeakReference[ActorRef] did not help, because in Akka the actor is also held elsewhere: by the ChildrenContainer ( self.children ), and possibly by Monitor subscriptions to lifecycle events. What I have not tried is creating the actors with a dispatcher that only knows about our shiny new WeakEventBus - so maybe I missed the point?

Here is the code for the REPL (start it with the appropriate classpath and :paste it in 2 steps):

 // Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

// ---------------------------------------------------------------------------
// Message protocol
// ---------------------------------------------------------------------------

/** Base payload carried by a [[MessageEvent]]. */
// NOTE(review): PostMessage below is a case class extending this case class.
// Case-to-case inheritance is discouraged; left as-is to keep the protocol
// unchanged for the targeted Scala 2.9.2.
case class Message(val id: String, val timestamp: Long)

/** Text message; `id` defaults to a random UUID and `timestamp` to "now" in millis. */
case class PostMessage(
  override val id: String = UUID.randomUUID().toString(),
  override val timestamp: Long = new java.util.Date().getTime(),
  text: String) extends Message(id, timestamp)

/** Event published on the weak bus; `channel` is used as the classifier. */
case class MessageEvent(val channel: String, val message: Message)

case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef)
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)
// NOTE: the code below sends and matches the companion object `GcCheck`,
// not an instance `GcCheck()`. Both sides agree, so it works as written.
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean, gcFlag: WeakReference[AnyRef])

// ---------------------------------------------------------------------------
// Weak event bus
// ---------------------------------------------------------------------------

/**
 * Lookup classification that keeps its subscribers behind weak references.
 *
 * Subscribers are stored per classifier in an `akka.util.Index`, wrapped in
 * `scala.ref.WeakReference`, so the index itself does not keep them reachable.
 */
trait WeakLookupClassification { this: WeakEventBus =>

  protected final val subscribers =
    new Index[Classifier, WeakReference[Subscriber]](
      mapSize(),
      new Comparator[WeakReference[Subscriber]] {
        /**
         * Orders references by their referents while both are alive.
         *
         * The previous implementation returned -1 whenever either side was
         * cleared, regardless of argument order. That violates the Comparator
         * contract (sign symmetry, and compare(x, x) == 0). Cleared references
         * now sort before live ones and are ordered among themselves by
         * identity hash.
         */
        def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
          if (a eq b) 0
          else (a.get, b.get) match {
            case (Some(x), Some(y)) => compareSubscribers(x, y)
            case (None, None) =>
              val ha = System.identityHashCode(a)
              val hb = System.identityHashCode(b)
              if (ha < hb) -1 else if (ha > hb) 1 else 0
            case (None, _) => -1
            case (_, None) => 1
          }
      })

  /** Initial size hint passed to the underlying index. */
  protected def mapSize(): Int

  /** Total order over live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Extracts the classifier an event is routed by. */
  protected def classify(event: Event): Classifier

  /** Delivers `event` to a single live subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber of its classifier that is still alive.
   *
   * References whose referent has been garbage-collected are skipped. The
   * previous `.get.get` threw NoSuchElementException on the first cleared
   * reference, which is exactly the situation this bus is meant to tolerate.
   */
  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) {
      i.next().get.foreach(subscriber => publish(event, subscriber))
    }
  }
}

/** Weak bus routing [[MessageEvent]]s to `ActorRef` subscribers by channel name. */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
  protected def mapSize(): Int = 10
  protected def classify(event: Event): Classifier = event.channel
  protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event
}

lazy val weakEventBus = new WeakEventBus

implicit val timeout = akka.util.Timeout(1000)

// The configuration is written multi-line: HOCON needs a newline or comma
// between fields. The deployment address is written without embedded spaces.
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString("""
  akka {
    loglevel = "DEBUG"
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
      debug {
        receive = on
        autoreceive = on
        lifecycle = on
        event-stream = on
      }
    }
    remote {
      transport = "akka.remote.netty.NettyRemoteTransport"
      log-sent-messages = on
      log-received-messages = on
    }
  }
  serverconf {
    include "common"
    akka {
      actor {
        deployment {
          /root {
            remote = "akka://serversys@127.0.0.1:2552"
          }
        }
      }
      remote {
        netty {
          hostname = "127.0.0.1"
          port = 2552
        }
      }
    }
  }
  """).getConfig("serverconf"))

// ---------------------------------------------------------------------------
// Server actor: stops itself once its GC flag object has been collected
// ---------------------------------------------------------------------------

/**
 * Actor that subscribes itself to the weak bus and periodically checks a
 * weakly-referenced flag object. When the flag has been collected, the actor
 * prints a message and stops itself.
 */
class Server extends Actor {
  // Whether a GcCheck has been scheduled for this actor.
  private[this] val scheduled = new AtomicBoolean(false)
  // Weak reference to the flag object created when the server is started.
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  val gcCheckPeriod = Duration(5000, "millis")

  // Before a restart, send the current scheduling state to self so the
  // GcCheckScheduled handler can restore it.
  override def preRestart(reason: Throwable, message: Option[Any]) {
    self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
    super.preRestart(reason, message)
  }

  /** Schedules a single `GcCheck` to be sent to `who` after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)

  def receive = {
    case StartServer(nodeName) =>
      sender ! ServerStarted(nodeName, self)
      if (scheduled.compareAndSet(false, true)) schedule(gcCheckPeriod, self)
      // The flag object is only held by this local val and the weak reference.
      val gcFlagObj = new AnyRef()
      gcFlagRef.set(new WeakReference(gcFlagObj))
      weakEventBus.subscribe(self, nodeName)
      actorSystem.eventStream.unsubscribe(self)

    case GcCheck =>
      val gcFlag = gcFlagRef.get
      if (gcFlag == null) { sys.error("gcFlag") }
      gcFlag.get match {
        case Some(gcFlagObj) =>
          // Flag still alive: keep polling.
          scheduled.set(true)
          schedule(gcCheckPeriod, self)
        case None =>
          println("Actor stopped because of GC: " + self)
          context.stop(self)
      }

    case GcCheckScheduled(isScheduled, gcFlag) =>
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
        gcFlagRef.compareAndSet(null, gcFlag)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(nodeName) =>
      println("Im alive (default EventBus): " + nodeName)
      sender ! AmAlive(nodeName, self)

    case e: MessageEvent =>
      println("Im alive (weak EventBus): " + e)
  }
}

// :paste 2/2

/** Parent actor that creates [[Server]] children and relays liveness probes. */
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      Await.result(server ? start, timeout.duration)
        .asInstanceOf[ServerStarted] match {
          case started @ ServerStarted(nodeName, _) =>
            sender ! started
          case _ =>
            throw new RuntimeException(
              "[S][FAIL] Could not start server: " + start)
        }

    case isAlive @ IsAlive(nodeName) =>
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration)
        .asInstanceOf[AmAlive] match {
          case AmAlive(nodeName, _) =>
            println("[S][SUCC] Server is alive : " + nodeName)
          case _ =>
            throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
        }

    case isAliveWeak @ IsAliveWeak(nodeName) =>
      actorSystem.eventStream.publish(
        MessageEvent(nodeName, PostMessage(text = "isAlive-default")))
      weakEventBus.publish(
        MessageEvent(nodeName, PostMessage(text = "isAlive-weak")))
  }
}

lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

/** Convenience entry points for driving the root actor from the REPL. */
object Root {
  /** Asks the root actor to start a server; returns the started actor, if any. */
  def start(nodeName: String) = {
    val msg = StartServer(nodeName)
    var startedActor: Option[ActorRef] = None
    Await.result(rootActor ? msg, timeout.duration)
      .asInstanceOf[ServerStarted] match {
        case succ @ ServerStarted(nodeName, actor) =>
          println("[S][SUCC] Server started: " + succ)
          startedActor = Some(actor)
        case _ =>
          throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
      }
    startedActor
  }

  def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)

  def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")
0
source

All Articles