
I am hitting the following error with Kafka 0.10.1.0 and Spark 2.0.2: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

private val spark = SparkSession.builder() 
.master("local[*]") 
.appName(job.name) 
.config("spark.cassandra.connection.host","localhost")) 
.config("spark.cassandra.connection.port","9042") 
.config("spark.streaming.receiver.maxRate", 10000) 
.config("spark.streaming.kafka.maxRatePerPartition", 10000) 
.config("spark.streaming.kafka.consumer.cache.maxCapacity", 1) 
.config("spark.streaming.kafka.consumer.cache.initialCapacity", 1) 
.getOrCreate() 

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> config.getString("kafka.hosts"), 
"key.deserializer" -> classOf[StringDeserializer], 
"value.deserializer" -> classOf[StringDeserializer], 
"group.id" -> job.name, 
"auto.offset.reset" -> config.getString("kafka.offset"), 
"enable.auto.commit" -> (false: java.lang.Boolean) 
)
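
For context, a minimal sketch of how the direct stream is presumably wired up with these params (the stream-creation code is not shown in the question; the batch interval and topic name are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical stream setup using the kafkaParams above.
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("some-topic"), kafkaParams) // "some-topic" is a placeholder
)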

Exception

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

There is already a mailing-list thread about the same error, but no resolution yet: https://www.mail-archive.com/[email protected]/msg56566.html

Answer


I ran into the same error and could not find a fix. Instead, I pass "--executor-cores 1" to spark-submit to avoid the problem. If you find a proper solution, please post it.
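
If it helps, the same limit can presumably be set programmatically as well; spark.executor.cores is the property behind the --executor-cores flag (whether it takes effect depends on your deploy mode, so treat this as a sketch):

import org.apache.spark.sql.SparkSession

// Equivalent of passing --executor-cores 1 to spark-submit:
// with a single core per executor, only one task at a time touches a cached KafkaConsumer.
val spark = SparkSession.builder()
  .config("spark.executor.cores", "1")
  .getOrCreate()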