2015-12-07 23 views
5

İki RDDs PySpark sahip tarih verilerine dayanarak satırlarının toplamını Nasıl yapılır?tek RDD ila 2 RDDs sütunlarını ekleyin ve sonra PySpark

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....] 

RDD2:

[(u'41',u'42.0'),(u'24',u'98.0'),....] 

İkisi RDDs aynı sayıda veya satırları var. Şimdi yapmak istediğim, her satırdaki tüm sütunları RDD1'den (unicode'dan normal string'a dönüştürülür) ve RDD2'deki her satırdaki 2. sütunu (unicode string'dan float'a dönüştürülür) almak ve bununla birlikte yeni bir RDD oluşturmaktır. Yani yeni RDD aşağıdaki gibi görünecektir:

RDD3:

[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....] 

o zaman yapıldıktan sonra ben bu yeni RDD3 her satır (şamandıra değeri) son değerin aggregation yapmak istiyorum 1. sütuntaki date değeri. Bu, date'un 2013-01-31 00:00:00 olduğu tüm satırların mans, son sayısal değerleri eklenmelidir.

Bunu PySpark'ta nasıl yapabilirim? Sorunuzun ilk bölümü için

+1

Onlara katılmak için bir anahtar yok, dolayısıyla ben zip olurdu sanırım ... –

+0

@AlbertoBonsanto sen bunu nasıl yapabilirim ben gösterebilirim? –

+0

@AlbertoBonsanto will rdd3 = izip (rdd1.toLocalIterator(), rdd2.toLocalIterator()) 'yeterli mi? –

cevap

0

, yani her satır 7'lik bir başlık olur içine bir iki rdds birleştirerek, görebilmek için bunu yapabilirsiniz:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,b,c,d,e,f,g)) 

Eninde sonunda ne gerek emin değilim , sadece tarih ve ikinci değerin toplamı mı? Eğer öyleyse, tüm değerleri gerekmez:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,g)) 
rdd4 = rdd3.reduceByKey(lambda x, y: x+y) 
+0

evet rdd4'te (toplamadan sonraki değer) –

+0

toplayıcısından sonra tarih ve son değere ihtiyacım var, bu cevap sizin için çalışıyor mu, yoksa Bu konuda daha fazla yardım? –

2

Sen RDDszipWithIndex gerekir, bu yöntem bu nedenle katılabilir, verilerinizi ve bu girişin dizinini temsil başka bir değere sahip bir tuple oluşturur her ikisi de index tarafından RDDs.

Kişisel yaklaşımı (Ben daha verimli yollar vardır bahis) benzer olmalıdır:

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"]) 
rdd2 = sc.parallelize(xrange(5)) 

zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v)) 
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v)) 

print zdd1.join(zdd2).collect() 

çıkışı olacaktır: [(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))], bu sadece bir map veriyi yeniden oluşması için gereklidir sonra. Örneğin. Aşağıda:

combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v) 
print combinedRDD.collect() 

# You can use the .zip method combinedRDD = rdd1.zip(rdd2) 

çıkışı olacaktır: veri türü dönüştürme Hakkında [(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]

, daha önce böyle bir sorun vardı ve ben this snippet kullanmak bu çözmek için.

import unicodedata 

convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1) 
             .encode('ascii','ignore'), v2) 

combinedRDD = combinedRDD.map(convert) 
print combinedRDD.collect() 

Will çıkışı: [('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]