2016-09-12 15 views
6

Ben RDD [Satır]:Scala: String değerleri için GroupBy toplamı nasıl yapılır?

|---itemId----|----Country-------|---Type----------| 
    |  11  |  US   |  Movie  | 
    |  11  |  US   |  TV   | 
    |  101  |  France  |  Movie  |  

her satır ayrı json nesnesi (RDD her satır) nerede olduğunu json Listesi olarak kaydedebilirsiniz böylece GroupBy ItemID Nasıl yapılır?

{"itemId" : 11, 
"Country": {"US" :2 },"Type": {"Movie" :1 , "TV" : 1} }, 
{"itemId" : 101, 
"Country": {"France" :1 },"Type": {"Movie" :1} } 

RDD:

denedim:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

girdi listesidir

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 


val events = sqlContext.sql("select itemId EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](1); 
    val çountryInfo = showTitleInfo(MappingsList.get(itemId)); 
    val country = if (countryInfo.getCountry() == 'unknown)' "US" else countryInfo.getCountry() 
    val type = countryInfo.getType() 

    Row(itemId, country, type) 
     }) 

bazı biri bana bunu başarmak nasıl bildirin Can: Ben JSON ayrıştırma ve dönüştürme ilgilenir MappingUtils kullanarak CountryInfo POJO sınıfına eşleme ediyorum her satır json olduğu jsons?

Teşekkür ederiz!

+0

RDD [Satır] bir DataFrame/DataSet'den mi geldi? RDD ile çalışmak [Row] hala ideal olmamasına rağmen ideal değildir. –

+0

RDD'yi veri kümesinden oluşturdum. –

+0

@ASpotySpot RDD'imle güncellendi –

cevap

3

Bunu tamamlamak için fazladan zaman ödeyemem, ama size bir başlangıç ​​verebilir.

Buradaki fikir, RDD[Row]'u JSON yapınızı temsil eden tek bir Haritaya toplamanızdır.

  1. seqOp

    nasıl hedef türü hedef türlerinin iki birleştirme nasıl
  2. combOp içine unsurların bir koleksiyon katlamak için: Toplama iki işlev parametreleri gerektiren bir yönlüdür. Eğer seqOp görülen değerlerin sayısını biriktirmek gerekir olarak, birleştirme sırasında

zor kısmı combOp geliyor. Bunu bir egzersiz olarak bıraktım çünkü yakalayacak bir uçağım var! Umarım bir sorun olduğunda başkası boşlukları doldurabilir.

case class Row(id: Int, country: String, tpe: String) 

    def foo: Unit = { 

    val rows: RDD[Row] = ??? 

    def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = { 
     acc.get(r.id) match { 
     case None => acc.updated(r.id, (Map(r.country, 1), Map(r.tpe, 1))) 
     case Some((countries, types)) => 
      val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1) 
      val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1) 
      acc.updated(r.id, (countries_, types_)) 
     } 
    } 

    val z = Map.empty[Int, (Map[String, Int], Map[String, Int])] 

    def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = { 
     l.foldLeft(z) { case (acc, (id, (countries, types))) => 
      r.get(id) match { 
      case None => acc.updated(id, (countries, types)) 
      case Some(otherCountries, otherTypes) => 
       // todo - continue by merging countries with otherCountries 
       // and types with otherTypes, then update acc 
      } 
     } 
    } 

    val summaryMap = rows.aggregate(z) { seqOp, combOp }