2016-08-20 23 views
5

yılında zipwithindex birinin karşılığı aşağıdaki dataframe yaşıyorum varsayarsak:Kıvılcım: dataframe

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)] 
df = sc.parallelize(dummy_data).toDF(['letter','number']) 

Ve şu dataframe oluşturmak istiyorum:

[('a',0),('b',2),('c',1),('d',3),('e',0)] 

Ne yapmam rdd dönüştürmek olduğunu ve zipWithIndex işlevini kullanın ve sonra sonuçları katılmak:

convertDF = (df.select('number') 
       .distinct() 
       .rdd 
       .zipWithIndex() 
       .map(lambda x:(x[0].number,x[1])) 
       .toDF(['old','new'])) 


finalDF = (df 
      .join(convertDF,df.number == convertDF.old) 
      .select(df.letter,convertDF.new)) 

Veri çerçevelerinde zipWIthIndex gibi benzer bir işlev var mı? Bu görevi yapmanın daha verimli bir yolu var mı?

+2

http://stackoverflow.com/q/32760888/1560062 – zero323

cevap

0

Veriler, bu doğrudan işlevsellik paritesi için veri tabanlarında https://issues.apache.org/jira/browse/SPARK-23074'u işaretleyin. Spark'i bir noktada görmek istiyorsanız, jira'yı kullanın.

İşte bir çözüm var PySpark olsa:

abalon paketinde de kullanılabilir
def dfZipWithIndex (df, offset=1, colName="rowId"): 
    ''' 
     Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
     and preserves a schema 

     :param df: source dataframe 
     :param offset: adjustment to zipWithIndex()'s index 
     :param colName: name of the index column 
    ''' 

    new_schema = StructType(
        [StructField(colName,LongType(),True)]  # new added field in front 
        + df.schema.fields       # previous schema 
       ) 

    zipped_rdd = df.rdd.zipWithIndex() 

    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) 

    return spark.createDataFrame(new_rdd, new_schema) 

.