2013-11-04 27 views
6

Üzerinde çalıştığım bir proje, SQS'den gelen mesajların okunmasını gerektiriyor ve bu mesajların işlenmesini dağıtmak için Akka'yı kullanmaya karar verdim.Akka, SQS ve Camel ile Tüketici Anket Oranı

SQS Camel tarafından desteklendiği için ve Tüketici sınıfında Akka'da kullanılmak üzere geliştirilmiş bir işlev bulunduğundan, son noktayı uygulamak ve iletileri bu şekilde okumak en iyi olacağını düşündüm, ancak birçok örnek görmedim. insanlar bunu yapıyor.

Sorunum, kuyruğumu boş bırakmak veya sıranın yakınında tutmak için kuyruğumu yeterince hızlı bir şekilde sorgulayamam. İlk başta düşündüğüm şey, bir Tüketicinin SQS'den X/s hızında Camel üzerinden mesaj alabilmesiydi. Oradan, işlenen mesajlara ihtiyaç duyduğum orana ulaşmak için daha fazla Tüketiciyi oluşturabilirim.

My Tüketici:

görüldüğü gibi
import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

, ben mesajların oranını artırmak için delay=1 yanı sıra &maxMessagesPerPoll=10 kurdum ama aynı bitiş noktası ile birden tüketicileri yumurtlamaya veremiyoruz.

Ben By default endpoints are assumed not to support multiple consumers. ve ben bu çoklu tüketicileri yumurtlama bana sadece bir tüketici verecek şekilde nerede bir dakika sistemi çalıştırdıktan sonra çıkış mesajı yerine Count for actor: x olduğu, hem de OKS uç noktaları için de geçerlidir inanıyoruz docs okumak Diğerleri Count for actor: 0 çıktı.

Eğer bu işe yararsa; Tek bir tüketici üzerindeki bu mevcut uygulama ile yaklaşık 33 mesaj/saniye okuyabiliyorum.

Bu, Akka'daki bir SQS kuyruğundan gelen iletileri okumak için uygun yol mu? Eğer öyleyse, bunu dışarı ölçeklemek için yol var mı, böylece mesaj tüketim oranımı 900 mesaj/saniyeye yakınlaştırabilir miyim?

cevap

5

Sadly Camel şu anda SQS'de paralel mesaj tüketimini desteklemiyor.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

bu ben aws-java-sdk kullanarak toplu mesajlar SQS'in yoklamak için kendi Erkek Oyuncu yazdım ele almak. Bu en iyi uygulama ise belirli değilim ve tabii talepleri sürekli boş kuyruğu isabet olmadığını, bu anket için güçlü olmak benim şimdiki ihtiyaçlarına uygun bu yüzden üzerine geliştirilebilir olsa

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

aynı sıradaki mesajlar.

Bu konuda SQS'den 133 (mesaj/saniye)/aktör hakkında bilgi edinin.

1

Camel 2.15 eşzamanlı Destekleyicileri desteklemektedir, ancak bunun ne kadar kullanışlı olduğunu bilmiyorum ancak akka deve desteğinin 2.15 olup olmadığını bilmiyorum ve bir Tüketici aktörünün birden fazla tüketici olsa bile bir fark yaratıp yaratmadığını bilmiyorum.