
I want to use StandardScaler to normalize the features, but I get SparkException: Values to assemble cannot be null. Here is my code:

import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

val Array(trainingData, testData) = dataset.randomSplit(Array(0.7, 0.3))
val vectorAssembler = new VectorAssembler().setInputCols(inputCols).setOutputCol("features").transform(trainingData)
val stdscaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false).fit(vectorAssembler)

When I fit the StandardScaler, it fails with:

[Stage 151:==>             (9 + 2)/200]16/12/28 20:13:57 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 151.0 (TID 8922, slave1.hadoop.ml): org.apache.spark.SparkException: Values to assemble cannot be null. 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:159) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:142) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:142) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98) 
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:97) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) 
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214) 
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1093) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1093) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1094) 
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1094) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

It threw an exception when I tried to use the StandardScaler. Is there something wrong with the VectorAssembler?

I checked the first few rows of the VectorAssembler output and they looked fine:

vectorAssembler.take(5) 

Answer


There is nothing wrong with the VectorAssembler. A Spark Vector simply cannot contain null values:

import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._ // for .toDF; already in scope in spark-shell

val df = Seq(
    (Some(1.0), None), (None, Some(2.0)), (Some(3.0), Some(4.0)) 
).toDF("x1", "x2") 

val assembler = new VectorAssembler() 
    .setInputCols(df.columns).setOutputCol("features") 

assembler.transform(df).show(3) 
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<x1:double,x2:double>) => vector) 
... 
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. 

null is not meaningful for ML algorithms and cannot be represented using scala.Double.
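
To see which feature columns (and how many rows) are responsible in the original data, a quick check can be run before assembling. This is not part of the original answer; it is a sketch that assumes the inputCols and trainingData values from the question:

import org.apache.spark.sql.functions.col

// Rows that have a null in at least one of the feature columns
val hasNull = inputCols.map(c => col(c).isNull).reduce(_ || _)
println(trainingData.filter(hasNull).count())

// Null count per feature column
inputCols.foreach { c =>
  println(s"$c: ${trainingData.filter(col(c).isNull).count()} nulls")
}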

You either have to drop the rows containing nulls:

assembler.transform(df.na.drop).show(2) 
+---+---+---------+ 
| x1| x2| features| 
+---+---+---------+ 
|3.0|4.0|[3.0,4.0]| 
+---+---+---------+ 

or fill / impute the null values (see also Replace missing values with mean - Spark Dataframe):

// For example with averages 
val replacements: Map[String,Any] = Map("x1" -> 2.0, "x2" -> 3.0) 
assembler.transform(df.na.fill(replacements)).show(3) 
+---+---+---------+ 
| x1| x2| features| 
+---+---+---------+ 
|1.0|3.0|[1.0,3.0]| 
|2.0|2.0|[2.0,2.0]| 
|3.0|4.0|[3.0,4.0]| 
+---+---+---------+ 
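
The replacement values above are hard-coded for the toy data. They can also be computed from the data itself, for example with the column averages. A sketch (the names meanRow and meanFill are illustrative, not from the original answer):

import org.apache.spark.sql.functions.{avg, col}

// avg() ignores nulls, so each mean is computed over the non-null values only
val meanRow = df.select(df.columns.map(c => avg(col(c)).alias(c)): _*).first()
val meanFill = df.columns.zipWithIndex.map { case (c, i) => c -> meanRow.getDouble(i) }.toMap

assembler.transform(df.na.fill(meanFill)).show(3)

On Spark 2.2+ the org.apache.spark.ml.feature.Imputer transformer offers mean/median imputation as a pipeline stage.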

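Applied to the code in the question, the fix is to handle the nulls before the VectorAssembler runs. A sketch that simply drops the offending rows, restricting the drop to the feature columns; the names cleaned, assembled and scalerModel are illustrative:

import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

// Drop rows with a null in any feature column before assembling
val cleaned = trainingData.na.drop(inputCols)

val assembled = new VectorAssembler()
  .setInputCols(inputCols)
  .setOutputCol("features")
  .transform(cleaned)

val scalerModel = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)
  .fit(assembled)

Whatever strategy is chosen for trainingData (dropping or imputing) should also be applied to testData before transforming it with the fitted scaler.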