2015-08-31 12 views
9

Bir Android cihazından arka uçta yüklemek istediğim dosyaların bir listesini hazırladım. Bellek kısıtlamaları nedeniyle, ikinci API çağrısı sadece ilk bittikten sonra, ikincisinin bittikten sonra üçüncü olmasını istiyorum. RxJava ve Retrofit kullanarak N ardışık api aramaları yapma

Ben

private Observable<Integer> uploadFiles(List<File> files) { 
     return Observable.create(subscriber -> { 
      for (int i = 0, size = files.size(); i < size; i++) { 
       UploadModel uploadModel = new UploadModel(files.get(0)); 
       int uploadResult = retrofitApi.uploadSynchronously(uploadModel); 
       subscriber.onNext(uploadResult); 
      } 
      subscriber.onCompleted(); 
     }).subscribeOn(Schedulers.newThread()); 
    } 

gibi bir şey yazdım Ama bu Rx ruhuna aykırı çıkabilirimama gibi hissediyorum ve sen Observable.create kullanıyorsanız deyişi, muhtemelen yanlış yapıyoruz olduğunu. .. Bu makul bir yaklaşım mı? Bunu, Retrofit'in RxJava entegrasyonu ile başarmanın daha iyi bir yolu var mı?

+0

Neden API arayüzünüzde 'uploadSynchronously' ** tanımlanamıyor ** return Gözlenebilir mi? Diğer bir yol, gözlemlenebilir her birinde blokaj kullanmaktır. – marwinXXII

+0

Yaptığım şey buydu - retrofitApi.uploadSynchronousSynchronous bir gözlemlenebilir değil, int döndüren bir retrofit çağrı engelliyor. Ve eğer sorunun cevabı Rx olmadan basit bir döngü yapmamış olsaydı, hatalar ve UI ilerleme güncellemelerini zarif bir şekilde ele almak zor olurdu, ayrıca bu Gözlemlenebilir zincirden önceki ve sonraki bazı ek adımlar var. –

+0

Aslında, hayır, bana göre 'map' kullanılarak temel döngü tercih edilir. – marwinXXII

cevap

4

safça, ben (o pek işe aşağıya bakınız değildir) bunu yapabilir:

return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel)); 

Şimdi mesele bu çağrılar için tek iplik kullanımı güçlendirme anlatmak için bir yolu yoktur olmasıdır. Bununla birlikte, bir işlev çağrısının sonucunu, orijinal gözlemlenebilirden bir sonraki yayılan değerle birlikte, bir sonraki işlevine iletir. Bu işe yarayacak, ancak reduce'a iletilen işlevin senkronize olması gerekiyor. İyi değil.

başka bir yaklaşım, ardışık olarak gözlemlenebilir ve değişiklikler olacaktır: yaklaşık

void getNextFile(int i) { 
    return retrofit.upload(i). 
     onNext(result -> getNextFile(i + 1)); 
} 

. Ama bunu daha okunaklı hale getirmek için nasıl temizleyeceğimi bilmiyorum.

ben gibi bir şey olacağını düşünürdüm en temiz: paralel olarak sanki Observable.from(...) tüm öğeleri yayarlar olur RxJava ait

Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file))); 
+0

Gözlemlenebilir.İlk olarak başlangıçta (ama Gözlemlenebilir güçlendirme çağrısı ile) gittiğim ve bu şekilde hepsinin paralel olarak çağrıldığı şeydir. Bu her çağrı ile zinciri engelleyecek ve sonucu onNext()? –

+0

Anlayamadığım şey, her seferinde gözlemlenebilirleri yeniden oluşturmak dışında, bir sonraki çağrıyı nasıl tetikleyeceğinizdir. Son öneri çalışmalıdır, çünkü 'map' sadece bir iş parçacığı üzerinde çağrılır. – njzk2

+0

Son çözümünüz tam olarak amaçlandığı gibi çalışır ve ilk başımdan çok daha az gölgeli görünüyor, bu yüzden teşekkürler! Sadece zincirin başlangıcında haritayı yeni bir konuya itmek zorundaydım, ama sonra her şey onun üzerine koştu. Teşekkürler! –

0

yerlilere. Paralel emisyon olarak düşünmenin en iyi yolu bu. Ancak bazı durumlarda, tüm zincirin gerçek bir şekilde yürütülmesi gerekmektedir. Aşağıdaki çözüme geldim, muhtemelen en iyisi değil ama çalışıyor.

import rx.Observable; 
import rx.Subscriber; 

import java.util.Iterator; 
import java.util.function.Function; 

public class Rx { 
    public static void ignore(Object arg) { 
    } 

    public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) { 
     return Observable.create(collectorSubscriber -> 
       Observable.<Void>create(producerSubscriber -> 
         producerSubscriber.setProducer(ignoredCount -> { 
          if (!iterator.hasNext()) { 
           producerSubscriber.onCompleted(); 
           return; 
          } 

          E model = iterator.next(); 
          action.apply(model) 
            .subscribe(
              Rx::ignore, 
              producerSubscriber::onError, 
              () -> producerSubscriber.onNext(null)); 
         })) 
         .subscribe(new Subscriber<Void>() { 
          @Override 
          public void onStart() { 
           request(1); 
          } 

          @Override 
          public void onCompleted() { 
           collectorSubscriber.onNext(null); 
           collectorSubscriber.onCompleted(); 
          } 

          @Override 
          public void onError(Throwable e) { 
           collectorSubscriber.onError(e); 
          } 

          @Override 
          public void onNext(Void aVoid) { 
           request(1); 
          } 
         })); 
    } 
} 

Örnek kullanım olacaktır:

Iterator<? extends Model> iterator = models.iterator(); 

    Rx.sequential(iterator, model -> someFunctionReturnsObservable(model)) 
      .subscribe(...); 

Bu yöntem garanti

Observable<Dummy> someFunctionReturnsObservable(Model model)

0

zincirleme infaz anda gözlenebilirleri oluşturma tercih edilen yöntem fromAsync ile:

Observable.fromAsync(new Action1<AsyncEmitter<Object>>() 
    { 
     @Override 
     public void call(final AsyncEmitter<Object> emitter) 
     { 
      emitter.onNext(object); 
      emitter.onCompleted(); 

      emitter.setCancellation(new AsyncEmitter.Cancellable() 
      { 
       @Override 
       public void cancel() throws Exception 
       { 
        // on unSubscribe() callback 
       } 
      }); 
     } 
    }, AsyncEmitter.BackpressureMode.BUFFER); 
İlgili konular