5

'da sıkıştırmak için İki tane DataFramea ve b var. Yapabileceğim nasılİki (veya daha fazla) DataFrame'i Spark

Column 1 | Column 2 | Column 3 
abc  | 123  | 1 
cde  | 23  | 2 

:

Column 1 | Column 2 
abc  | 123 
cde  | 23 

b ben gibi bir şey olur ki a ve b (hatta daha çok) DataFrames zip istiyorum

Column 1 
1  
2  

gibidir gibi a olduğunu o?

+0

iki dataframes satır aynı # sahip olduğunu varsaymak güvenli mi: Bu, bu uygulama benim saplama kodudur? –

cevap

16

Böyle bir işlem bir DataFrame API'si tarafından desteklenmez. Bu, zip iki RDD için mümkündür, ancak çalışmasını sağlamak için, bölüm başına sayı ve parça sayısı eşleştirmeniz gerekir. Bu durumda olduğunu varsayarsak:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, StructType, LongType} 

val a: DataFrame = sc.parallelize(Seq(
    ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") 

// Merge rows 
val rows = a.rdd.zip(b.rdd).map{ 
    case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} 

// Merge schemas 
val schema = StructType(a.schema.fields ++ b.schema.fields) 

// Create new data frame 
val ab: DataFrame = sqlContext.createDataFrame(rows, schema) 

Yukarıdaki koşullar bir dizin ekleme ve katılmak olduğunu akla gelen tek seçenek yerine getirilmezse:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
    // Add index 
    df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, 
    // Create schema 
    StructType(df.schema.fields :+ StructField("_index", LongType, false)) 
) 

// Add indices 
val aWithIndex = addIndex(a) 
val bWithIndex = addIndex(b) 

// Join and clean 
val ab = aWithIndex 
    .join(bWithIndex, Seq("_index")) 
    .drop("_index") 
+0

'DataFrame'de' withColumn' hakkında ne dersiniz? – Reactormonk

+0

@Reactormonk Burada nasıl kullanırsınız? – zero323

+0

Sütunu df b'den almak için .column ve sonra da eklemek için Colt sütununu kullanın. Denemedim ve Spark'un bunu desteklemediğini hayal edebiliyorum. –

1

Dataframes ait Scala'nın uygulamasında, basit yoktur iki veri çerçevesini bire birleştirme yolu. Veri çerçevelerinin her satırına indeks ekleyerek bu sınırlamayı basitçe halledebiliriz. Daha sonra, bu endekslerle bir iç birleştirme yapabiliriz.

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") 
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) 

val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") 
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) 

aWithId.join(bWithId, "id") 

A little light reading - Check out how Python does this!

İlgili konular