2016-10-27 15 views
11

Bizim uygulamanızda (Spark 2.0.1), bu istisnayı sık sık ortaya çıkarıyoruz. Bu konuda bir şey bulamıyorum. Sebep ne olabilir?Spark Listener EventLoggingListener bir istisna attı/ConcurrentModificationException

16/10/27 11:18:24 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception 
java.util.ConcurrentModificationException 
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) 
    at java.util.ArrayList$Itr.next(ArrayList.java:851) 
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183) 
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) 
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:590) 
    at scala.collection.AbstractTraversable.to(Traversable.scala:104) 
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294) 
    at scala.collection.AbstractTraversable.toList(Traversable.scala:104) 
    at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314) 
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291) 
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291) 
    at scala.Option.map(Option.scala:146) 
    at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291) 
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283) 
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
    at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283) 
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145) 
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76) 
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137) 
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157) 
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) 
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249) 
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77) 

DÜZENLEME: Bir çok bilgi bizim uygulama uzun çalışır durumdadır, potansiyel olarak başarısız kıvılcım bağlamdan devam etmek, iki "işler" arasında SparkBuilder.getOrCreate() yöntemini kullanın. Bu, dinleyicilerle karıştırılabilir mi?

cevap

2

Ayrıca Spark 2.0.1'e yeni geçiş yaptık ve aynı istisnayı görmeye başladık. Aşağıdaki deyim içeren Python kodunun bir bölümüne neden daralmış:

a = spark_context.textFile('..') 
a = a.map(stuff) 
b = a.filter(stuff).map(stuff) 

Ben Spark değişken kendinden atama ile geçmişte sorunları vardı, ama sorun gerçekten var 2.0.1 yükselttikten sonra akut ve ConcurrentModification istisnalarını görmeye başladık.

Bizim için düzeltme, kendi kendini atamaları yapmamak için sadece kodu değiştiriyordu.

+0

'Spark 2.0.1''e yükselttikten sonra, aynı hata mesajını alıyorum ve kodumda herhangi bir değişken atamam yok. –

+1

"kendi atamaları" bir şey değildir. "A = a.map (şeyler)" dediğinizde, a'ya "a" adlı yeni DataFrame değişkenini atamazsınız, ancak eski referans kaybolmaz, çünkü sonuçta elde edilen DataFrame tarafından referans alınır. bağımlılık grafiği. Bence gerçekten başka bir şey görüyorsun. – vy32