2015-12-31 15 views
12

Spark Streaming'de StreamingContext denetim noktalarını güvenilir bir veri deposuna (S3, HDFS, ...) yapabilecek şekilde ayarlamak mümkündür (ve zorunlu işlemler kullanacaksanız zorunludur). AND):DStreams için kıvılcım akış kontrol noktaları

  • Meta veri
  • DStream

olarak, yourSparkStreamingCtx.checkpoint(datastoreURL)

çağırmanız gerekir çıkış veri depolama ayarlamak için, here anlatılmakta olan kalıtsal

Diğer yandan, her DataStream için soy kontrol noktası aralıklarını checkpoint(timeInterval) numaralı telefonu arayarak ayarlamak mümkündür.

dstream.checkpoint (checkpointInterval): Aslında, DataStream 's kayar aralığı 5 ile 10 kez arasında soy kontrol noktası aralığını ayarlamak için tavsiye edilir. Tipik olarak, bir DStream'in 5 - 10 kayar aralıklı bir kontrol noktası aralığı, denemeye iyi bir ayardır.

Sorum şu:

akışı bağlam checkpointing gerçekleştirmek için kurulmuştur ve hiçbir ds.checkpoint(interval) denir zaman batchInterval eşit bir varsayılan checkpointInterval ile tüm veri akışları için etkin soy checkpointing nedir? Ya da tam tersine, yalnızca metadata ne etkisiz hale getiriliyor?

+0

nasıl Standart kontrol noktalarını denetleme tarafından Akış etkin olup söyleyebiliriz?Verileri ve soyunu hatırladığını biliyorum, ancak varsayılan olarak herhangi bir kontrol noktasını etkin görmüyorum. Yani, eğer sürücünüz başarısız olursa veya bazı düğümler azalırsa o zaman bu düğümde yer alan verileri kopyalayamazsınız. (StorageLevel öğesinin "_2" değerini kullanarak). – Sumit

+0

@Sumit Bunu hiç söylemedim. İstediğim şey, strmCtx.checkpoint ("hdfs: // ...") çağırarak denetim noktasını etkinleştirdiğinizde, tüm veri akışlarını kontrol aralıklarını, içerik parti aralığına eşit bir güncelleme aralığıyla etkinleştirip etkinleştirmeyeceğidir. –

+0

Denetim Noktası, tüm Akış Bağlamı için etkinleştirildiğinden, aynı İçerik'ten oluşturulan tüm DStreams'ler kontrol noktasında yararların tadını çıkaracaktır. – Sumit

cevap

9

Kıvılcım kodunu (v1.5) Denetleme ben DStream s- bulundu denetim noktası iki koşulda etkindir: onların checkpoint yöntemine açık bir çağrı ile

(değil StreamContext 'ler):

/** 
* Enable periodic checkpointing of RDDs of this DStream 
* @param interval Time interval after which generated RDD will be checkpointed 
*/ 
def checkpoint(interval: Duration): DStream[T] = { 
    if (isInitialized) { 
     throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started") 
    } 
    persist() 
    checkpointDuration = interval 
    this 
} 
DStream başlatma anda

sürece somut 'DStream' alt sınıf geçersiz olduğu gibi mustCheckpoint niteliğini (true ayarlayarak):

private[streaming] def initialize(time: Time) { 
    ... 
    ... 
    // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger 
    if (mustCheckpoint && checkpointDuration == null) { 
    checkpointDuration = slideDuration * math.ceil(Seconds(10)/slideDuration).toInt 
    logInfo("Checkpoint interval automatically set to " + checkpointDuration) 
    } 
    ... 

İlk durum açıktır. Kıvılcım Akış yasası konulu bir naif analizin yapılması:

grep "val mustCheckpoint = true" $(find -type f -name "*.scala") 

> ./org/apache/spark/streaming/api/python/PythonDStream.scala: override  val mustCheckpoint = true 
>./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true 
>./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true 

Ben genel olarak (PythonDStream görmezden), o bulabilirsiniz, StreamingContext kontrol noktası sadece StateDStream ve ReducedWindowedDStream örnekleri için soy kontrol noktaları sağlar. Bu örnekler, dönüşümler sonucudur (sırasıyla, ve):

  • updateStateByKey: Yani, akımı bir kaç pencerelerden bir durum sağlanır.
  • reduceByKeyAndWindow