2016-03-21 18 views
1

zip reddediyorum benKıvılcım RDD

org.apache.spark.SparkException Spark ile aşağıdaki kodu çalıştırarak son satırında aşağıdaki istisna: Her bölüm elemanların aynı sayıda RDDs zip sadece Can

val rdd1 = anRDD 
val rdd2 = AnotherRDD 

println(rdd1.count() == rdd2.count()) // Write true 
val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions 

val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless) 
val rdd2Bis = rdd2.repartition(nparts) 

val zipped = rdd1Bis.zip(rdd2Bis) 
println(zipped.count()) 

Sorun nedir?

PS: Ben sıkıştırma önce rdd1 ve rdd2 toplamak eğer çalıştığı ama RDD olarak tutmak gerekir

bu kontrol işleri
+1

Eğer bir bölünmeyi atlarsanız, zip çalışır? –

+1

Hayır. – Benjamin

+1

Bir bölümlemenin her bölümdeki aynı sayıda elemanla, yalnızca aynı sayıda benzer boyutta bölümle sonuçlanacağının bir garantisi olmadığına inanıyorum. ZipPartitions'ı kullanabilir misiniz? "Bu RDD'nin bölümlerini bir (veya daha fazla) RDD (ler) ile sıkıştırın ve sıkıştırılmış bölümlere bir işlev uygulayarak yeni bir RDD döndürün. Tüm RDD'lerin * aynı sayıda bölüme * sahip olduğunu varsayar, ancak * bunları * gerektirmez her bölümdeki aynı sayıda elemanın bulunması " –

cevap

0

: Eğer

val list1 = List("a","b","c","d") 
val list1 = List("a","b","c","d") 
val rdd1 = sc.parallelize(list1) 
val rdd1 = sc.parallelize(list2) 

yürütülmesi için başarısız hangi bölümünün yanıt verin ur kodu:

val nparts = rdd1.getNumPartitions + rdd2.getNumPartitions 
val rdd1Bis = rdd1.repartition(nparts) // Try to repartition (useless) 
val rdd2Bis = rdd2.repartition(nparts) 
val zipped = rdd1Bis.zip(rdd2Bis) 

Sonuç:

println(zipped.count()) 
4 
zipped.foreach(println) 
(a,a) 
(b,b) 
(c,c) 
(d,d) 
+0

Bu küçük bir veri kümesi ile yeniden bölümlemeyle ilgili sorunların ortaya çıkacağından şüpheleniyorum. Çalışmak için çok daha büyük bir liste verebilir misiniz? –

+0

Bu, listeleriniz aynı olduğundan çalışacaktır. Örneğimde rdd1 ve rdd2, orijinal olarak aynı RDD'ye dayanıyor, ancak bunlardan biri (diyelim ki rdd1) harici bir kütüphaneyle dönüştürüldü. Dönüşümün sonunda, hala aynı sayıda eleman içeriyor. – Benjamin

+0

İstediğiniz takdirde çalışmayacağınız bir örnek vereyim, rdd1'den 3'e ve rdd2'den 4'e kadar olan bölümlerin sayısını ayarlamayı deneyin. – eliasah

2

bir çözüm ile zip olabilecek katılmak:

val rdd1Bis = rdd1.zipWithIndex.map((x) =>(x._2, x._1)) 
val rdd2Bis = rdd2.zipWithIndex.map((x) =>(x._2, x._1)) 
val zipped = rdd1Bis.join(rdd2Bis).map(x => x._2) 
İlgili konular