2013-01-02 26 views
6

Yanıtsız aktörlerin etkinlik akışına abone olmaya devam ettiğini söylemek doğru mu? En azından, bu Ben Akka ile deney yapmaktan ...Zayıf referanslı Eventbus oyuncularını uygulayın?

Bir EventBus senaryosunda aktörler için zayıf referanslar uygulamaya çalışıyorum. Bu durumlarda olay dinleyicileri/aktörleri genellikle gelir ve gider. Her zaman mevcut olması gereken bağımsız aktörlerin aksine. Açıkça kayıtlı olmayan kayıtlar elbette işe yarıyor. Ama bunu yapmak için doğru anı her zaman algılayamam.

Akka böyle bir kullanım durumunda sunuyor mu?

val as = ActorSystem.create("weak") 
var actor = as.actorOf(Props[ExceptionHandler]) 
as.eventStream.subscribe(actor,classOf[Exception]) 

// an event is published & received 
as.eventStream.publish(new KnownProblem) 

//session expires or whatever that makes the actor redundant 
actor = null 
(1 to 30).foreach(_ => System.gc) 

// an event is published & STILL received 
as.eventStream.publish(new KnownProblem) 
+0

' unsubscribe' aktör? – idonnie

+0

Bu işe yarayacak. Eğer biliyordum eğer aktör artık herhangi bir şekilde referans değil. :-) Bu senaryoyu bir http oturumu için deniyorum. Uygulamalar sunucusu oturumu sona erdiğinde 'abonelikten çık' seçeneğini çalıştırmak için hiçbir zamanım yok. Genellikle bu zayıf referanslarla yapılır. –

+0

EventBus, "ActorSystem" işlevine "def eventStream" ile eklenir, ayrıca bazı olaylar yapılandırma sırasında "eventStream" olarak yayınlanır. "WeakReference [ActorRef]' -s aboneliği ile "EventBus" un uzatılmasını öneririm, 'LookupClassification' özelliği ümit verici görünüyor. yazar bir alıntı: 'EventBus uzatın ve https://groups.google.com/forum/?fromgroups=#!topic/akka-user/T3-FONxoX8E Akka EventBus basit bir örnek https implement.': //gist.github.com/3163791 – idonnie

cevap

0

Tamam, ben aslında bunu uygulamaya olamazdı, ama aktör GC üzerinde duruyor. Scala 2.9.2 (REPL) + Akka 2.0.3 Kullanımı.

yardımcı olmadı EventBus WeakReference[ActorRef] ile - Akka'da da var çünkü bir dungeon ChildrenContainer ( self.children) ile, aynı zamanda yaşam döngüsü olaylara Monitor abonelikler söz konusu olabilir. Denemediğim şey - sadece yeni parlak WeakEventBus'umuzu bilen dağıtıcılarla aktörler yaratmak - belki noktayı özledim mi? İşte

