2013-10-20 26 views
5

Örneğime veri akışı için WebSockets kullanan bir Play Framework 2.2.0-scala kullanarak yapıyorum. Sorun şu ki, her ne sebeple olursa olsun, ana aktörün çocuklarından biri düzgün bir şekilde kapatılmıyor. Tüm günlükler durduğunu ve kapatıldığını gösterir, ancak bunun veriyi yayınlayarak gerçekte olmadığını görüyorum. İşte ilk benim denetleyicisi eylem ile, bazı kod: Scala: Akka aktör, Play Framework 2.2.0'da ölmüyor.

def scores(teamIds: String) = WebSocket.async[JsValue] { request => 
    val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el => 
     s"nfl-streaming-scores-${el}" 
    } 

    val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr))) 
    ScoresStream.join(scoresStream) 
    } 

Yani bir istemci bağlayan her zaman, bunlar ilgili Iteratee döndüren ScoresStream katılmak, WebSocket.async gerektirir Listeleyicisi söyledi. Gerçek ScoresStream nesne aşağıdaki gibi görünür:

object ScoresStream { 

    implicit val timeout = Timeout(5 seconds) 

    def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = { 

    (scoresStream ? BeginStreaming).map { 

     case Connected(enumerator) => 
     val iteratee = Iteratee.foreach[JsValue] { _ => 
      Logger.info("Ignore iteratee input.") 
     }.map { _ => 
      Logger.info("Client quitting - killing Actor.") 
      scoresStream ! UnsubscribeAll 
      scoresStream ! PoisonPill 
     } 
     (iteratee,enumerator) 
} 

buradaki fikir, ana Erkek Oyuncu, ScoresStream öldürmek istemci bağlantıyı kestiğinde etmektir. Bunu scoresStream ! PoisonPill kullanarak yapıyorum. sırayla

ScoresStream iletilere yayıncılık/çizim için REDIS bağlanmak sargı olan Pub ve Sub örneklerini oluşturur, burada Aktör kod: Nihayet

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging { 

    val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue] 

    case class Message(kind: String, user: String, message: String) 
    implicit val messageReads = Json.reads[Message] 
    implicit val messageWrites = Json.writes[Message] 

    val sub = context.child("sub") match { 
    case None => createSub(scoresChannel) 
    case Some(c) => c 
    } 

    val pub = context.child("pub") match { 
    case None  => createPub(teamIds) 
    case Some(c) => c 
    } 

    def receive = { 
    case BeginStreaming => { 
     log.info("hitting join...") 
     sub ! RegisterCallback 
     sub ! SubscribeChannel(teamIds) 
     sender ! Connected(scoresEnumerator) 
    } 

    case UnsubscribeAll => { 
     sub ! UnsubscribeChannel(teamIds) 
    } 
    } 

} 

trait CreatePubSub { self:Actor => 
    def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub") 
    def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub") 
} 

, burada asıl Alt Aktör kod: (Pub doesn ince kapatılıyor olarak 't) burada alakalı görünmese:

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging { 
    val s = context.child("subscriber") match { 
    case None => createSubscriber 
    case Some(c) => c 
    } 

    def callback(pubsub: PubSubMessage) = pubsub match { 
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup") 
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no) 
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) 
    case M(channel, msg) => 
     msg match { 
     // exit will unsubscribe from all channels and stop subscription service 
     case "exit" => 
      println("unsubscribe all ..") 
      pChannel.end 
      r.unsubscribe 

     // message "+x" will subscribe to channel x 
     case x if x startsWith "+" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } 
      } 

     // message "-x" will unsubscribe from channel x 
     case x if x startsWith "-" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) 
             pChannel.end 
      } 

     case x => 
     try { 
      log.info("Just got a message: " + x) 
      pChannel.push(Json.parse(x)) 
      } 
      catch { 
      case ex: com.fasterxml.jackson.core.JsonParseException => { 
       log.info("Malformed JSON sent.") 
      } 
      } 
     } 
    } 

    def receive = { 
    case RegisterCallback => { 
     log.info("Creating a subscriber and registering callback") 
     s ! Register(callback) 
    } 
    case SubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("subscribing to channel " + x + " ") } 
     //sub ! Subscribe(Array("scores-5","scores-6")) 
     s ! Subscribe(teamIds) 
    } 
    case UnsubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") } 
     s ! Unsubscribe(teamIds) 
    } 
    case true => println("Subscriber successfully received message.") 
    case false => println("Something went wrong.") 
    } 
} 

