Ben bu basit Kafka Akış Kafka iletilerkafka DirectStream dstream haritası
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD(rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
, bu mümkün RDDs olarak onları almak için Akış Spark sahip yazdırmaz. Ama benim kodumdaki ikinci printn hiçbir şey yazdırmıyor. Yerel [2] modunda çalıştırıldığında sürücü konsolu kayıtlarına baktım, iplik-istemci modunda çalıştırıldığında iplik günlüklerini kontrol ettim.
Neyi eksik? Bunun yerine rdd.map ait
, iyi kıvılcım sürücü konsolunda aşağıdaki kod baskılar:
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
Ama bu uçuş nesne üzerinde işlem yerine uygulamakla arasında kıvılcım sürücü projesinde gerçekleşebilir korkuyorum. Yanılıyorsam lütfen beni düzeltin.
Teşekkür
* Worker * executor günlüklerine baktınız mı? Belki de 'FlightParser' sınıfınızı bulamıyor musunuz? –