2016-04-08 14 views
0

İlk olarak, Spark ve Python için yeni bir kullanıcıyım. Bir RDD'yi (esnek dağıtılmış veri kümesi) bir diğerine dönüştürmeye çalışıyorum.PySpark IPython - RDD'yi farklı tuşlarla yeni bir RDD'ye dönüştürün

giriş RDD geçerli:

Şimdi
(u'Task1', (u'James', 10)), 
(u'Task1', (u'James', 15)), 
(u'Task1', (u'James', 18)), 
(u'Task1', (u'James', 11)), 
(u'Task1', (u'Oliver', 10)), 
(u'Task1', (u'Oliver', 15)), 
(u'Task2', (u'Oliver', 18)), 
(u'Task2', (u'Oliver', 11)), 

ben her kişi için saat toplamını verir bir işlevi oluşturmak çalışıyorum, görevin olursa olsun:

def extract_time_tracking(time_bookings): 
    ??? 
    return (person, total_hours) 
time_trackings_sum = input_RDD.???(extract_time_tracking) 

çıktı olmalıdır:

(u'James', 54), # has been working on Task1 only 
(u'Oliver', 54), # has been working on Task1 and Task2 

PySpark IPython kullanıyorum. Ben CombineByKey'i düşünüyordum ya daByKey'i düşürdüm, ama her zaman aynı anahtarı kullanıyorlar. Ama benim durumumda sonuç anahtarı giriş anahtarından farklı?!?!?

Yardımlarınız için teşekkür ederiz.

cevap

1

Nesneyi dönüştürmek için nesneyi dönüştürmek üzere map işlevini kullanın. Bu görevi önemsemediğinden, aslında tamamen bırakabilirsiniz.

input_RDD.map(lambda x: x[1]).reduceByKey(lambda x,y: x+y) 

, daha sonra, sadece anahtar kaydırmak istiyorsanız, daha karmaşık bir harita yapacağını:

input_RDD.map(lambda x: (x[1][0],(x[0],x[1][1])) 
+0

ilk harita işlevinde lambda biraz daha açıklayabilir misiniz? neden x [1]? tuple görevi olurdu, Matthias

+1

@Matthias Supposing x = (u'Task1 ', (u'James', 10)), x [0] u'Task1 olacak ve x [1] olacaktır (u'James ', 10), [normal tuple işlevselliği] kullanarak (https://docs.python.org/2/tutorial/datastructures.html#tuples-and-sequences). Saatleri ada göre hesaplamak istediğimizden, ihtiyaç duyduğumuz iki bilgi. (Bu sorunun diğer bölümlerinde daha fazla yorum geliyor.) –

+0

"map" hakkında düşünmenin yolu, RDD'leri yazdığınız keyfi bir işlevi kullanarak dönüştürmesidir. Nesneleri X harfi olan bir RDD ile başlatırsanız ve Y türüne sahip olmak istiyorsanız, bir X'i bir Y'ye dönüştürecek bir harita yazarsınız ve sonra bunu satır satır sırayla paralel olarak uygularsınız. Bu problemin sadece basit bir dönüşüme ihtiyacı var - görevi atmak. Bunun yerine görevler ile özetlemek istediğimizi varsayalım ve bu isimleri bu şekilde ele alalım. Daha sonra aşağıdakileri yapıyoruz: 'input_RDD.map (lambda x: (x [0], x [1] [1]) .BasıkByKey (lambda x, y: x + y)' –

0
def extract_time_tracking(time_bookings): 
val splits = rec.split(",") 
val person = splits(1).replaceAll(" \\(u'", "").replaceAll("'", "") 
val total_hours = splits(2).replaceAll("\\)", "").trim().toInt 
return (person, total_hours) 


input_RDD.map(extract_time_tracking).reduceByKey 

Ben scala kullanıyorum, bu nedenle lütfen sözdizimini kontrol edin.