5

Zeppelin'deki Spark bileştiren kmmeans algoritması çalıştırılıyor. Hep olsa bu hatayı alabilirsinizSpark, java.util.NoSuchElementException öğesini atar: anahtar bulunamadı: 67

//I transform my data using the TF-IDF algorithm 

val idf = new IDF(minFreq).fit(data) 
val hashIDF_features = idf.transform(dbTF)  

//and parse the transformed data to the clustering algorithm. 

val bkm = new BisectingKMeans().setK(100).setMaxIterations(2) 
val model = bkm.run(hashIDF_features) 
val cluster_rdd = model.predict(hashIDF_features) 

:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 270.0 failed 4 times, most recent failure: Lost task 0.3 in stage 270.0 (TID 126885, IP): java.util.NoSuchElementException: key not found: 67 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:58) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337) 
    at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125) 
    at scala.collection.immutable.List.reduceLeft(List.scala:84) 
    at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231) 
    at scala.collection.AbstractTraversable.minBy(Traversable.scala:105) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:261) 
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$run$1.apply$mcVI$sp(BisectingKMeans.scala:194) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:189) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101) 

Im Spark 1.6.1 kullanarak. Ben bağımsız bir uygulama bu algoritma çalıştırırsanız İlginç, artık hata vurur ama Zeppelin bu olsun. Buna ek olarak, giriş harici bir algoritma ile hesaplanmıştır, bu yüzden bir formatlama problemi olduğunu düşünmüyorum. Herhangi bir fikir?

Düzenleme: Daha az sayıda küme kullanarak sistemi tekrar test ettim ve hata gerçekleşmedi. Algoritma neden büyük küme değerleri için kesilir?

cevap

1

Sorun, closure nedeniyle olduğuna inanıyorum. Uygulamanızı yerel olarak çalıştırdığınızda, her şey aynı bellekte/işlemde çalışıyor olabilir. Başka bir bellekte/işlemde çalışabilen bir clousre'dan yerel değişkene erişmeye çalışmadığınızdan emin olun. Sorununuzu çözmek için This yardımcı olacaktır.

+1

Kapatma işlemini tamamlamak için komutları yukarı kaydırdım, ancak sorun hala devam ediyor. – Mnemosyne

1

Aynı konuyla da ilgileniyorum, ayrıca bu konuyu Spark JIRA'ya da bildirdim, ancak herhangi bir yanıt almadım. https://issues.apache.org/jira/browse/SPARK-16473

+0

Bu sorunun çözümü olan var mı? Ya da bunun kök sebebi –

+0

Sanırım problemin ne kadar olduğunu bilmeme rağmen hafıza ile ilgili olduğunu düşünüyorum. Daha güçlü bir kümeye geçtim ve üst sınır arttı. 150-200 kümeye gidebilirdim ve aynı hatayı tekrar vurabilirdim. Algoritmada muhtemelen bir hata. Bu üzücü çünkü BKM KM'den çok daha hızlı. – Mnemosyne

+0

Herkes sbt depolarına BisectingKmeans.scala sınıfında bu zaten summited değişikliği inlcuded olacak bilir ne zaman ?. Güncellenmiş fonksiyona ihtiyacım var ama mllib'nin son sbt versiyonunda (2.11) değişiklik yok. – eifersucht

İlgili konular