2015-01-24 18 views
11
bu şekilde benim küme başlattı

:Kıvılcım: Repartition strateji metin dosyasını okuduktan sonra

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar 

Ben büyük metin dosyası okunur yapmak ve ilk şey saymak:

val file = sc.textFile("/path/to/file.txt.gz") 
println(file.count()) 

zaman Bunu yapıyorum, sadece benim düğümlerimden birinin aslında dosyayı okuduğunu ve sayımı yürüttüğünü görüyorum (çünkü sadece bir görevi görüyorum). Bu beklenen mi? RDD'yi daha sonra yeniden bölümlendirmeli miyim, yoksa harita işlevlerini kullandığımda, Spark'i benim için yapar mı?

+0

"defaultMinPartitions" nız nedir? Belge açıkça belirttiği gibi, textFile isteğe bağlı sayıda bölüm parametresi alır; –

+0

VarsayılanMinPartitions değerlerimden büyüktür. Belirtilen sayıda bölüm zorlayamayacağı görünüyor, çünkü bu sadece bir metin dosyası ... çalışıyor ... val file = sc.textFile ("/ yol/to/file.txt.gz", 8) println (file.partitions.length) , 1 – Stephane

+0

değerini döndürür. Bu, okumayı tek bir yerde yapmak zorundadır, çünkü bu, doğası gereği seridir. Fakat, eğer _something_ yapmadıysa, bu isteğe bağlı paramın neden olacağını göremiyorum. –

cevap

20

Gzip dosyasıyla çalıştığınız anlaşılıyor.

my answer here alıntı:

Bunu onlar paralel olarak yüklenemez içinde gzip'lenmiş dosyaları ile oldukça tipik bir sorunu isabet düşünüyorum. Daha spesifik olarak, tek bir gzipli dosya, çoklu görevler tarafından paralel olarak yüklenemez, bu yüzden Spark, 1 görevle yükler ve böylece 1 bölümlü bir RDD verir.

Bunu paralel daha fazla görevler çalıştırmak böylece açıkça yükledikten sonra RDD bölmeniz gerekir. Örneğin

: Sorunuza yapılan yorumlar İlişkin

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3) 
println(file.count()) 

, burada yardımcı olmuyor minPartitions ayarı nedendir a gzipped file is not splittable çünkü Kıvılcım hep dosyayı okumak için 1 görevi kullanmasını sağlayabilirsiniz.

normal bir metin dosyası veya bzip2 gibi bir ayrılabilir sıkıştırma formatı ile sıkıştırılmış bir dosya okurken minPartitions ayarlarsanız, o Spark aslında mevcut çekirdek sayısı kadar (paralel görevlerin bu sayıyı dağıtacak göreceksiniz dosyayı okumak için).

+0

Teşekkürler! O zaman bzip2'yi gzip üzerinden önerir misin? Bu dosyayı sık sık yüklerseniz, her koşuyu optimize etme stratejim ne olmalıdır? – Stephane

+0

@Stephane - Verilerin ne kadarının geldiğine ve kümenizin verileri yeniden bölümlemeye ne kadar zaman harcadığına bağlıdır. Tek bir gzip dosyası iyi olabilir. Bir dosya çok büyükse, her bir gzipli dosya aynı RDD'ye (dosya başına bir görev) paralel olarak yüklenebildiğinden, muhtemelen birden fazla gzipli dosyayla (yani, sıkıştırmadan önce bölme) de gidebilirsiniz. Muhtemelen en az direnç yoludur. –

+0

çok çok ilginç teşekkürler! Yani .gz.001 bölme dosyaları veya bzip2 ... Her ikisini de deneyeceğim!Evet, büyük darboğaz ilk yeniden bölümleme olduğunu düşünüyorum, bu yüzden dosyalarımı zaten geldiğinde bölmek için yönetirsem bana biraz zaman kazandırabilir – Stephane

İlgili konular