2016-04-12 27 views
1

Ben bu basit Kafka Akış Kafka iletilerkafka DirectStream dstream haritası

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Each Kafka message is a flight 
val flights = messages.map(_._2) 

flights.foreachRDD(rdd => { 
    println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records"); 
    rdd.map { flight => {   
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
    } 
    }   
}) 

ssc.start() 
ssc.awaitTermination() 

, bu mümkün RDDs olarak onları almak için Akış Spark sahip yazdırmaz. Ama benim kodumdaki ikinci printn hiçbir şey yazdırmıyor. Yerel [2] modunda çalıştırıldığında sürücü konsolu kayıtlarına baktım, iplik-istemci modunda çalıştırıldığında iplik günlüklerini kontrol ettim.

Neyi eksik? Bunun yerine rdd.map ait

, iyi kıvılcım sürücü konsolunda aşağıdaki kod baskılar:

for(flight <- rdd.collect().toArray) { 
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
} 

Ama bu uçuş nesne üzerinde işlem yerine uygulamakla arasında kıvılcım sürücü projesinde gerçekleşebilir korkuyorum. Yanılıyorsam lütfen beni düzeltin.

Teşekkür

+1

* Worker * executor günlüklerine baktınız mı? Belki de 'FlightParser' sınıfınızı bulamıyor musunuz? –

cevap

1

rdd.map tembel dönüşümdür. Bu RDD'de bir eylem yapılmadığı sürece materyalize edilmeyecektir.
Bu özel durumda, RDD'deki her bir öğeye erişim sağlayan, RDD'deki en genel eylemlerden biri olan rdd.foreach'u kullanabiliriz.

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>   
     val flightRows = FlightParser.parse(flight) 
     println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently 
    } 
} 

bu RDD eylem uygulayıcıları bir şekilde gerçekleştirilmesini, biz infaz en STDOUT içinde println çıktı bulacaksınız göz önüne alındığında. bunun yerine sürücünün verileri yazdırmak istiyorsanız

, sen DStream.foreachRDD Kapak içindeki RDD verilerini collect edebilirsiniz.

flights.foreachRDD{ rdd => 
    val allFlights = rdd.collect() 
    println(allFlights.mkString("\n")) // prints to the stdout of the driver 
} 
+0

Öneriniz için teşekkürler @massg. org.apache.spark.SparkException: java.io.NotSerializableException: Neden Olduğu Görev seri hale getirilebilir değil org.apache.spark.streaming.StreamingContext Ben ilk yaklaşım çalıştığınızda , aşağıdaki özel durum alıyorum Sanırım bu oluyor çünkü uçuş değişkeni sadece sürücüde değil, Kıvılcım Sürücüsünde mevcut. Neyi eksik? –