2017-06-13 28 views
5

Bir veri kümesine benim dataframe dönüştürmek gerekiyor ve aşağıdaki kodu kullandı:Scala'da Apache Spark'deki veri kümesine veri tabanı nasıl dönüştürülür?

val final_df = Dataframe.withColumn(
     "features", 
     toVec4(
     // casting into Timestamp to parse the string, and then into Int 
     $"time_stamp_0".cast(TimestampType).cast(IntegerType), 
     $"count", 
     $"sender_ip_1", 
     $"receiver_ip_2" 
    ) 
    ).withColumn("label", (Dataframe("count"))).select("features", "label") 

    final_df.show() 

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7)) 
    val TrainingDF = trainingTest(0) 
    val TestingDF=trainingTest(1) 
    TrainingDF.show() 
    TestingDF.show() 

    ///lets create our liner regression 
    val lir= new LinearRegression() 
    .setRegParam(0.3) 
    .setElasticNetParam(0.8) 
    .setMaxIter(100) 
    .setTol(1E-6) 

    case class df_ds(features:Vector, label:Integer) 
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 

    val Training_ds = TrainingDF.as[df_ds] 

Sorunum aşağıdaki hata var, yani:

Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
    val Training_ds = TrainingDF.as[df_ds] 

O görünüyor sayı veri çerçevesindeki değerlerin, sınıfımdaki değer sayısı farklıdır. Bununla birlikte, şu anda TrainingDF veri çerçevemde case class df_ds(features:Vector, label:Integer) kullanıyorum, Özellikler ve bir tamsayı etiketi var. Ayrıca burada

+--------------------+-----+ 
|   features|label| 
+--------------------+-----+ 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,10...| 10| 
+--------------------+-----+ 

benim orijinal final_df dataframe geçerli:: İşte TrainingDF dataframe olan

+------------+-----------+-------------+-----+ 
|time_stamp_0|sender_ip_1|receiver_ip_2|count| 
+------------+-----------+-------------+-----+ 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.3|  10.0.0.2| 10| 
+------------+-----------+-------------+-----+ 

Ancak bahsettiğim hata var! Birisi bana yardım edebilir mi? Şimdiden teşekkürler.

cevap

8

Okuduğunuz hata mesajı oldukça iyi bir işaretçidir.

Eğer DataFrame satır depolanır ne olursa olsun uygun Encoder olması bir DataFrame Dataset bir dönüştürmeniz,.

ilkel benzeri türleri için Kodlayıcılar (böylece Int s, String ler ve benzeri) ve case classes sadece SparkSession gibi sizin için implicits ithal tarafından sağlanmaktadır aşağıdaki gibidir:

case class MyData(intField: Int, boolField: Boolean) // e.g. 

val spark: SparkSession = ??? 
val df: DataFrame = ??? 

import spark.implicits._ 

val ds: Dataset[MyData] = df.as[MyData] 

bu da işe yaramazsa için DataFrame dökümünü desteklemiyorsa, desteklenmiyor. Bu durumda, kendi Encoder yazmanız gerekir: Bu konuda daha fazla bilgi bulabilirsiniz here ve bir örnek (java.time.LocalDateTime için Encoder) here bakın.

+0

dead link [burada] (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Encoder.html) :( –

+0

@joeybaruch Rapor için teşekkürler. Yazar, Spark SQL ile ilgili maddeleri kendi kitabına taşıdı – stefanobaghino

+0

Ayrıca, bu cevabı yazdığımdan beri kitaba eklenmiş bir örnek ekleme şansını da yakaladım. – stefanobaghino

İlgili konular