2014-11-29 19 views
6

değiştiğinde yayılır Sık akış değerlerine sahip bir akımım var ve diğeri daha yavaş olanlarla. Onları birleştirmek istiyorum, ancak daha yavaş olanı yaydığında sadece bir değer yayar. Yani combineLatest çalışmıyor. böylece gibi: Birleşik akışları yalnızca akışlardan biri

a1 
a2 
b1 
(a2,b1) 
a3 
a4 
a5 
b2 
(a5,b2) 

Şu anda bu gibi yapıyor şu ediyorum temizleyici bir yolu var mı?

withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = 
    Observable({ o => 
    var last : A 
    fast.subscribe({a => last = a}) 
    slow.subscribe({b => o.onNext((last,b))}) 
    }) 

düzenlemek: Bu operatör Rx şimdi ve withLatestFrom denir.

+0

Üzgünüz. Önceki cevabım yanlış. Onu sildim. – zsxwing

+1

Tam olarak bu soru RxJava'nın konu tracker sordu: https://github.com/ReactiveX/RxJava/issues/405 Ama cevaplar tatmin edici değildi ... –

cevap

4

Aradığın, "combinePrev" adını verdiğim, API'da bulunmayan ama birçok durumda çok gerekli olan bir birleştirici. sample operatörü yakın geliyor, ancak iki akışı birleştirmiyor. I've also missed "combinePrev" in RxJS. Bu, "combinePrev" ("withLatest") uygulanmasını çıkıyor basittir ve sadece harita ve anahtarı bağlıdır:

İşte
withLatest[A,B](fast : Observable[A], slow : Observable[B]): Observable[(A,B)] = { 
    val hotSlow = slow.publish.refCount 
    fast.map({a => hotSlow.map({b => (a,b)})}).switch 
} 

RxJS uygulanan aynı operatörün bir jsfiddle örneğidir.

implicit class RXwithLatest[B](slow: Observable[B]) { 
    def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */ 
} 

Not: slowhot olmalıdır operatör Rx olmasa da sen slow.withLatest(fast) kullanabilmesi

, bir örtük sınıf kullanabilirsiniz. slow soğuksa Gözlemlenebilir, withLatest çalışmıyor.

+0

Sadece bir değil görmek güzel. Umarım bu, RxJava'ya eklenir. Ancak şu anki cevaplarınızda 'B' nin 'yavaş' ve 'A'nın' hızlı 'olması gerektiğini tahmin ediyorum. – dtech

+0

Scala ve değişken adlarınızı düzeltin, ancak aksi halde mükemmel şekilde çalışır. Teşekkürler! – dtech

+0

Evet, B yavaş ve Hızlı olmalı, içeriği getirmemek için üzgünüm. Ve evet, bir @Beta operatörü olarak RxJava'ya eklenmeli, bu github sorununu kontrol et: https://github.com/ReactiveX/RxJava/issues/405 –