2017-06-23 22 views
10

İlk RDD ile mapWithState kullanan bir kıvılcım akışı işini kullanıyorum. Uygulamayı yeniden başlatırken ve denetim noktasından kurtarılırken hatayla başarısız olur:Spark Akış İşi kurtarılamıyor

Bu RDD bir SparkContext'den yoksundur. Aşağıdaki durumlarda gerçekleşebilir:

  1. RDD dönüşümleri ve eylemler sürücüsü tarafından çağrılan, ancak diğer dönüşümlerin iç DEĞİLDİR; örneğin, rdd1.map (x => rdd2.values.count() * x) geçersizdir çünkü değerler dönüşümü ve sayma eylemi rdd1.map dönüşümü içinde gerçekleştirilemez. Daha fazla bilgi için, bkz. SPARK-5063.
  2. Bir Spark Streaming işi denetim noktasından kurtarıldığında, akış işi tarafından tanımlanmayan bir RDD'ye yapılan bir referans DStream işlemlerinde kullanılırsa bu istisna vurulur. Daha fazla bilgi için SPARK-13758

Bu davranış https://issues.apache.org/jira/browse/SPARK-13758 açıklanan ama gerçekten nasıl çözüleceğini tarif edilmez bakın. RDD'm akış yürütme tarafından tanımlanmadı, ancak yine de buna ihtiyacım var.

class EventStreamingApplication { 
    private val config: Config = ConfigFactory.load() 
    private val sc: SparkContext = { 
    val conf = new SparkConf() 
     .setAppName(config.getString("streaming.appName")) 
     .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host")) 
    val sparkContext = new SparkContext(conf) 
    System.setProperty("com.amazonaws.services.s3.enableV4", "true") 
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true") 
    sparkContext 
    } 

    def run(): Unit = { 
    // streaming.eventCheckpointDir is an S3 Bucket 
    val ssc: StreamingContext = StreamingContext.getOrCreate(config.getString("streaming.eventCheckpointDir"), createStreamingContext) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

    def receiver(ssc: StreamingContext): DStream[Event] = { 
    RabbitMQUtils.createStream(ssc, Map(
     "hosts" -> config.getString("streaming.rabbitmq.host"), 
     "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"), 
     "userName" -> config.getString("streaming.rabbitmq.user"), 
     "password" -> config.getString("streaming.rabbitmq.password"), 
     "exchangeName" -> config.getString("streaming.rabbitmq.eventExchange"), 
     "exchangeType" -> config.getString("streaming.rabbitmq.eventExchangeType"), 
     "queueName" -> config.getString("streaming.rabbitmq.eventQueue") 
    )).flatMap(EventParser.apply) 
    } 

    def setupStreams(ssc: StreamingContext): Unit = { 
    val events = receiver(ssc) 
    ExampleJob(events, sc) 
    } 

    private def createStreamingContext(): StreamingContext = { 
    val ssc = new StreamingContext(sc, Seconds(config.getInt("streaming.batchSeconds"))) 
    setupStreams(ssc) 
    ssc.checkpoint(config.getString("streaming.eventCheckpointDir")) 
    ssc 
    } 
} 

case class Aggregation(value: Long) // Contains aggregation values 

object ExampleJob { 
    def apply(events: DStream[Event], sc: SparkContext): Unit = { 
    val aggregations: RDD[(String, Aggregation)] = sc.cassandraTable('...', '...').map(...) // some domain class mapping 
    val state = StateSpec 
     .function((key, value, state) => { 
     val oldValue = state.getOption().map(_.value).getOrElse(0) 
     val newValue = oldValue + value.getOrElse(0) 
     state.update(Aggregation(newValue)) 
     state.get 
     }) 
     .initialState(aggregations) 
     .numPartitions(1) 
     .timeout(Seconds(86400)) 
    events 
     .filter(...) // filter out unnecessary events 
     .map(...) // domain class mapping to key, event dstream 
     .groupByKey() 
     .map(i => (i._1, i._2.size.toLong)) 
     .mapWithState(state) 
     .stateSnapshots() 
     .foreachRDD(rdd => { 
     rdd.saveToCassandra(...) 
     }) 
    } 
} 

atılan StackTrace geçerli::

Bu

gibi benim grafiği göründüğüne ilişkin bir örnektir

Exception in thread "main" org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758. 
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:534) 
    at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193) 
    at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    ... 
    <991 lines omitted> 
    ... 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ...() 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571) 
    at com.example.spark.EventStreamingApplication.run(EventStreamingApplication.scala:31) 
    at com.example.spark.EventStreamingApplication$.main(EventStreamingApplication.scala:63) 
    at com.example.spark.EventStreamingApplication.main(EventStreamingApplication.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
+0

Spark grafiğinizi ekleyebilir misiniz? –

+0

kod? veya kullanıcı arayüzünden bir şey mi? –

+0

Kod, gerçek Spark DAG. –

cevap

0

O kıvılcım kurtarmak için çalışıyor iken, doğru son denetim noktası dosyası değil gibi görünüyor Seçilmiş olmak. Bu yanlış RDD'ler nedeniyle sevk ediliyor.

Bu, sürüm 2.1.1'in, sürümün sabit sürüm listesinde olmadığı için etkilendiği anlaşılıyor.

Lütfen sabitleme sürümünün henüz belirtilmediği apache dokümanları için aşağıdaki bağlantıya bakın. Bence

https://issues.apache.org/jira/browse/SPARK-19280

, sen kıvılcım işini yeniden başlatma sırasında son denetim noktası dosyası belirtebilirsiniz otomatik/manüel çözümü keşfetmeye deneyebilirsiniz.

Bunun çok yararlı olmadığını biliyorum, ancak bu sorunun kök nedenini ve mevcut gelişimi düzeltmenin ve olası çözüm hakkındaki fikrimi açıklamanın daha iyi olacağını düşündüm.

+0

Bu gerçekten aynı sorun mu? Onlar için sadece bazen oluyor gibi görünüyor. Tüm mapWithState olayının bozulduğuna gerçekten inanamıyorum. –