2015-06-24 15 views
10

içinden arıyorsanız abonelikten ben onNext işleyicisi böyle içinden unsubscribe aramaya yasal olup olmadığını merak:RxJava: OnNext

List<Integer> gatheredItems = new ArrayList<>(); 

Subscriber<Integer> subscriber = new Subscriber<Integer>() { 
    public void onNext(Integer item) { 
     gatheredItems.add(item); 
     if (item == 3) { 
      unsubscribe(); 
     } 
    } 
    public void onCompleted() { 
     // noop 
    } 
    public void onError(Throwable sourceError) { 
     // noop 
    } 
}; 

Observable<Integer> source = Observable.range(0,100); 

source.subscribe(subscriber); 
sleep(1000); 
System.out.println(gatheredItems); 

yukarıdaki kodu doğru sadece dört element toplanan olduğunu verir: [0, 1, 2, 3]. Ancak, birileri önbelleğe alınacak kaynağı değiştirirse, tüm yüz öğelerin toplanması sağlanır:

Observable<Integer> source = Observable.range(0,100).cache(); 

Gözlemlenebilen kaynak üzerinde denetime sahip değilim (önbelleğe alınmış olsun ya da olmasın), bu yüzden onNext içinden kesinlikle nasıl çıkılır?

BTW: onNext içinde yanlış bir şey mi yapıyorsunuz?

(My fiili kullanım durumu onNext yılında Aslında çıktı akımına yazıyorum olması ve bir IOException başka bir şey çıkışına yazılmış olabilir oluştuğunda bu yüzden ben bir şekilde daha ileri işleme durdurmak gerekir.)

cevap

3

önbellek(), ilk abone geldiğinde her şeyi önbelleğe alır ve alt akıştan durdurmak için herhangi bir seçenek sunmaz.

Observable<Integer> source = Observable.range(1, 100); 

PublishSubject<Integer> stop = PublishSubject.create(); 

source 
.doOnNext(v -> System.out.println("Generating " + v)) 
.takeUntil(stop) 
.cache() 
.doOnNext(new Action1<Integer>() { 
    int calls; 
    @Override 
    public void call(Integer t) { 
     System.out.println("Saving " + t); 
     if (++calls == 3) { 
      stop.onNext(1); 
     } 
    } 
}) 
.subscribe(); 

Düzenleme:: Çok fazla tutulmasını önlemek için) önbelleği (önce akışı durdurmak için gereken Yukarıdaki örnek işte 1.0.13'ten altına çalışmayan bir versiyonu olan yapmalıdır:

SerialSubscription ssub = new SerialSubscription(); 

ConnectableObservable<Integer> co = source 
     .doOnNext(v -> System.out.println("Generating " + v)) 
     .replay(); 

co.doOnNext(v -> { 
    System.out.println("Saving " + v); 
    if (v == 3) { 
     ssub.unsubscribe(); 
    } 
}) 
.subscribe(); 

co.connect(v -> ssub.set(v)); 
+0

Bu kod beklendiği gibi çalışmıyor, yüzlerce kez "Oluşturuluyor" ve ardından yüzlerce kez "Kaydetme" yazdırıyor. Sadece dört kez "Tasarruf" diyorum. –

+0

Örneğimi yayınlamadan önce örneğimi çalıştırıyorum, böylece her biri 3 satır basıyor. Denemeye çalıştın mı yoksa akışına sokmayı mı denedin? – akarnokd

+0

Örnek verbatim'i kullandım, sadece ana ve ithalatı ekledim. İki farklı bilgisayarda javarx 1.0.12 üzerinde çalıştırdım ve her zaman yüzlerce kez "Tasarruf" basıyor. –

İlgili konular