2016-04-06 15 views
0

Kullanıcı bilgisinin olduğu diziyle çalışıyorum. Bu dizelere benzersiz tamsayı değerleri atamak istiyorum.Bir pyspark veri çerçevesi sütununu RDD of tuple ile güncelleme

Bu yığın taşması sonrası here takip ediyordum. Ben sonuçta üzerinde ALS modeli çalıştırılır yapmak istiyorum ne

data = data.map(lambda x: Rating(int(user.lookup(x[0])), int(x[1]), float(x[2]))) 

yaptım Bundan sonra

user = data.map(lambda x:x[0]).distinct().zipWithUniqueId() 

, ama şu ana kadar: Ben dizilerini bir RDD olması aşağıdaki ifadeyi kullanıyorum

Bu hata mesajını alıyorum: Bir RDD yayınlamaya çalıştığınız veya bir işlem veya dönüştürme işleminden bir RDD'ye başvurduğunuz anlaşılıyor.

Veri türü bir şekilde yanlış olduğunu düşünüyorum, ancak bunu nasıl düzelteceğimi bilmiyorum. Bağlantılı cevabın önerdiği

+1

burada 2 sorunları vardır. Birincisi DataFrame'deki değerleri güncellemek istiyor, bu imkansız! DataFrame değişmezdir, mevcut olandan güncelleme dönüşümü ile yeni bir tane oluşturmanız gerekir. İkincisi, bir RDD'yi başka bir RDD dönüşümü içine yerleştiremezsiniz. RDD'niz küçükse yayın değişkenini düşünebilirsiniz. – eliasah

+0

@eliasah Girişiniz için teşekkür ederiz. newData = data.map (lambda x: Rating (int (user.lookup (x [0])), int (x [1]), float (x [2]))) çalışacak mı, yoksa yapmak zorunda mıyım df = sqlContext.createDataFrame (?, [cols]) gibi bir şey, nerede yerine malzeme koymak için emin değilim. İkinci kısımda ise, RDD dönüşümünün içinde RDD'yi nerede yerleştiriyorum? Verilerim aslında oldukça büyük. – user2857014

+1

Bu işe yarayabilir, denemeniz gerekecek! Yorumda kod okuyamıyorum. İkinci kısım için, kullanıcı değeriniz bir RDD'dir. İşte burada RDD'yi yerleştirmeye çalıştığınız yer. – eliasah

cevap

1

lookup yaklaşımı sadece geçersizdir. Spark, iç içe eylemi ve dönüşümü desteklemez, böylece map'un içinde RDD.lookup'u arayamazsınız. veri büyük ise sadece join ve yeniden şekillendirmek olabilir aramalar için standart Python dict kullanılarak ele alınması:

from operator import itemgetter 
from pyspark.mllib.recommendation import Rating 

data = sc.parallelize([("foo", 1, 2.0), ("bar", 2, 3.0)]) 

user = itemgetter(0) 

def to_rating(record): 
    """ 
    >>> to_rating((("foobar", 99, 5.0), 1000)) 
    Rating(user=1000, product=99, rating=5.0) 
    """ 
    (_, item, rating), user = record 
    return Rating(user, item, rating) 

user_lookup = data.map(user).distinct().zipWithIndex() 

ratings = (data 
    .keyBy(user) # Add user string as a key 
    .join(user_lookup) # Join with lookup 
    .values() # Drop keys 
    .map(to_rating)) # Create Ratings 

ratings.first() 
## Rating(user=1, product=1, rating=2.0) 
İlgili konular