2017-07-31 17 views
5

Birden fazla birlikte-rutininden alıp sonucuChannel'e geri iterek ürünler üretiyorum. Yapımcı son öğeden sonra kanalını kapatıyor.Fan-out/fan-in-kapanış sonucu kanalı

Kod hiçbir zaman sonuç olarak bitmezKanal hiçbir zaman kapalı değildir. hasNext()false dönüşü nasıl yinelenir ve düzgün bir şekilde sonlanır?

val inputData = (0..99).map { "Input$it" } 
val threads = 10 

val bundleProducer = produce<String>(CommonPool, threads) { 
    inputData.forEach { item -> 
     send(item) 
     println("Producing: $item") 
    } 

    println("Producing finished") 
    close() 
} 

val resultChannel = Channel<String>(threads) 

repeat(threads) { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

val iterator = object : Iterator<String> { 
    val iterator = resultChannel.iterator() 
    override fun hasNext() = runBlocking { iterator.hasNext() } 
    override fun next() = runBlocking { iterator.next() } 
}.asSequence() 

println("Starting interation...") 

val result = iterator.toList() 

println("finish: ${result.size}") 
+0

Bunu yapmak için hackish yolu bulduğum sıraya göre (100) .take olduğunu ancak hangi durumda altta yatan yapıları bırakır emin değilim. – atok

cevap

3

Daha sonra tüketicilerin bitirmek için bekliyor bir eşyordam çalıştırabilir ve resultChannel kapatır.

Birincisi, Job s kurtarmak için tüketicilere başlar kodunu yeniden yazın:

val jobs = (1..threads).map { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

Ve sonra tüm Job s tamamladıktan sonra kanal kapatır başka eşyordam çalıştırın:

launch(CommonPool) { 
    jobs.forEach { it.join() } 
    resultChannel.close() 
}