2016-03-22 22 views
1

Şu anki gibi görünen bir RDD'ye okuduğum sekmeyle ayrılmış veriye sahibim. Bu örnek için 12 satır olduğunu varsayalım (2010'da her ay için bir tane).Spark (Scala) Veri Genişletme

Veri

1/2010 Red 500 Up 
2/2010 Blue 300 Left 
3/2010 Red 650 Down 
4/2010 Green 200 Left 
5/2010 Blue 250 Right 
6/2010 Blue 300 Up 
...  ... ... ... 

ben etkili büyüklüğünü iki katına aşağıdaki gibi bir şey yaparak daha büyük RDD taklidi oluşturabilir bu verileri kullanmaya çalışıyorum ... myRDD içine okuyun. sadece orijinal 2010 tarihleri ​​yayılan görünür yüzden tarih artırmak istediğiniz kendi üzerine RDD birliği ile

var biggerRDD = myRDD.union(myRDD) 

ancak 2011 de (aslında bir yıl ikinci yarısında tarihleri ​​artan.

ben bunu nasıl emin değilim ve benim girişimleri ile başarısız olmuştur.

+0

Yani Aynı * veriyi tekrarlamak istersiniz * ancak yıl bir ile artmıştır, değil mi? –

+0

RDD'nin imzası nedir? –

+0

@AlbertoBonsanto evet, tam olarak. – Sacrulen

cevap

0

peki ama her yıl birer artırmak için map kullanmak gerekir, aşağıda benim çek örneği.

val rdd = sc.parallelize(List(
    ("1/2010", "Red", 500, "Up"), 
    ("2/2010", "Blue", 300, "Left"), 
    ("3/2010", "Red", 650, "Down"), 
    ("4/2010", "Green", 200, "Left"), 
    ("5/2010", "Blue", 250, "Right"), 
    ("6/2010", "Blue", 300, "Up") 
)) 

val completeRDD = rdd.union(rdd.map{ 
    case (date: String, color: String, value: Int, direction: String) => { 
    val separator = "/" 
    val dt = date.split(separator) 
    val newDate = dt(0) + separator + (dt(1).toInt + 1) 
    (newDate, color, value, direction) 
    } 
}) 

completeRDD.collect 
/* 
(1/2010,Red,500,Up) 
(2/2010,Blue,300,Left) 
(3/2010,Red,650,Down) 
(4/2010,Green,200,Left) 
(5/2010,Blue,250,Right) 
(6/2010,Blue,300,Up) 
(1/2011,Red,500,Up) 
(2/2011,Blue,300,Left) 
(3/2011,Red,650,Down) 
(4/2011,Green,200,Left) 
(5/2011,Blue,250,Right) 
(6/2011,Blue,300,Up) 
*/ 
Bu arada 0

, her gözlem ayrıştırma sonucu böyle bir şey olmalı:

"1/2010\tBlue\t500\tUp".split("\t") 
//res14: Array[String] = Array(1/2010, Blue, 500, Up) 

Ve map ile kolayca işlenebilmektedir bir Tuple dönüşme kolaydır:

val xdd = sc.parallelize(List("1/2010\tBlue\t500\tUp", 
           "2/2010\tRed\t600\tDown")) 
val mdd = xdd.map{ 
    case (str) => { 
    val parsed = str.split("\t") 
    (parsed.head, parsed(1), parsed(2).toInt, parsed.last) 
    } 
} 

mdd.collect 
// res25: Array[(String, String, Int, String)] = Array((1/2010,Blue,500,Up), (2/2010,Red,600,Down)) 
İlgili konular