Akış ile kaydeder:Serializable Değil istisna okuma Kafka Spark Spark 2.0 kullanılarak Kafka gelen Akış sırasında aşağıdaki hatayı alıyorum
org.apache.spark.SparkException:
Job aborted due to stage failure:
Task 0.0 in stage 1.0 (TID 1) had a not serializable result:
org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class:
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337,
CreateTime = 1472871209063, checksum = 2826679694,
serialized key size = -1, serialized value size = 95874,
key = null, value = <JSON GOES HERE...>
İşte
kod ilgili kısmı şunlardır:
val ssc = new StreamingContext(sc, Seconds(2))
val topics = Array("ecfs")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream
.map(_.value())
.flatMap(message => {
// parsing here...
})
.foreachRDD(rdd => {
// processing here...
})
ssc.start()
Anlatabildiğim kadarıyla, bu sorun .map(_.value())
soruna neden oluyor, bu nasıl düzeltilebilir?