2017-01-18 30 views
6

Yuvalanmış düzeydeki bir yapıya alanları nasıl ekleyebilir veya değiştirebilirim?Spark DataFrame'e iç içe bir sütun ekleme

Bu giriş:

val rdd = sc.parallelize(Seq(
    """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""", 
    """{"a": {"xX": 3},"b": {"z": 0}}""", 
    """{"a": {"XX": 3},"b": {"z": 0}}""", 
    """{"a": {"xx": 4},"b": {"z": 0}}""")) 
var df = sqlContext.read.json(rdd) 

aşağıdaki şemayı verir:

root 
|-- a: struct (nullable = true) 
| |-- XX: long (nullable = true) 
| |-- xX: long (nullable = true) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Sonra bunu yapabilirsiniz:

import org.apache.spark.sql.functions._ 
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withColumn("a_xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 
    .dropNestedColumn("a.xx") 

(dropNestedColumn bu cevap ödünç: https://stackoverflow.com/a/39943812/1068385. Ben temel olarak ters operatiyi arıyorum. . Bunun üzerine)

Ve şema haline gelir:

root 
|-- a: struct (nullable = false) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 
|-- a_xx: long (nullable = true) 

Açıkçası yerine (veya ekleme) a.xx ama bunun yerine kök düzeyinde yeni bir alan a_xx ekler gelmez.

yerine bunu yapabilmek istiyorum:

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX")) 
df = df 
    .withNestedColumn("a.xx", 
    coalesce(overlappingNames:_*)) 
    .dropNestedColumn("a.xX") 
    .dropNestedColumn("a.XX") 

bu şemada sebep olacağı Yani:

root 
|-- a: struct (nullable = false) 
| |-- xx: long (nullable = true) 
|-- b: struct (nullable = true) 
| |-- z: long (nullable = true) 

Bunu nasıl başarabiliriz?

Buradaki pratik amaç, JSON girişindeki sütun adlarıyla büyük/küçük harfe duyarlı olmaktır. Son adım basit olurdu: üst üste binen tüm sütun isimlerini toplayın ve her birine birleştirme uygulayın.

object DataFrameUtils { 
    private def nullableCol(parentCol: Column, c: Column): Column = { 
    when(parentCol.isNotNull, c) 
    } 

    private def nullableCol(c: Column): Column = { 
    nullableCol(c, c) 
    } 

    private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = { 
    splitted 
     .foldRight(newCol) { 
     case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName)) 
     } 
    } 

    private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = { 
    colType match { 
     case colType: StructType if splitted.nonEmpty => { 
     var modifiedFields: Seq[(String, Column)] = colType.fields 
      .map(f => { 
      var curCol = col.getField(f.name) 
      if (f.name == splitted.head) { 
       curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol) 
      } 
      (f.name, curCol as f.name) 
      }) 

     if (!modifiedFields.exists(_._1 == splitted.head)) { 
      modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head) 
     } 

     var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*) 
     if (nullable) { 
      modifiedStruct = nullableCol(col, modifiedStruct) 
     } 
     modifiedStruct 
     } 
     case _ => createNestedStructs(splitted, newCol) 
    } 
    } 

    private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = { 
    if (newColName.contains('.')) { 
     var splitted = newColName.split('.') 

     val modifiedOrAdded: (String, Column) = df.schema.fields 
     .find(_.name == splitted.head) 
     .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol))) 
     .getOrElse { 
      (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head) 
     } 

     df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2) 

    } else { 
     // Top level addition, use spark method as-is 
     df.withColumn(newColName, newCol) 
    } 
    } 

    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { 
    /** 
     * Add nested field to DataFrame 
     * 
     * @param newColName Dot-separated nested field name 
     * @param newCol New column value 
     */ 
    def withNestedColumn(newColName: String, newCol: Column): DataFrame = { 
     DataFrameUtils.addNestedColumn(df, newColName, newCol) 
    } 
    } 
} 

onu geliştirmek için çekinmeyin:

+0

Çözümü elde ettiniz? –

+0

@ShankarKoirala: Spark ile değil. Kovanda istediğimi elde etmek için COALESCE kullanmak çok önemliydi. – Arvidaa

cevap

1

Burada Olması gerektiği kadar zarif veya verimli olmayabilir ama ben ile geldi budur.

val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }""")) 
val df: DataFrame = spark.read.json(data) 

val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2") 

üretmelidir:

assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.shema.simpleString) 
+0

Teşekkürler. Gelecek hafta doğrulayacağım ve kabul edilen cevap olarak işaretlenecek. – Arvidaa

+0

@Michel Lemay Soruda durum için iyi çalışıyor. Teşekkürler. Onu iç içe geçmiş bir yapıya uygulamaya çalışıyorum ve başarısız oluyor, gerçek kıvılcım bilgim için biraz fazla uzakta ... bana yardım edebilir misin? – Gab

+0

Gerçekten de, ihtiyaç duyduğumuz bir özellik değil, gelecekteki gelişmeler için onu bıraktım. Bunu mevcut kodu kullanarak desteklemek için, 'case _' dosyasını değiştirmek ve iç içe geçmiş yapıların dizilerini desteklemek gerekir. İç içe geçmiş basit türlerin de bir yapıya tanıtılması gerekir. Ayrıca, newCol'daki dizileri desteklememiz ve hedef dizideki farklı sayıda elemanla ilgilenmemiz gerekir. –