trait CreatePublisherSubscriber { self:Actor => 
    def r = new RedisClient("localhost", 6379) 
    def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber") 
    def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher") 
} 

istemci bağlandığında Şimdi zaman, başlangıç ​​mesajlar sağlıklı bak:

[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921] 
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991] 
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started ([email protected]) 
Subscriber successfully received message. 
Subscriber successfully received message. 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862] 
subscribed to nfl-streaming-scores-5 and count = 1 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539] 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join... 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514] 

Ve bağlantı kesme sağlıklı görünüyor:

redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}" 
:

[info] application - Client quitting - killing Actor. 
unsubscribed from nfl-streaming-scores-5 and count = 0 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters]) 
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped 
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped 

Ve burada istemci bağlantısını kesti sonra, problem, Şu anda kapatma Aktör abone olan bir mesaj göndermek için gidiyorum

ve burada olmamalı, bu Aktör teknik olarak ölmüş olmalı. Daha önce var olan diğer Aktörler de mesajı alırlar, olanlar $ a/$ b ile etiketlenir. Başka hiçbir müşterinin bağlı olduğunu doğrulayabilirim.

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"} 

Ayrıca, adların hiçbir zaman yeniden kullanılmaması, garip bir göstergedir.

akka://application/user/$c 
akka://application/user/$d 
akka://application/user/$e 

eski referanslar yeniden alışması Görmez: Ben/kesmek bağladığınızda isimlerin aşağıdaki gibi bir eğilim yumurtlamaya görmeye devam.

Burada varsayımım, Redis ile bağlantının temiz bir şekilde kapatılmamasıdır. Günlüğün neden aktörün durduğunu söylemediğini açıklamıyor ama tüm aktörler ölümünden sonra bile netstat'u çalıştırdıktan sonra kesinlikle yeniden bağlantı kurmak için bağlantıları görüyorum. Uygulamayı tamamen çalışır durumda bıraktığımda, bu bağlantılar temizleniyor. Abonelikten çıkmanın sessizce başarısız olması ve Aktör'ü canlı tutması ve aynı zamanda bağlantı olması, çoklu nedenlerden dolayı gerçekten kötü bir durumdur, çünkü sonuçta sistem dosya tanımlayıcılarının bitmesine ve/veya bellek sızıntılarına neden olacaktır. Burada yanlış yaptığım bir şey var mı?

+1

'def r = new RedisClient', sanırım tembel val daha iyi olurdu, bu nedenle 'r.doSomeThing' çağrılırken her zaman yeni bir örnek oluşturmak yerine RedisClient'in yalnızca bir örneğini yaratırsınız. – Schleichardt

+0

Bunu denedim. Hatta onu özellikten ve doğrudan Aktör'e taşıdım. Yine de, sonuç aynıdır. 'redis-cli' kullanarak, UNSUBSCRIBE’in de geldiğini görüyorum. Ancak bağlantı hala kurulmaktadır. Aktörleri kurduğum/indirdiğim yol burada zahmetli olduğunu düşünüyorum. – randombits

+0

Otomatik olarak oluşturulan aktör isimleri asla yeniden kullanılmayacak, bu da onları benzersiz kılmak için en güvenli yoldur. Şüphelendiğim şey, Abone aktörünün (göstermediğiniz), geri bildirimi, iletinin geldiği zaman, oyuncudan bağımsız olarak redis istemci kütüphanesine iletmesidir. –

cevap

3

Sadece oyuncuyu durdurduğunuz için, bu aktöre ait kaynakların otomatik olarak temizlendiği anlamına gelmez.Bu eylemci örneğine bağlı bir RedisClient varsa ve düzgün şekilde temizlenebilmesi için bu bağlantının durdurulması gerekiyorsa, postStop yönteminde böyle bir şey yapmanız gerekir. Ayrıca @Schleichardt ile def r = new RedisClient'unuzu val ya da tembel bir değere (başlatma sırasına ve gereksinimlerine bağlı olarak) değiştirmeniz gerektiğine katılıyorum. Böylelikle abone başına bir tek RedisClient temizlemeniz gerektiğini biliyorsunuz. Kullandığınız RedisClient için API'yi bilmiyorum, ancak bağlantıyı sonlandıracak ve kaynaklarını temizleyecek bir shutdown yöntemine sahip olduğunu varsayalım. Sonra basit şöyle abone aktöre bir postStop ekleyebilirsiniz:

override def postStop { 
    r.shutdown 
} 

Eğer val değişime def yapmak varsayarsak, bu aradığınız şey olabilir.

İlgili konular