2016-05-10 16 views
8

Akka akışları tabanlı akışları Play 2.5 uygulamasına entegre etmeye çalışıyorum. Buradaki fikir, bir fotoğrafta akış yapabilmeniz, daha sonra ham dosya, küçük boyutlu bir sürüm ve filigranlı bir sürüm olarak diske yazılabilmesidir.Bir Akka Streams lavabosunun birden fazla dosya yazmasından nasıl birleştirilir?

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Sonra benim oyun denetleyicisi bunu tel böyle bir akümülatör kullanarak:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Böyle bir grafik şey kullanarak bu çalışma almak başardı

Sorun şu ki, işlenen iki dosya havuzum tamamlanmadı ve işlenen dosyalar için sıfır boyutlar alıyorum ancak işlenmemiş dosyalar için değil. Benim teorim, akümülatörün sadece fanımın çıkışlarından birini beklemesidir, bu yüzden giriş akışı tamamlandığında ve byteAccumulator tüm dosyayı tükürdüğünde, işlem bittiğinde, oynatma gerçekleşen değeri çıktıdan alır. .

Sorularım şunlardır:
Yaklaşımım ilerledikçe doğru yolda mıyım? Böyle bir grafiği çalıştırmak için beklenen davranış nedir? Tüm lavabolarımı bir son lavabo oluşturmak için nasıl bir araya getirebilirim?

+0

Bunun sebebi, akışların işlemden sonra birleştirilmemesidir. Sink.combin'i denediniz mi (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat

+0

Evet, Sink.combine bir iş verdim, ancak bu, bir fan gibi _to_ göndermek için birden fazla alıcıyı birleştiriyor. Sanırım bir fan arıyordum ama sanki bunu sadece kaynaklarla batırmıyorsun! – Tompey

+0

Bu benzer bir örnek gibi görünüyor: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Belki de akışınızın bittiğini bildirmek için bir "FlowShape" yerine bir "SinkShape" döndürmeniz gerekir. – devkat

cevap

7

Tamam, küçük bir yardım (Andreas doğru yolda idi), ben hile yapar bu çözümün geldi sonra: her oyundan bu çağırmak için öldü

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

hangi sonra:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

Teşekkürler, sadece benzer bir görev yaptım ve tüm materyalize Vadeli İşlemler için nasıl bekleyeceğimi anlayamadım. Çözümünüz çok yardımcı oldu ve işe yarıyor! –

+0

Merhaba! Kombinasyon için değişken sayıda lavaboya ne dersiniz? – Alexander

İlgili konular