2016-03-23 17 views
1

Spark kullanarak Hadoop Sekans Dosyalarımı okuyorum (v1.6.1). RDD'yi önbelleğe aldıktan sonra, RDD'deki içerik geçersiz hale gelir (son giriş n kez kopyalanır). İşte Önbelleğe Alınmış Spark RDD (Dizi Dosyasından okunan) geçersiz girişlere sahip, bunu nasıl düzeltirim?

benim kod parçacığı geçerli:

import org.apache.hadoop.io.Text 
import org.apache.hadoop.mapred.SequenceFileOutputFormat 
import org.apache.spark.{SparkConf, SparkContext} 

object Main { 
    def main(args: Array[String]) { 
    val seqfile = "data-1.seq" 
    val conf: SparkConf = new SparkConf() 
     .setAppName("..Buffer..") 
     .setMaster("local") 
     .registerKryoClasses(Array(classOf[Text])) 
    val sc = new SparkContext(conf) 

    sc.parallelize((0 to 1000).toSeq) //creating a sample sequence file 
     .map(i => (new Text(s"$i"), new Text(s"${i*i}"))) 
     .saveAsHadoopFile(seqfile, classOf[Text], classOf[Text], 
     classOf[SequenceFileOutputFormat[Text, Text]]) 

    val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text]) 
     .cache() 
     .map(t => {println(t); t}) 
     .collectAsMap() 
    println(c) 
    println(c.size) 

    sc.stop() 
    } 
} 

çıkışı:

(1000,1000000) 
(1000,1000000) 
(1000,1000000) 
(1000,1000000) 
(1000,1000000) 
...... //Total 1000 lines with same content as above ... 
Map(1000 -> 1000000) 
1 

DÜZENLEME: gelecek ziyaretçiler için : Yukarıdaki kodda olduğu gibi, sıranın dosyasını okuyorsanız snippet, kabul edilen cevaba bakın.

val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text]) 
    .map(t =>(new Text(t._1), new Text(t._2))) //Make copy of writable instances 
+2

Muhtemelen [SPARK-993] ile ilişkili (https://issues.apache.org/jira/browse/SPARK-993). – climbage

+0

Evet, ilgili [SPARK-993] (https://issues.apache.org/jira/browse/SPARK-993) ile ilgilidir. Teşekkürler. –

cevap

3

sequenceFile içinde açıklamalara bakınız: Basit bir geçici çözüm Hadoop Writable örneğinin bir kopyasını yapmaktır.

/** Verilen anahtar ve değer türleriyle bir Hadoop SekansDosyası için RDD alın. * * '' 'Not:' '' Hadoop'un RecordReader sınıfı her bir * kaydı için aynı Yazılabilir nesneyi yeniden kullandığından, doğrudan geri döndürülmüş RDD'yi önbelleğe alarak veya doğrudan bir toplama veya shuffle'a ileterek * işleminde çok sayıda başvuru oluşturacaktır. aynı nesneye * Hadoop yazılabilir nesnelerini doğrudan önbelleğe almayı, sıralamayı veya birleştirmeyi planlıyorsanız, önce bir map işlevini kullanarak kopyalayın. */

İlgili konular