2016-03-25 13 views
1

Kıvılcım işlenirken bir istisna söz konusu olduğunda, aşağıdaki logda görebildiğimiz gibi üç kez daha tekrar işlemeye çalışır. Ardından, Sahne Alanı başarısız olarak işaretler. Aşamanın daha sonra analiz edemediği veya onunla başka bir şey yapmadığı tüm verileri almak istiyorum. Bu nasıl yapılabilir? Bunu SparkListeners ile araştırıyorum ama geliştirici API'sı gibi görünüyor.Spark: Hangi aşamada başarısız olan orijinal veriler nasıl alınır?

Teşekkürler.

16/03/23 18:33:00 WARN TaskSetManager: Lost task 1.0 in stage 11.0 (TID 88, 192.168.213.53): java.lang.RuntimeException: Amit baby its exception time 
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount$1.call(JavaRecoverableNetworkWordCount.java:141) 
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount$1.call(JavaRecoverableNetworkWordCount.java:131) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.1 in stage 11.0 (TID 89, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.1 in stage 11.0 (TID 89) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 1] 
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.2 in stage 11.0 (TID 90, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.2 in stage 11.0 (TID 90) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 2] 
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.3 in stage 11.0 (TID 91, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.3 in stage 11.0 (TID 91) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 3] 
16/03/23 18:33:00 ERROR TaskSetManager: Task 1 in stage 11.0 failed 4 times; aborting job 
16/03/23 18:33:00 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
16/03/23 18:33:00 INFO TaskSchedulerImpl: Cancelling stage 11 
+0

Soru neden oylandı? –

+1

Ayrıca konu dışı olmak için yakın bir oy aldınız. Muhtemelen downvote ile birlikte geldi. Birisinin neden bu sorunun konu dışı olduğunu düşündüğünü anlamıyorum. –

cevap

3

Bu yapılamaz. Bir görevde işlenen veriler genellikle parçası olduğu işten daha uzun süre yaşamaz. Sahne başarısız olduğunda, iş artık mevcut değildir ve veriler çöp toplama için hazırdır. Ona bir atıfta bulunmuyor, bu yüzden ellerinizi alamıyorsunuz.

SparkListener gerçekten DeveloperAPI, ama bu onu kullanamaz anlamına gelmez. Hala bir genel API. Sadece Spark versiyonları arasında istikrarlı olduğu garanti edilmez. Belki bir yıl önce SparkListener kullanıyoruz ve aslında mükemmel bir şekilde kararlı. Bir vermek için çekinmeyin. Ama senin problemini çözebileceğini sanmıyorum.

Yine de geçerli ve ilginç bir fikir. Verilere erişebilmek hata ayıklama ile büyük ölçüde yardımcı olacaktır. Spark JIRA'da bir özellik isteğinde bulunabilirsiniz. Olmak için basit bir şey değil. Spark görevi, verdiğiniz kullanıcı kodundan çok daha karmaşıktır. Dolayısıyla, görevin girişi hata ayıklama için kullanılabilir olsa bile, iyi bir şekilde nasıl yararlanabileceğinizi önemsiz değildir. Her neyse, sanırım bir konuşmaya değer!

+0

Bunun için Spark Spark işaretleme kullanılabilir mi? –

+1

İyi bir denetim noktası tüm RDD'yi yazar. Evet, bir sahneden önce her RDD'yi kontrol ederseniz, görev girişlerini yeniden yapılandırabilirsiniz. Ama bu büyük bir yük olurdu. Arızanın ardından geri dönüp girişleri kontrol edemezsiniz. –

+0

Cevabınız için teşekkürler, bir şey daha, Bağlam başlatıldıktan sonra DAG'yi güncelleyebileceğim herhangi bir yol var mı? Aynı anda birçok şeyi attığım için üzgünüm. –

İlgili konular