2016-01-22 11 views
5

RxJava'yı gerçekten çok seviyorum, harika bir araç ama bazı şeylerin nasıl çalıştığını anlamak çok zor. Android projemizde bir RxJava ile Retrofit kullanıyoruz ve aşağıdaki kullanım durumu var:Sunucu sorgulaması uygulamak için RxJava'da "repeatWhen" ile birlikte "skipWhile" kullanımı

Sunucu bir işi yaparken, yeniden denemeler arasında biraz gecikme ile sunucuyu sorgulamam gerekiyor. Sunucu bittiğinde sonucu teslim etmek zorundayım. Ben başarıyla RxJava ile bunu yaptık Yani, burada kod parçacığı geçerli: Ben kod çalışıyor

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob) 
     .skipWhile(new Func1<CheckJobResponse, Boolean>() { 

      @Override 
      public Boolean call(CheckJobResponse checkJobResponse) { 
       boolean shouldSkip = false; 

       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus()); 

       switch (checkJobResponse.getJobStatus()){ 
        case CheckJobResponse.PROCESSING: 
         shouldSkip = true; 
         break; 
        case CheckJobResponse.DONE: 
        case CheckJobResponse.ERROR: 
         shouldSkip = false; 
         break; 
       } 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip); 

       return shouldSkip; 
      } 
     }) 
     .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { 
      @Override 
      public Observable<?> call(Observable<? extends Void> observable) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable); 
       return observable.delay(1, TimeUnit.SECONDS); 
      } 
     }).subscribe(new Subscriber<CheckJobResponse>(){ 
      @Override 
      public void onNext(CheckJobResponse response) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response); 

      } 

      @Override 
      public void onError(BaseError error) { 
       if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error); 
       Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show(); 

      } 

      @Override 
      public void onCompleted() { 
       if (SHOW_LOGS) Logger.v(TAG, "onCompleted"); 
      } 
     }); 

"repeatWhen" ile "skipWhile" kullandı: Sunucu cevap verince

o işi devam I "skipWhile" zincirinden "true" döndür, orijinal Gözlemlenebilir 1 saniye bekler ve http isteğini tekrar yapar. Bu işlem, "skipWhile" zincirinden "false" döndürene kadar tekrarlanır. Ben dönene kadar orijinal gözlemlenebilir bir şey (onError, OnNext, onComplete) yayarlar etmeyeceğini "skipWhile" nin belgelerinde gördüğümüz

  1. : Burada

    Anlamadığım birkaç şey olduğunu "çağrı" yönteminden "yanlış". Yani eğer bir şey yaymazsa, neden "tekrarlı" diye bir şey yapmıyor? Bir saniye bekler ve isteği tekrar çalıştırır. Kim başlattı?

  2. İkinci soru şudur: Neden "repeatWhen" den sonsuza kadar çalışmıyor, Neden "skipWhile" dan "false" döndürdüğümde yinelenen durur? "False" değerini döndürürsem, Aboneliğimde başarıyla tamamlanır.

  3. "repeatWhile" belgesinde, abonemde "onComplete" için bir çağrı aldığımı ancak "onComplete" çağrısının hiçbir zaman çağrılmadığını söylüyor. "SkipWhile" ve "repeatWhen" zincirleme sırasını değiştirirsem bu bir fark yaratmaz. Neden ?

RxJava'nın açık kaynak olduğunu ve kodu okuyabildiğimi anlıyorum, ancak dediğim gibi - anlamak gerçekten zor.

Teşekkürler.

cevap

13

Daha önce repeatWhen ile çalıştım ama bu soru beni meraklandırdı, bu yüzden biraz araştırma yaptım.

skipWhile asla daha önce true dönse bile, onError ve onCompleted yayarlar yok. Bu nedenle, checkJob(), onCompleted her seferinde repeatWhen çağrılmaktadır. Bu 1 numaralı soruya cevap veriyor. Soruların geri kalanı yanlış varsayımlara dayanmaktadır. Aboneliğiniz sonsuza dek sürüyor çünkü repeatWhen’unuz hiçbir zaman sona ermiyor. Bunun nedeni, repeatWhen'un fark ettiğinizden daha karmaşık bir canavar olmasıdır. Observable, null kaynağından onCompleted kaynağından alındığında yayar. Bunu alır ve onCompleted döndürürseniz, o zaman biter, aksi takdirde bir şey yayarsanız yeniden dener. delay sadece bir emisyon aldığından ve geciktirdiğinden, her zaman null'u yayar.Bu şekilde, sürekli olarak yeniden gönderir.

# 2'nin cevabı, o zaman sonsuza kadar koşuyor olmasıdır; Aboneliği iptal etmek için muhtemelen bu kodun dışında başka bir şey yapıyorsunuz. # 3 için hiçbir zaman tamamlanmadığı için onCompleted elde edemezsiniz. # 4 için sipariş önemli değil çünkü süresiz tekrarlıyorsunuz.

Şimdi sorun şu ki, doğru davranışları nasıl ediniyorsunuz? skipWhile yerine takeUntil'u kullanmak kadar basit. Bu sayede 'u tekrar'a kadar tekrarlayarak istediğiniz sonucu elde edersiniz, böylece akış sonlandırmak istediğinizde sona erer. Bu örnekte

Observable<Boolean> source = ...; // Something that eventually emits true 

source 
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS)) 
    .takeUntil(result -> result) 
    .filter(result -> result) 
    .subscribe(
     res -> System.out.println("onNext(" + res + ")"), 
     err -> System.out.println("onError()"), 
     () -> System.out.println("onCompleted()") 
    ); 

, source Boolean yayıyor:

İşte bir kod örneği var. Kaynak her 0 saniyede true yayar. resulttrue olana kadar devam ediyorum. Ve ben false olan tüm bildirimleri filtreliyorum, böylece abone true olana kadar almaz.

+0

Bir yazım hatası yaptım: "skipWhile" belgesinde, "arama" yönteminden "false" i döndürene kadar orijinal Gözlenebilir'den hiçbir şey yaymayacağını (onError, onNext, onComplete) gördüm. İşte orijinal belgeler: Belirtilen bir * koşulu geçerli olduğu sürece Gözlemlenebilir kaynak tarafından yayılan tüm öğeleri atlayan bir Gözlemlen döndürür, ancak koşul yanlış olduğunda, tüm diğer kaynak öğeleri yayılır. Bu yüzden "İŞLEM" ilk defa birkaç kez alıyorum ve yöntemden "true" değerini döndürüyorum. Hiçbir şey yaymamalı. Öyleyse neden tekrar deniyor? –

+0

# 2 ile ilgili. Hayır, yalnızca Aboneliği Etkinliğin etkinliğini iptal eden bir kod var, ancak durum böyle değil. Ben hala API çağrıları ile sunucuyu yok ediyorum gördüm :) –

+0

İlk yorum hakkında - checkJob() emisyonları muhtemelen onNext (CheckJobResponse) '-> onCompleted()' dir. Birinciyi 'İŞLEME''yi döndürürken iletmeyi atlıyor, ancak 'onCompleted()' hala devam ediyor. (Bunu kaynak koduna bakarak teyit ettim.) –

İlgili konular