2016-06-03 21 views
10

'daki sütunların bir listesi boyunca satır sütunları ekleme Birkaç sütun içeren bir Spark veri çerçevem ​​var. Veritabanına belirli bir sayıda sütunun toplamı olan bir sütun eklemek istiyorum. ÖrneğinSpark Dataframe

verilerim şöyle görünür:

ID var1 var2 var3 var4 var5 
a 5  7 9 12 13 
b 6  4 3 20 17 
c 4  9 4 6 9 
d 1  2 6 8 1 

Bir sütun belirli sütunlar için satırları toplayarak eklendi istiyorum:

ID var1 var2 var3 var4 var5 sums 
a 5  7 9 12 13 46 
b 6  4 3 20 17 50 
c 4  9 4 6 9  32 
d 1  2 6 8 10 27 

Ben eğer birlikte sütunları eklemek mümkündür biliyorum

val newdf = df.withColumn("sumofcolumns", df("var1") + df("var2")) 

Ama çölü listesini geçmek mümkündür: eklemek belirli sütunları biliyorum mn isimleri ve onları bir araya ekleyin? Ben istiyorum ama bunun yerine skalası piton API kullanarak ne temelde bu cevabın kapalı Tabanlı (Add column sum as new column in PySpark dataframe) Böyle bir şey işe yarar mı:

//Select columns to sum 
val columnstosum = ("var1", "var2","var3","var4","var5") 

// Create new column called sumofcolumns which is sum of all columns listed in columnstosum 
val newdf = df.withColumn("sumofcolumns", df.select(columstosum.head, columnstosum.tail: _*).sum) 

Bu hata değeri toplamı üyesi değildir atar org.apache.spark.sql.DataFrame. Sütunlarda toplamı bir yolu var mı? Yardımlarınız için şimdiden

teşekkürler.

cevap

15

Aşağıdaki denemelisiniz:

import org.apache.spark.sql.functions._ 

val sc: SparkContext = ... 
val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._ 

val input = sc.parallelize(Seq(
    ("a", 5, 7, 9, 12, 13), 
    ("b", 6, 4, 3, 20, 17), 
    ("c", 4, 9, 4, 6 , 9), 
    ("d", 1, 2, 6, 8 , 1) 
)).toDF("ID", "var1", "var2", "var3", "var4", "var5") 

val columnsToSum = List(col("var1"), col("var2"), col("var3"), col("var4"), col("var5")) 

val output = input.withColumn("sums", columnsToSum.reduce(_ + _))) 

output.show() 

Sonra sonucudur:

+---+----+----+----+----+----+----+ 
| ID|var1|var2|var3|var4|var5|sums| 
+---+----+----+----+----+----+----+ 
| a| 5| 7| 9| 12| 13| 46| 
| b| 6| 4| 3| 20| 17| 50| 
| c| 4| 9| 4| 6| 9| 32| 
| d| 1| 2| 6| 8| 1| 18| 
+---+----+----+----+----+----+----+ 
7

Sade ve basit:

import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions.{lit, col} 

def sum_(cols: Column*) = cols.foldLeft(lit(0))(_ + _) 

val columnstosum = Seq("var1", "var2", "var3", "var4", "var5").map(col _) 
df.select(sum_(columnstosum: _*)) 

Python eşdeğer:

from functools import reduce 
from operator import add 
from pyspark.sql.functions import lit, col 

def sum_(*cols): 
    return reduce(add, cols, lit(0)) 

columnstosum = [col(x) for x in ["var1", "var2", "var3", "var4", "var5"]] 
select("*", sum_(*columnstosum)) 

Satırda eksik bir değer varsa her ikisi de NA'ya döner. Bunu önlemek için DataFrameNaFunctions.fill veya coalesce işlevini kullanabilirsiniz.

0

İşte hoş bir çözüm kullanarak piton var:

NewDF = OldDF.withColumn('sums', sum(OldDF[col] for col in OldDF.columns[1:])) 

Umarım bu Spark benzer bir şey ... kimseyi etkileyecektir ?.

0

Veritabanınız olduğunu varsayalım df. Öyleyse kimlik kodunuz hariç tüm cols'ları toplayabilirsiniz. Çok sayıda kodunuz olduğunda ve yukarıda belirtilen herkes gibi tüm sütunların adlarını elle söylemek istemediğinizde bu yardımcı olur. This post aynı cevaba sahiptir.

val sumAll = df.columns.collect{ case x if x != "ID" => col(x) }.reduce(_ + _) 
df.withColumn("sum", sumAll)