2016-04-04 14 views
13

Öğeleri dinamik olarak bir Akka Stream kaynağına göndermek için bir SourceQueue kullanmak isterim. Yürütme denetleyicisinin, chuncked yöntemini kullanarak bir sonucu akışlandırabilmesi için bir Kaynağa ihtiyacı vardır.
Play, kendi Akka Stream Sink'i başlığın altında kullandığı için, kaynak kuyruğunu bir Sink kullanarak gerçekleştiremiyorum çünkü kaynak chunked yöntemi tarafından kullanılmadan önce tüketilecektir (aşağıdaki uyarıyı kullanmam dışında).Bir Akka Streams SourceQueue, PlayFramework ile nasıl kullanılır?

Ben reaktif-akışları yayıncı kullanarak kaynak kuyruğu ön gerçekleşmek eğer o iş yapmak mümkün, ama bu kirli hack 'bir tür:

def sourceQueueAction = Action{ 

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run() 

    //stupid example to push elements dynamically 
    val tick = Source.tick(0 second, 1 second, "tick") 
    tick.runForeach(t => queue.offer(t)) 

    Ok.chunked(Source.fromPublisher(pub)) 
    } 

daha basit bir yolu var mı PlayFramework ile bir Akka Streams SourceQueue kullanıyor musunuz?

Teşekkür

cevap

19

çözüm onun kuyruk gerçekleşme bir gelecek elde etmek kaynağına mapMaterializedValue kullanmaktır:

def sourceQueueAction = Action { 
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail)) 

    futureQueue.map { queue => 
     Source.tick(0.second, 1.second, "tick") 
     .runForeach (t => queue.offer(t)) 
    } 
    Ok.chunked(queueSource) 

    } 

    //T is the source type, here String 
    //M is the materialization type, here a SourceQueue[String] 
    def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = { 
    val p = Promise[M] 
    val s = src.mapMaterializedValue { m => 
     p.trySuccess(m) 
     m 
    } 
    (s, p.future) 
    } 
+0

Neden örneğin '' queueSource.map {_.toUpperCase} yaparsanız ben Kaynak [String, NotUsed] almıyor musunuz? Bunun yerine, "queueSource.Repr [String] tipinin ifadesi, beklenen tür Kaynak ile uyumlu değil. Source [String, NotUsed]." Kaynağın geri dönüşümlerini nerede yapacağız? [Örneğinizdeki] keneler gibi (http://loicdescotte.github.io/posts/play-akka-streams-queue/) – gabrielgiussi

İlgili konular