2016-01-25 19 views
5

SFDC'den basit bir satış paketi paketi kullanarak çıkarılan verilerle çalışıyorum. Komut dosyası ve Spark 1.5.2 için Python3 kullanıyorum. pyspark kullanarak tuples listesinden DataFrame oluştur

Ben aşağıdaki verileri içeren bir RDD oluşturuldu:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')] 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')] 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
... 

Bu veriler RDD denilen v_rdd

Benim şema şöyle görünür: Ben DataFrame oluşturmaya çalışıyorum

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true))) 

Bu RDD'nin dışında:

benim DataFrame yazdırmak:

sqlDataFrame.printSchema() 

Ve aşağıdaki olsun:

+--------------------+--------------------+--------------------+ 
|     Id| PackSize|       Name| 
+--------------------+--------------------+--------------------+ 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...| 

böyle, gerçek verileri görmek için bekliyorum:

+------------------+------------------+--------------------+ 
|    Id|PackSize|       Name| 
+------------------+------------------+--------------------+ 
|a0w1a0000003xB1A |    1.0|  A   | 
|a0w1a0000003xAAI |    1.0|  B   | 
|a0w1a00000xB3AAI |    30.0|  C   | 

Bana tespit yardım edebilir Burada yanlış yapıyorum.

Python betiğimde uzun bir süre var, insanların eline geçmesinin elverişli bir durum olmadığından emin değilim, bu yüzden yalnızca sorun yaşadığım bölümleri gönderdim.

Teşekkür bir ton önceden!

cevap

12

Bir dahaki sefere bir çalışma örneği sunabilir miydiniz? Bu daha kolay olurdu.

RDD'nizin nasıl sunulduğu, bir DataFrame oluşturmak için temelde tuhaftır. Spark Documentation'a göre bir DF oluşturursunuz.

>>> l = [('Alice', 1)] 
>>> sqlContext.createDataFrame(l).collect() 
[Row(_1=u'Alice', _2=1)] 
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect() 
[Row(name=u'Alice', age=1)] 

Yani bunun yolu gibi İstenen çıktıyı oluşturmak için örnek ilişkin:

# Your data at the moment 
data = sc.parallelize([ 
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')], 
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')], 
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')] 
    ]) 
# Convert to tuple 
data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1])) 

# Define schema 
schema = StructType([ 
    StructField("Id", StringType(), True), 
    StructField("Packsize", StringType(), True), 
    StructField("Name", StringType(), True) 
]) 

# Create dataframe 
DF = sqlContext.createDataFrame(data_converted, schema) 

# Output 
DF.show() 
+----------------+--------+----+ 
|    Id|Packsize|Name| 
+----------------+--------+----+ 
|a0w1a0000003xB1A|  1.0| A| 
|a0w1a0000003xAAI|  1.0| B| 
|a0w1a00000xB3AAI| 30.0| C| 
+----------------+--------+----+ 

Umut bu

yardımcı olur