2016-08-06 20 views
9

Düzenle: Yanıt yardımcı oluyor, ancak çözümüm şu adreste açıklandı: memoryOverhead issue in Spark.Verilerim bölümler arasında nasıl dengelenir?


Ben başkaları tarafından oluşturulan bir veri kümesi okur 202.092 bölümleri, bir RDD var.

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
: elle bazıları 0 görüntüler var ve verileri işlerken ortalama 432 yatıyor iken diğeri 4k var örneğin veri, bölmeler arasında dengeli olmadığını görebilirsiniz bu hata var

bellekte iken, Osilatör zaten yükseltildi. İpliklerin benim kapımı öldürmesini sağlayan bazı sivri uçların olduğunu hissediyorum, çünkü bu başak belirtilen sınırları aşar.

Yani benim veri (kabaca) bölümleri arasında dengeli olduğundan emin olun yapmalıyım?


Benim fikrim repartition() çalışacağına dair, o çağıran karıştırma:

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

: programming-guide talimatlarına rağmen

dataset = dataset.repartition(202092) 

ama sadece aynı hata var,


Oyuncak örneğimi kontrol et:

data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000)) 
d = data.glom().collect() 
len(d[0])  # 1000 
len(d[1])  # 2000 
len(d[2])  # 3000 
repartitioned_data = data.repartition(3) 
re_d = repartitioned_data.glom().collect() 
len(re_d[0]) # 1854 
len(re_d[1]) # 1754 
len(re_d[2]) # 2392 
repartitioned_data = data.repartition(6) 
re_d = repartitioned_data.glom().collect() 
len(re_d[0]) # 422 
len(re_d[1]) # 845 
len(re_d[2]) # 1643 
len(re_d[3]) # 1332 
len(re_d[4]) # 1547 
len(re_d[5]) # 211 
repartitioned_data = data.repartition(12) 
re_d = repartitioned_data.glom().collect() 
len(re_d[0]) # 132 
len(re_d[1]) # 265 
len(re_d[2]) # 530 
len(re_d[3]) # 1060 
len(re_d[4]) # 1025 
len(re_d[5]) # 145 
len(re_d[6]) # 290 
len(re_d[7]) # 580 
len(re_d[8]) # 1113 
len(re_d[9]) # 272 
len(re_d[10]) # 522 
len(re_d[11]) # 66 

cevap

4

Sanırım yükleme sırasında kullanılan DirectMemory arabelleklerinden kaynaklanan bellek genel gider sınırı aşılıyor. Sanırım 2.0.0'da. (Biz aynı sorunu vardı, ama biz 2.0.0 için bu yükseltme Ne yazık ki bana destek olmak Kıvılcım sorunu numaraları yok. Çözüldü bulunca çok daha derin kazma durdu.)


dengesiz bölümleri sonra repartition şaşırtıcıdır. https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443 ile kontrast. Spark, repartition numaralı telefondan rastgele anahtarlar bile üretir, bu nedenle önyargılı olabilecek bir karma ile yapılmaz.

Örneğini denedim ve Spark 1.6.2 ve Spark 2.0.0 ile kesin aynı sonuçları aldım. Ancak, Scala spark-shell:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator } 
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24 

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq 
res1: Seq[Int] = WrappedArray(1000, 2000, 3000) 

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq 
res2: Seq[Int] = WrappedArray(1999, 2001, 2000) 

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq 
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000) 

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq 
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500) 

Böyle güzel bölümler! Ben yükseltme olamazdı beri (.. Değil tam cevap Şimdiye kadar sadece benim bulguları paylaşmak istedim Maalesef bu)


+0

, burada ne yaptım: [memoryoverhead konu içinde Spark] (https://gsamaras.wordpress.com/code/memoryoverhead-issue-in-spark/) (bunun üzerindeki herhangi bir girdi * karşılandı *). DirectMemory arabelleğinin ne olduğunu bilmiyorum, ya da hangi özel getiriye başvurduğunuzu, benimle çıplak kalıyorum ama ben yeniyim.Biz Spark1.6.2 kullanıyoruz, ama ben buna kontrol etmedim. Kıvılcım kabuğunu çalıştırmaya çalışıyorum ama bir hata alıyorum: bir gizli anahtar olmalı ..', bu yüzden onaylayamıyorum. BTW, iyi cevaplar için teşekkürler, [bana daha önce çok yardımcı oldu] (http://stackoverflow.com/questions/38755522/read-a-distributed-tab-delimited-csv)! – gsamaras

+2

Doğrudan bellek arabellekleri, JVM tarafından ayrılan ancak öbeğin dışında kalan belleklerdir. Normalde, bir Spark yürütücüsü yığın boyutuyla sınırlıdır, dolayısıyla YARN tarafından öldürülmez. Doğrudan arabelleklerin ayrıştırılması, yığın boyutundan daha fazla bellek kullanmasına ve YARN tarafından öldürülmesine izin verir. (https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html) Büyük doğrudan tamponlar, redüktör görevleri maptorlardan _fetch_ verisi alındığında, karışıklıklar sırasında tahsis edilir. –

İlgili konular