2016-12-07 20 views
11

Bir satır ve birkaç sütun içeren bir veri çerçevem ​​var. Bazı sütunlar tek değerdir, diğerleri ise listelerdir. Tüm liste sütunları aynı uzunluktadır. Liste dışı bir sütunu olduğu gibi tutarken, her liste sütununu ayrı bir satıra ayırmak istiyorum.Pyspark: Satırlara çoklu dizi sütunlarını bölme

Numune DF:

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')]) 
# +---+---------+---------+---+ 
# | a|  b|  c| d| 
# +---+---------+---------+---+ 
# | 1|[1, 2, 3]|[7, 8, 9]|foo| 
# +---+---------+---------+---+ 

Benim istediğim:

+---+---+----+------+ 
| a| b| c | d | 
+---+---+----+------+ 
| 1| 1| 7 | foo | 
| 1| 2| 8 | foo | 
| 1| 3| 9 | foo | 
+---+---+----+------+ 

Ben sadece bir liste sütunu, bu sadece bir explode yaparak kolay olacağını olsaydı:

df_exploded = df.withColumn('b', explode('b')) 
# >>> df_exploded.show() 
# +---+---+---------+---+ 
# | a| b|  c| d| 
# +---+---+---------+---+ 
# | 1| 1|[7, 8, 9]|foo| 
# | 1| 2|[7, 8, 9]|foo| 
# | 1| 3|[7, 8, 9]|foo| 
# +---+---+---------+---+ 

Ancak, explodec sütununu da kullanmaya çalışırsam, bir dataf ile sonuçlanırım. uzunluğunda ne istiyorum karesini Rame:

df_exploded_again = df_exploded.withColumn('c', explode('c')) 
# >>> df_exploded_again.show() 
# +---+---+---+---+ 
# | a| b| c| d| 
# +---+---+---+---+ 
# | 1| 1| 7|foo| 
# | 1| 1| 8|foo| 
# | 1| 1| 9|foo| 
# | 1| 2| 7|foo| 
# | 1| 2| 8|foo| 
# | 1| 2| 9|foo| 
# | 1| 3| 7|foo| 
# | 1| 3| 8|foo| 
# | 1| 3| 9|foo| 
# +---+---+---+---+ 

I istediğim şey - her sütun için, o sütundaki dizinin inci elemanını almak ve yeni bir satıra eklemek. Bir dataframe tüm sütunlara genelinde patlayabilir haritalama denedim ama bu da işe görünmüyor: DataFrames ile

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF() 

cevap

13

ve UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType 
from pyspark.sql.functions import col, udf 

zip_ = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     # Adjust types to reflect data types 
     StructField("first", IntegerType()), 
     StructField("second", IntegerType()) 
    ])) 
) 

(df 
    .withColumn("tmp", zip_("b", "c")) 
    # UDF output cannot be directly passed to explode 
    .withColumn("tmp", explode("tmp")) 
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d")) 

RDDs olarak:

(df 
    .rdd 
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) 
    .toDF(["a", "b", "c", "d"])) 

Her iki çözüm de Python iletişim yükü nedeniyle verimsizdir. veri boyutu sabittir Eğer böyle bir şey yapabilirsiniz: hatta

from functools import reduce 
from pyspark.sql import DataFrame 

# Length of array 
n = 3 

# For legacy Python you'll need a separate function 
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") 
     for i in range(n)) 
).toDF("a", "b", "c", "d") 

ya:

from pyspark.sql.functions import array, struct 

# SQL level zip of arrays of known size 
# followed by explode 
tmp = explode(array(*[ 
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) 
    for i in range(n) 
])) 

(df 
    .withColumn("tmp", tmp) 
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d")) 

Bu önemli ölçüde daha hızlı UDF veya RDD kıyasla edilmelidir. Her giriş sıranın dışına birden çıktı satırları yapmak istediğiniz gibi flatMap değil map kullanmak Sen gerekirdi

# This uses keyword only arguments 
# If you use legacy Python you'll have to change signature 
# Body of the function can stay the same 
def zip_and_explode(*colnames, n): 
    return explode(array(*[ 
     struct(*[col(c).getItem(i).alias(c) for c in colnames]) 
     for i in range(n) 
    ])) 

df.withColumn("tmp", zip_and_explode("b", "c", n=3)) 
4

: sütunların isteğe bağlı sayıda desteklemek için Genelleştirilmiş.

from pyspark.sql import Row 
def dualExplode(r): 
    rowDict = r.asDict() 
    bList = rowDict.pop('b') 
    cList = rowDict.pop('c') 
    for b,c in zip(bList, cList): 
     newDict = dict(rowDict) 
     newDict['b'] = b 
     newDict['c'] = c 
     yield Row(**newDict) 

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))