2015-10-21 24 views
7

Kafka konusuna giren mesajlara cevap veren Java tabanlı bir Spark Streaming uygulaması üzerinde çalışıyorum. Her mesaj için, uygulama bazı işlemleri yapar ve sonuçları farklı bir Kafka konusuna yazar.Kıvılcımda Yakalanmamış İstisna İşleme

Bazen beklenmedik verilerle ilgili sorunlar nedeniyle, RDD'lerde çalışan kod başarısız olabilir ve bir istisna atar. Bu olduğunda, gerekli işlemi yapabilecek ve bir hata konusuna bir mesaj bırakabilecek genel bir işleyiciye sahip olmak isterim. Şu anda, bu istisnalar Spark’in Spark’in log’unda yazılıyor.

Bunu yapmak için en iyi yaklaşım nedir, RDD'lerde çalışan her kod bloğu için try-catch blokları yazmak yerine?

+0

Birisi bu soruyu söyleyerek yakın bir oyu döküm olduğunu görüyoruz görüş içerikli olup ayrıca bir deneyin catch ile bu uygulamaya düşünüyorum. Eğer şu anda Spark ile mümkün değilse, uzmanlar en azından yakın oyu kullanmadan önce bir miktar ışık tutabilirlerse memnun olurum. Açıklama yapmadan yakın bir oylama yapmak, topluluğa hiçbir şekilde yardımcı olmaz. –

+0

Bunu yapan genel bir işlev yazabilirsiniz. Sadece Spark istisnalarını atabilecek tek kişiler olduğu için RDD eylemleri etrafına sarmanız gerekir (.map ve .filter gibi dönüştürücüler, eylemlerle tembeldir). (Bunun Scala'da olduğunu varsayarsak) Hatta, hata yapma işini sadece tip imzalarla zorlayarak uygulamak için oluşturduğunuz zenginleştirilmiş RDD sınıfını ele alan bir hata ve dolaylı olarak bir şey deneyebilirsiniz. Yakın oyu vermedim, ama “en iyi” yaklaşımın uygulama ihtiyaçları için biraz öznel olduğunu hayal ediyorum. – Rich

+0

Teşekkürler @Rich. Yani, temelde söylemek istediğin şey şu ki Spark'u şu an halletmek için inşa edilmiş bir yol yok, bu yüzden her bir uygulama onunla ilgilenmeli. Yorumunuzu cevap olarak gönderebilirseniz, kabul edeceğim. –

cevap

3

Bunu yapan genel bir işlev yazabilirsiniz. Sadece, sadece Spark istisnalarını atabilecek olanlar olan RDD eylemleri etrafında sarmanız gerekir (.map ve .filter gibi transformatörler, eylemler tarafından tembellik ile yürütülür).

(Bunun Scala'da olduğunu varsayarsak) Hatta bir şeyleri örtülü olarak da deneyebilirsiniz. RDD'yi tutan ve hatayı işleyen bir sınıf oluşturun. İşte böyle görünüşüne ait bir kroki: Sen failsafeAction içine girmesine ya da hata konu mesajlaşma ekleyebilir

implicit class FailSafeRDD[T](rdd: RDD[T]) { 
    def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { 
    fn(rdd) 
    } 
} 

Eğer başarısızlık her zaman yapmak istiyorum. gerçi Bunun yanı sıra

val rdd = ??? // Some rdd you already have 
val resultOrException = rdd.failsafeAction { r => r.count() } 

, ben "en iyi" yaklaşımı uygulama ihtiyaçlarına biraz sübjektif hayal: Ve sonra kullanım gibi olabilir.

2

I =>

dstream.foreachRDD { case rdd: RDD[String] => 
    rdd.foreach { case string: String => 
     try { 
     val kafkaProducer = ... 
     val msg = ... 
     kafkaProducer.send(msg) 
     } catch { 
     case d: DataException=> 
      val kafkaErrorProducer = ... 
      val errorMsg = ... 
      kafkaErrorProducer.send(errorMsg) 
     case t: Throwable => 
      //further error handling 
     } 
    } 
} 
İlgili konular