2016-03-25 25 views
5

Bir Akka Akımım var ve akışın yaklaşık saniyede bir mesaj göndermesini istiyorum.Bir Akka Akışını yürütmek ve saniyede bir kez bir mesaj göndermek için nasıl sınırlanır?

Bu sorunu çözmek için iki yol denedim, ilk yol yapımcısını akış başlangıcında yapmak için her saniye saniyede bir mesaj göndererek bu iletiye bir Devam iletisinin gelmesiydi. Daha sonra iletileri Devam bir sel ActorPublisher aktör görünür iken

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Bu kısa bir için çalışır, ben hızlı tüketebilir mansap olarak mesajlar verilmesini talep arka basınç yoluyla aşağı dan (ama emin değilim tahmin) varsayalım ama yukarı yönde hızlı bir oranda üretmiyor. Yani bu yöntem başarısız oldu.

Denediğim diğer bir yöntem de, geri tepme kontrolü ile yapıldı. Iletim sayısını saniyede 1 ile sınırlamak için ActorSubscriber akışında MaxInFlightRequestStrategy kullandım. Bu çalışır, ancak gelen mesajlar sadece bir seferde değil, her seferinde yaklaşık olarak üç veya daha fazla gelir. Geri tepme kontrolünün, OR mesajlarında gelen mesajların oranını derhal değiştirmediği ve işlenmeyi beklediği görülüyor.

Sorun şu ki, sadece bir saniyede bir mesajı işleyebilen bir Akka Akışı'na nasıl sahip olabilirim?


ben MaxInFlightRequestStrategy bunu yapmak için geçerli bir yol olduğunu keşfetti ama 1'e toplu boyutunu ayarlamak gerekir, onun parti boyutu buldum soruna neden olduğu varsayılan 5 vardır. Aynı zamanda sorunu çözmek için aşırı karmaşık bir yol şu anda burada sunulan cevaba bakıyorum.

+2

Eğer 'Source.tick' kullanmayı düşündünüz mü? – cmbaxter

+0

Hayır, şuna bir bakayım. – Phil

+0

ayrıca "gazı" deneyebilirsiniz. –

cevap

10

Öğelerinizi, bastırma hızlı kaynağına geri dönecek olan kısma akışından geçirebilir veya tick ve zip kombinasyonunu kullanabilirsiniz.

yumruk çözüm böyle olacağını:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val throttlingFlow = Flow[Long].throttle(
    // how many elements do you allow 
    elements = 1, 
    // in what unit of time 
    per = 1.second, 
    maximumBurst = 0, 
    // you can also set this to Enforcing, but then your 
    // stream will collapse if exceeding the number of elements/s 
    mode = ThrottleMode.Shaping 
) 

veryFastSource.via(throttlingFlow).runWith(Sink.foreach(println)) 

ikinci çözüm bu gibi olurdu:

val veryFastSource = 
    Source.fromIterator(() => Iterator.continually(Random.nextLong() % 10000)) 

val tickingSource = Source.tick(1.second, 1.second, 0) 

veryFastSource.zip(tickingSource).map(_._1).runWith(Sink.foreach(println)) 
İlgili konular