Böyle bir işlem bir DataFrame API'si tarafından desteklenmez. Bu, zip
iki RDD için mümkündür, ancak çalışmasını sağlamak için, bölüm başına sayı ve parça sayısı eşleştirmeniz gerekir. Bu durumda olduğunu varsayarsak:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
val a: DataFrame = sc.parallelize(Seq(
("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")
// Merge rows
val rows = a.rdd.zip(b.rdd).map{
case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}
// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)
// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
Yukarıdaki koşullar bir dizin ekleme ve katılmak olduğunu akla gelen tek seçenek yerine getirilmezse:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
// Add index
df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
// Create schema
StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)
// Join and clean
val ab = aWithIndex
.join(bWithIndex, Seq("_index"))
.drop("_index")
iki dataframes satır aynı # sahip olduğunu varsaymak güvenli mi: Bu, bu uygulama benim saplama kodudur? –