REPL için kod (uygun ithalatı ile başlayın ve :paste o 2'de adım) gider: `classOf [İstisna]` sınıflandırıcı kullanılarak `EventStream` gelen

// Start REPL with something like: 
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar: 
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar: 
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar" 

// :paste 1/2 
import akka.actor._ 
import akka.pattern._ 
import akka.event._ 
import akka.util._ 
import com.typesafe.config.ConfigFactory 
import akka.util.Timeout 
import akka.dispatch.Await 
import scala.ref.WeakReference 
import java.util.Comparator 
import java.util.concurrent.atomic._ 
import java.util.UUID 

case class Message(val id:String,val timestamp: Long) 
case class PostMessage(
    override val id:String=UUID.randomUUID().toString(), 
    override val timestamp: Long=new java.util.Date().getTime(), 
    text:String) extends Message(id, timestamp) 
case class MessageEvent(val channel:String, val message:Message) 

case class StartServer(nodeName: String) 
case class ServerStarted(nodeName: String, actor: ActorRef) 
case class IsAlive(nodeName: String) 
case class IsAliveWeak(nodeName: String) 
case class AmAlive(nodeName: String, actor: ActorRef) 
case class GcCheck() 
case class GcCheckScheduled(isScheduled: Boolean, 
    gcFlag: WeakReference[AnyRef]) 

trait WeakLookupClassification { this: WeakEventBus ⇒ 
protected final val subscribers = new Index[Classifier, 
    WeakReference[Subscriber]](mapSize(), 
    new Comparator[WeakReference[Subscriber]] { 
      def compare(a: WeakReference[Subscriber], 
     b: WeakReference[Subscriber]): Int = { 
       if (a.get == None || b.get == None) -1 
       else compareSubscribers(a.get.get, b.get.get) 
     } 
     }) 
protected def mapSize(): Int 
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int 
protected def classify(event: Event): Classifier 
protected def publish(event: Event, subscriber: Subscriber): Unit 
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
    subscribers.put(to, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = 
    subscribers.remove(from, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber): Unit = 
    subscribers.removeValue(new WeakReference(subscriber)) 
def publish(event: Event): Unit = { 
     val i = subscribers.valueIterator(classify(event)) 
     while (i.hasNext) publish(event, i.next().get.get) 
} 
    } 

class WeakEventBus extends EventBus with WeakLookupClassification { 
    type Event = MessageEvent 
    type Classifier=String 
    type Subscriber = ActorRef 

    protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b 

    protected def mapSize(): Int = 10 
    protected def classify(event: Event): Classifier = event.channel 
    protected def publish(event: Event, subscriber: Subscriber): Unit = 
     subscriber ! event 
} 

lazy val weakEventBus = new WeakEventBus 

implicit val timeout = akka.util.Timeout(1000) 
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka { 
    loglevel = "DEBUG" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
     debug { 
      receive = on 
      autoreceive = on   
      lifecycle = on 
      event-stream = on 
     } 
    } 
    remote { 
     transport = "akka.remote.netty.NettyRemoteTransport" 
     log-sent-messages = on 
     log-received-messages = on  
    } 
} 
serverconf { 
    include "common" 
    akka { 
     actor { 
      deployment { 
     /root { 
      remote = "akka://[email protected]:2552" 
     }  
      } 
     } 
     remote { 
      netty { 
     hostname = "127.0.0.1" 
     port = 2552 
      } 
     } 
    } 
} 
""").getConfig("serverconf")) 

class Server extends Actor { 
    private[this] val scheduled = new AtomicBoolean(false) 
    private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]() 

    val gcCheckPeriod = Duration(5000, "millis") 

    override def preRestart(reason: Throwable, message: Option[Any]) { 
     self ! GcCheckScheduled(scheduled.get, gcFlagRef.get) 
     super.preRestart(reason, message) 
    } 

    def schedule(period: Duration, who: ActorRef) = 
     actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck) 

    def receive = {  
     case StartServer(nodeName) => 
      sender ! ServerStarted(nodeName, self) 
      if (scheduled.compareAndSet(false, true)) 
     schedule(gcCheckPeriod, self) 
      val gcFlagObj = new AnyRef()    
      gcFlagRef.set(new WeakReference(gcFlagObj)) 
      weakEventBus.subscribe(self, nodeName) 
      actorSystem.eventStream.unsubscribe(self)  
     case GcCheck => 
      val gcFlag = gcFlagRef.get 
      if (gcFlag == null) { 
     sys.error("gcFlag") 
      } 
     gcFlag.get match { 
     case Some(gcFlagObj) => 
      scheduled.set(true) 
      schedule(gcCheckPeriod, self) 
     case None => 
      println("Actor stopped because of GC: " + self) 
      context.stop(self)   
     } 
     case GcCheckScheduled(isScheduled, gcFlag) => 
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) { 
     gcFlagRef.compareAndSet(null, gcFlag) 
     schedule(gcCheckPeriod, self)    
      } 
     case IsAlive(nodeName) => 
      println("Im alive (default EventBus): " + nodeName) 
      sender ! AmAlive(nodeName, self) 
     case e: MessageEvent => 
      println("Im alive (weak EventBus): " + e)  
    } 
} 

// :paste 2/2 
class Root extends Actor { 
    def receive = { 
     case start @ StartServer(nodeName) => 
     val server = context.actorOf(Props[Server], nodeName) 
     context.watch(server) 
     Await.result(server ? start, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
     case started @ ServerStarted(nodeName, _) => 
      sender ! started 
     case _ => 
      throw new RuntimeException(
      "[S][FAIL] Could not start server: " + start) 
     } 
     case isAlive @ IsAlive(nodeName) => 
     Await.result(context.actorFor(nodeName) ? isAlive, 
     timeout.duration).asInstanceOf[AmAlive] match { 
     case AmAlive(nodeName, _) => 
      println("[S][SUCC] Server is alive : " + nodeName) 
     case _ => 
     throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)  
      } 
     case isAliveWeak @ IsAliveWeak(nodeName) =>     
     actorSystem.eventStream.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-default"))) 
     weakEventBus.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-weak"))) 
} 
    } 

lazy val rootActor = actorSystem.actorOf(Props[Root], "root") 

object Root { 
    def start(nodeName: String) = { 
     val msg = StartServer(nodeName) 
     var startedActor: Option[ActorRef] = None 
     Await.result(rootActor ? msg, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
      case succ @ ServerStarted(nodeName, actor) => 
      println("[S][SUCC] Server started: " + succ) 
      startedActor = Some(actor) 
      case _ => 
     throw new RuntimeException("[S][FAIL] Could not start server: " + msg) 
      } 
     startedActor 
    } 
    def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName) 
    def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName) 
} 

//////////////// 
// actual test 
Root.start("weak") 
Thread.sleep(7000L) 
System.gc() 
Root.isAlive("weak")