2016-04-14 10 views
0

scala içinde rdds birleştirmek için en iyi yöntem nedir, aynı formatın şunlardır:Ben sonuç olarak çoklu RDDs var ve bunları birleştirmek istediğiniz var

: Burada
RDD(id, HashMap[String, HashMap[String, Int]]) 
    ^   ^ ^
    |    |  | 
    identity  category distribution of the category 

o RDD bir örnektir HashMap[String, HashMap] arasında
(1001, {age={10=3,15=5,16=8, ...}}) 

birinci anahtar String istatistik kategorisi ve HashMap[String, HashMap] içinde HashMap[String, Int] olduğu kategori dağılımıdır. Değişiklik kategorilerinin her bir dağılımını hesapladıktan sonra, bunları kimliğe göre birleştirmek istiyorum, böylece sonuçları veritabanına kaydedebilirim. İşte şu anda ne var:

def mergeRDD(rdd1: RDD[(String, util.HashMap[String, Object])], 
       rdd2:RDD[(String, util.HashMap[String, Object])]): RDD[(String, util.HashMap[String, Object])] = { 

    val mergedRDD = rdd1.join(rdd2).map{ 
    case (id, (m1, m2)) => { 
     m1.putAll(m2) 
     (id, m1) 
    } 
    } 
    mergedRDD 
} 
val mergedRDD = mergeRDD(provinceRDD, mergeRDD(mergeRDD(levelRDD, genderRDD), actionTypeRDD)) 

Ben iki rdds her zaman birleştirme, böylece bir işlev mergeRDD yazma Ama bu fonksiyon skalasına bir acemi gibi herhangi ilham takdir, çok şık değil bulundu.

+0

Birleştirme işlevinizin özellikleri nelerdir? – eliasah

+0

@eliasah Cevabınız için teşekkür ederiz, ama özellikleriyle ne kastediyorsunuz? – armnotstrong

cevap

2

Performansı yakalamadan bunu başarmanın kolay bir yolunu göremiyorum. Nedeni, basitçe, iki rdd'yi birleştirmiyorsunuz, bunun yerine, hasaksınızın rdd birleşmesinden sonra konsolide değerlere sahip olmasını istiyorsunuz.

Şimdi, birleştirme işleviniz yanlış. Halihazırda, birleştirme, aslında, içsel birleştirme yapacak, diğerinde bulunmayan rdd'de bulunan satırları yok edecektir.

Doğru yol, bir şey gibi olurdu.

val mergedRDD = rdd1.union(rdd2).reduceByKey{ 
    case (m1, m2) => { 
     m1.putAll(m2) 
     } 
} 
+0

bunu işaret ettiğin için teşekkürler – armnotstrong

0

Sen Oradan scala.collection.immutable.Map

ile java.util.HashMap değiştirebilir: Bu kategoriler rdds arasında örtüşmeyen varsayarak

val rdds  = List(provinceRDD, levelRDD, genderRDD, actionTypeRDD) 
val unionRDD = rdds.reduce(_ ++ _) 
val mergedRDD = unionRDD.reduceByKey(_ ++ _) 

.

İlgili konular