Enums

2016-09-23 21 views
7

'u içeren vaka sınıflarından Spark Veri Kümesi veya Dataframe nasıl oluşturulur Enümler içeren vaka sınıflarını kullanarak Spark Veri Kümesi oluşturmaya çalışıyorum ama yapamıyorum. Spark versiyon 1.6.0 kullanıyorum. İstisnalar, Enum'um için kodlayıcı bulunmadığından şikayet ediyor. Spark’de bu verilerde mümkün değil mi?Enums

Kodu:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object EnumTest { 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 

} 

Hata:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value 
- field (class: "scala.Enumeration.Value", name: "other") 
- root class: "com.company.MyData" 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502) 
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54) 
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41) 
at com.company.EnumTest$.main(EnumTest.scala:22) 
at com.company.EnumTest.main(EnumTest.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

cevap

4

Kendi kodlayıcı oluşturabilirsiniz:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object MyDataEncoders { 
    implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] = 
    org.apache.spark.sql.Encoders.kryo[MyData] 
} 

object EnumTest { 
    import MyDataEncoders._ 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 
} 
+0

Teşekkür! ToDS() yerine toF() yapmak istiyorsam ne yapmalıyım? Sonra aşağıdaki hatayı alıyorum: "ana" java.lang.UnsupportedOperationException iş parçacığında özel durum: com.nordea.gpdw.dq.MyEnum.Value türü için şema desteklenmiyor – ErikHabanero

+0

Yanıtımda kullandığım kodun aynısını kullanıyor musunuz? ToDS'yi toFF'ye değiştirmeyi denedim ve işe yarıyor gibi görünüyor. –

+0

Evet, stdout'ta df: MyData (merhaba, Dünya)} 'nın yazdırıldığından emin misiniz? Çok fazla günlük çıkışı var. – ErikHabanero

İlgili konular