2016-03-22 12 views
0

Iterable 8 öğesinden() oluşturulan bir flux var. Her bir akı emisyonu için, bir senkronize olmayan bir yöntemi çağırmak istiyorum. Ben çalışmak ve sonunda ben fluxflux<CompletableFuture<Boolean>> dönüştüğü map(CompletableFuture.supplyAsync(..), executor) ile yerleşmiş vermedi dispatchOn ve publishOn ile çeşitli yollar denedik.flux.take (x) veya flux.all (..) tüm emisyonları beklemez

Şimdi yalnızca son öğe tamamlandığında akışa devam etmek istiyorum. all(..) ve take(size of the Iterable) ile denedim ancak her iki durumda da tüm elemanlar tamamlanmadan önce akış devam ediyor. Bunun, yürütücümün yalnızca 4 iş parçacığı olduğu ve CompletableFuture -s akışına eklenmesinin biraz zaman alması nedeniyle olduğunu farz ediyorum.

Neden all(..) veya take(8) akı bitmiyor? Nasıl bekleyebilirim?

kodu: Her öğe için bir hesaplama ile paralel gitmek istiyorsanız

Mono 
    .fromFuture(dbUtil.getEntity(id)) 
    .doOnError(t -> { 
     ... 
     return;}) 
    .doOnSuccess(s -> log.info("Got it: " + s)) 
    .flatMap(s -> 
     Flux.fromIterable(s.getItemsMap().entrySet()) 
      .map(e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR)) 
      .take(s.getItemsMap().entrySet().size()) 
    ) 
    .all(...) 
    .consume(b -> done(b)); 

cevap

1

, sen flatMap + sadece + subscribeOn + haritayı kullanabilirsiniz. Sana RxJava bir örnek vereyim ben Reactor'ün zamanlayıcı türleri ile çok aşina değilim: Ben eklemek denemek istiyorum

ExecutorService exec = ... 
Scheduler scheduler = Schedulers.from(exec); 

Observable.from(...) 
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v))) 
.subscribe(...); 
+0

Teşekkürler @akarnokd. Önerinizi denedim ama bununla uğraştım ve zaman baskısı nedeniyle onu bir kenara bırakmalıyım. Her neyse, eğer doğru bir şekilde anladım eğer sonuç reaktörle denediğim şeye çok benzemeli, belki de 'dispatchOn' /' broadcastlishOn' yerine 'subscribeOn' yerine biraz fark yaratılabilir. – user5396668

0

senin sana oluşturamazsınız varsayabiliriz Çünkü

.finallyDo("here arrive when onComplete or onError") 

gözlemlenebilir yineleyici başına bir gözlemci, daha sonra hepsini birleştir ya da sıkıştır ve sadece abone ol.

Observable.merge(obsevableIter1(), observableIter2()) 
       .subscribe(result -> showResult(result.toString())); 
+0

Teşekkürler @Paul. 'Finale' ye baktım ama akıĢkan (/ gözlemlenebilir) tamamlanıncaya kadar herhangi bir emisyonun geçmesini istemediğim için buraya uymuyor gibi görünüyor. – user5396668

İlgili konular