Akka akışları tabanlı akışları Play 2.5 uygulamasına entegre etmeye çalışıyorum. Buradaki fikir, bir fotoğrafta akış yapabilmeniz, daha sonra ham dosya, küçük boyutlu bir sürüm ve filigranlı bir sürüm olarak diske yazılabilmesidir.Bir Akka Streams lavabosunun birden fazla dosya yazmasından nasıl birleştirilir?
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
Sonra benim oyun denetleyicisi bunu tel böyle bir akümülatör kullanarak:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
Böyle bir grafik şey kullanarak bu çalışma almak başardı
Sorun şu ki, işlenen iki dosya havuzum tamamlanmadı ve işlenen dosyalar için sıfır boyutlar alıyorum ancak işlenmemiş dosyalar için değil. Benim teorim, akümülatörün sadece fanımın çıkışlarından birini beklemesidir, bu yüzden giriş akışı tamamlandığında ve byteAccumulator tüm dosyayı tükürdüğünde, işlem bittiğinde, oynatma gerçekleşen değeri çıktıdan alır. .
Sorularım şunlardır:
Yaklaşımım ilerledikçe doğru yolda mıyım? Böyle bir grafiği çalıştırmak için beklenen davranış nedir? Tüm lavabolarımı bir son lavabo oluşturmak için nasıl bir araya getirebilirim?
Bunun sebebi, akışların işlemden sonra birleştirilmemesidir. Sink.combin'i denediniz mi (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat
Evet, Sink.combine bir iş verdim, ancak bu, bir fan gibi _to_ göndermek için birden fazla alıcıyı birleştiriyor. Sanırım bir fan arıyordum ama sanki bunu sadece kaynaklarla batırmıyorsun! – Tompey
Bu benzer bir örnek gibi görünüyor: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Belki de akışınızın bittiğini bildirmek için bir "FlowShape" yerine bir "SinkShape" döndürmeniz gerekir. – devkat