2016-03-24 22 views
2

Bir Spark akış işim var. Giriş RDD'ye filtre uygulamak istiyorum.Spark Streaming - Dinamik olarak süz

Her bir kıvılcım akışı toplu işlemi sırasında Hbase'den her seferinde filtre ölçütlerini dinamik olarak almak istiyorum.

Bunu nasıl başarabilirim?

Harita bölümlerini bir kez kullanarak bağlantı nesnesi oluşturabilirim.

Ancak kıvılcım filtresinde aynı şeyi nasıl elde edebilirim?

DStream<Integer> intDstream= someIntegerIntoDStream; 
intDstream.foreachPartition{ 
    create HBase connection here if you need it for a batch 
    while(arg0.hasNext()){ //here you have an iterator 
      Integer current = arg0.next(); 
      create HBase connection here if you need it for each element 
      //Here is your filter function: 
      if(current meets your condition) 
       arg0.remove(); 

ne olur size uygulamakla üzerinde çalışan ve el ile her eleman toplama, bir durumun uygulanması şudur:

+0

Filtre ölçütlerinize bağlı olarak, bunu "birleştir" ile başarabilirsiniz. Yapmaya çalıştığınız şeyin daha tam bir örneğini vermek zorundasınız, ancak birleşimin sol tarafı Spark Akışınızsa, sağ taraf bir dizi kriter olacaktır. Ölçütlerden hiçbiri geçerli değilse, katılma hiçbir satır içermez - bunları filtreler. –

cevap

0

Birazdan yaklaşım kendi (pseudocode) bir filtre işlevi yazıyor, düşünmek Kriterleri karşılarsa ve onu kaldırmak.