2016-03-21 18 views
1

Bir RDD'm var ve ona daha fazla RDD eklemek istiyorum. Spark’de bunu nasıl yapabilirim? Aşağıda kodum var. RDD'yi sahip olduğum dStream'den geri almak istiyorum.Spark'deki mevcut RDD'ye nasıl RDD eklenir?

JavaDStream<Object> newDStream = dStream.map(this); 
JavaRDD<Object> rdd = context.sparkContext().emptyRDD(); 
return newDStream.wrapRDD(context.sparkContext().emptyRDD()); 

ben çok dokümantasyonu Apache Kıvılcım tarafından sağlanan JavaDStream sınıfının yaklaşık wrapRDD yöntemini bulmuyorum.

cevap

1

Sen JavaStreamingContext.queueStream kullanabilir ve Queue<RDD<YourType>> ile doldurun: RDD yana

public JavaInputDStream<Object> FillDStream() { 
    LinkedList<RDD<Object>> rdds = new LinkedList<RDD<Object>>(); 
    rdds.add(context.sparkContext.emptyRDD()); 
    rdds.add(context.sparkContext.emptyRDD()); 

    JavaInputDStream<Object> filledDStream = context.queueStream(rdds); 
    return filledStream; 
} 
+0

JavaRDD Listesini tek bir JavaRDD'ye dönüştürebilir miyim? –

+0

Evet. 'JavaRDD.union' kullanabilirsiniz. –

+0

birliği bana Dstream'i verecek, ancak JavaRDD'yi yöntemimin geri dönüş tipi olduğu için istiyorum. –

1

değişmez olduğunu, ne yapmak yeni bir RDD oluşturup yenisini dönmek için sparkContext.parallize kullanmaktır.

List<Object> objectList = new ArrayList<Object>; 
objectList.add("your content"); 

JavaRDD<Object> objectRDD = sparkContext.parallize(objectList); 
JavaRDD<Object> newRDD = oldRDD.union(objectRDD); 
İlgili konular