7

a Spark DataFrame, bir dizi Array [Double] sütunu içerir. Bir map() işlevinde geri almayı denediğimde bir ClassCastException istisnası atar. Aşağıdaki scala kodu bir istisna oluşturur.Spark içinde bir Erişim Dizisi sütunu

case class Dummy(x:Array[Double]) 
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3)))) 
val s = df.map(r => { 
    val arr:Array[Double] = r.getAs[Array[Double]]("x") 
    arr.sum 
}) 
s.foreach(println) 

çalışmıyor neden istisna

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Kam biri beni açıklamak mı? bunun yerine ne yapmalıyım? Ben Spark 1.5.1 ve scala 2.10.6

Teşekkür

cevap

19

ArrayType bir scala.collection.mutable.WrappedArray olarak temsil edilmektedir kullanıyorum. Sen

val arr: Seq[Double] = r.getAs[Seq[Double]]("x") 

veya

val i: Int = ??? 
val arr = r.getSeq[Double](i) 

hatta örneğin kullanarak seçebiliriz:

import org.apache.spark.sql.Row 

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)} 
: DataFrame sonra desen eşleştirme daha iyi bir yaklaşım olabilir nispeten ince

import scala.collection.mutable.WrappedArray 

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x") 

ise

, dizinin türünün işaretli olmadığını unutmamak gerekir. aşağıdaki gibi Spark ise

> = 1.6 de Dataset kullanabilirsiniz:

df.select("x").as[Seq[Double]].rdd 
İlgili konular