Iterable
8 öğesinden() oluşturulan bir flux
var. Her bir akı emisyonu için, bir senkronize olmayan bir yöntemi çağırmak istiyorum. Ben çalışmak ve sonunda ben flux
flux<CompletableFuture<Boolean>>
dönüştüğü map(CompletableFuture.supplyAsync(..), executor)
ile yerleşmiş vermedi dispatchOn
ve publishOn
ile çeşitli yollar denedik.flux.take (x) veya flux.all (..) tüm emisyonları beklemez
Şimdi yalnızca son öğe tamamlandığında akışa devam etmek istiyorum. all(..)
ve take(size of the Iterable)
ile denedim ancak her iki durumda da tüm elemanlar tamamlanmadan önce akış devam ediyor. Bunun, yürütücümün yalnızca 4 iş parçacığı olduğu ve CompletableFuture
-s akışına eklenmesinin biraz zaman alması nedeniyle olduğunu farz ediyorum.
Neden all(..)
veya take(8)
akı bitmiyor? Nasıl bekleyebilirim?
kodu: Her öğe için bir hesaplama ile paralel gitmek istiyorsanız
Mono
.fromFuture(dbUtil.getEntity(id))
.doOnError(t -> {
...
return;})
.doOnSuccess(s -> log.info("Got it: " + s))
.flatMap(s ->
Flux.fromIterable(s.getItemsMap().entrySet())
.map(e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
.take(s.getItemsMap().entrySet().size())
)
.all(...)
.consume(b -> done(b));
Teşekkürler @akarnokd. Önerinizi denedim ama bununla uğraştım ve zaman baskısı nedeniyle onu bir kenara bırakmalıyım. Her neyse, eğer doğru bir şekilde anladım eğer sonuç reaktörle denediğim şeye çok benzemeli, belki de 'dispatchOn' /' broadcastlishOn' yerine 'subscribeOn' yerine biraz fark yaratılabilir. – user5396668