2015-02-13 22 views
9

Bazı web hizmetlerine karşı girişi denetleyerek, gözlemlenebilir tarafından yayılan girdileri filtrelemem gerekiyor. Normal gözlemlenebilir.filter operatörü, yükleme işlevinin eşzamanlı olarak karar vermesini beklediği için burada uygun değildir, ancak bu durumda, karar yalnızca eşzamansız olarak alınabilir.RxJs'de "async" filtre operatörünün sürümü var mı?

Aşağıdaki kodla geçiş yapabilirim, ancak bu dava için kullanabileceğim daha iyi bir operatör olup olmadığını merak ediyordum.

someObservable.flatmap(function(entry) { 
    return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) { 
    return { 
     verdict: verdict, 
     entry: entry 
    }; 
    }); 
}).filter(function(obj) { 
    return obj.verdict === true; 
}).map(function(obj) { 
    return obj.entry; 
}); 
+0

bir (sigara) iş durumu, bir şey 'kopyala/paste'-muktedir biz hiç thte kod örneği alabilir miyim yardım etmek için çalışabilir miyim? –

+0

RxJs dağıtımında bir operatör olarak var olduğunu sanmıyorum. Ancak, yukarıdaki kodu kolayca alabilir ve kendi yeniden kullanılabilir 'filterAsync' operatörünüze dönüştürebilirsiniz. – Brandon

cevap

9

Burada mevcut operatörleri kullanarak bu tür bir operatör uygulamak istiyorum nasıl. Düşünmen gereken tek bir sapık var. Filtre işleminiz uyumsuz olduğundan, yeni öğelerin filtre işleminizden daha hızlı gelmesi mümkündür. Bu durumda ne olmalı? Filtreleri sırayla çalıştırmak ve öğelerinizin sırasının korunmasını garanti etmek ister misiniz? Filtreleri paralel olarak çalıştırmak ve öğelerinizin farklı bir sırada çıkabileceğini kabul etmek ister misiniz? İşte

operatörün 2 sürümleridir

// runs the filters in parallel (order not guaranteed) 
// predicate should return an Observable 
Rx.Observable.prototype.flatFilter = function (predicate) { 
    return this.flatMap(function (value, index) { 
     return predicate(value, index) 
      .filter(Boolean) // filter falsy values 
      .map(function() { return value; }); 
    }); 
}; 

// runs the filters sequentially (order preserved) 
// predicate should return an Observable 
Rx.Observable.prototype.concatFilter = function (predicate) { 
    return this.concatMap(function (value, index) { 
     return predicate(value, index) 
      .filter(Boolean) // filter falsy values 
      .map(function() { return value; }); 
    }); 
}; 

Kullanımı:

var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc); 
someObservable.concatFilter(predicate).subscribe(...); 
+1

Bens'in çözümü işe yarıyor ama bence Brandon'ın daha açık. Ben bunu seçiyorum. – xwk

+1

Tatlı, teşekkürler. Bu hala en güncel çözüm mü? – Pipo

2

Bildiğim kadarıyla değil. Kendi başını alabilirsin. Bu test, ama işte bir fikir değil:

Observable.prototype.flatFilter = function(predicate) { 
    var self = this; 
    return Observable.create(function(obs) { 
    var disposable = new CompositeDisposable(); 
    disposable.add(self.subscribe(function(x) { 
     disposable.add(predicate(x).subscribe(function(result) { 
     if(result) { 
      obs.onNext(x); 
     } 
     }, obs.onError.bind(obs))); 
    }, obs.onError.bind(obs), obs.onCompleted.bind(obs))); 
    return disposable; 
    }); 
}; 

Ve bunu gibi kullanabilirsiniz:

someStream.flatFilter(function(x) { 
    return Rx.DOM.get('/isValid?check=' + x); 
}).subscribe(function(x) { 
    console.log(x); 
}); 
+0

lol ... Neden "flatFilter" dediğim hakkında hiçbir fikrim yok, sanırım "flatMap" ile yapmayı seviyorum. ;) –

+0

Ben, çözüm için teşekkürler. Ancak, bana ilk obs.onCompleted.bind (obs) doğru değil gibi görünüyor. Görünüşe göre geri getirilen gözlemlenenin tamamlanması (x), flatFilter() tarafından döndürülen gözlemlenebilirliği hatalı olarak tamamlayacaktır. Yanlış mıyım? – xwk

+0

Evet, bu bir gözetimdi. İlk onCompleted kaldırılabilir. OnError'ı orada bıraktım. –

İlgili konular