2016-09-03 31 views
5

Akış ile kaydeder:Serializable Değil istisna okuma Kafka Spark Spark 2.0 kullanılarak Kafka gelen Akış sırasında aşağıdaki hatayı alıyorum

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...> 
İşte

kod ilgili kısmı şunlardır:

val ssc = new StreamingContext(sc, Seconds(2)) 

val topics = Array("ecfs") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

stream 
    .map(_.value()) 
    .flatMap(message => { 
    // parsing here... 
    }) 
    .foreachRDD(rdd => { 
    // processing here... 
    }) 

ssc.start() 

Anlatabildiğim kadarıyla, bu sorun .map(_.value()) soruna neden oluyor, bu nasıl düzeltilebilir?

cevap

0

Dstream'de .map kullanamazsınız: [String, String] burada kullandığınız gibi. Sana dönüşümü kullanmak ve sonra

val streamed_rdd_final = streamed_rdd.transform{ rdd => rdd.map(x => x.split("\t")).map(x=>Array(check_time_to_send.toString,check_time_to_send_utc.toString,x(1),x(2),x(3),x(4),x(5))).map(x => x(1)+"\t"+x(2)+"\t"+x(3)+"\t"+x(4)+"\t"+x(5)+"\t"+x(6)+"\t"+x(7)+"\t")}

izler ya da kullanılan ziyade _.value() Eğer ben gibi, haritaya bir işlev gönderme deneyin gerektiğini yapıyor gibi .map kullanabilirsiniz olarak haritayı uygulayabilirsiniz düşünüyorum

stream.map{case (x, y) => (y.toString)}