2015-11-20 16 views
6

Belirli bir bölümlemeye göre S3'e yazmam gereken bir DataFrame var. Kod şöyle görünür:PartitionBy kullanılarak oluşturulan parke dosyalarının sayısı nasıl kontrol edilir

partitionBy
dataframe 
    .write 
    .mode(SaveMode.Append) 
    .partitionBy("year", "month", "date", "country", "predicate") 
    .parquet(outputPath) 

her veri sadece biraz (~ 1GB) ile klasörlerin oldukça büyük sayıda (~ 400) içine veri bölme. Ve problem geliyor - spark.sql.shuffle.partitions'un varsayılan değeri 200 olduğundan, her bir klasördeki 1GB'lık veri 200 küçük parke dosyasına bölünerek toplamda yaklaşık 80000 adet parke dosyası yazılıyor. Bu birkaç nedenden dolayı optimal değildir ve bundan kaçınmak istiyorum.

Elbette spark.sql.shuffle.partitions'u 10'dan daha küçük bir sayıya ayarlayabilirim, ancak bu ayar, birleştirme ve kümelemede karışıklıkların bölüm sayısını da kontrol ettiğinden, bunu gerçekten değiştirmek istemiyorum.

Kaç dosya yazılacağını denetlemenin başka bir yolu olup olmadığını bilen var mı?

+1

Veritabanını ".write" önce yeniden bölümlemeye çalıştınız mı? İlk bakışta 'spark.sql.shuffle.partitions' sadece karışıklık ve birleşmelerde kullanılıyor, ama başka hiçbir yerde görünmüyor. Aksi takdirde, partitionBy'de ek bir "numParameter" param için bir bilet açmalısınız. –

+0

@MariusSoutier Hmmm ... "Yeniden bölümleme" _before_ 'write' işlevinin çağrılmasının, orijinal 'dataframe'inin' partitionBy 'işlevi tarafından yeniden bölümlenmeden önce yeniden bölümlendirilmesine neden olacağını düşünürdüm. Orijinal veri çerçevesinin sadece 10 bölüm halinde yeniden bölümlenmesi, kesinlikle bir OOM istisnasına yol açacaktır. Ancak, bunu test etmek için yeni başladım. Tamamlandığında, bir güncellemeyle geri döneceğim. –

+0

@MariusSoutier işe yarıyor! Fantastik. Teşekkür ederim! Bir cevap olarak göndermek istiyor musunuz - o zaman cevap olarak işaretleyeceğim :-) –

cevap

6

Doğru olarak belirttiğiniz gibi, spark.sql.shuffle.partitions sadece SparkSQL'deki karıştırmalar ve birleştirmeler için geçerlidir. basitçe bölümleri önceki sayısına göre çalışır DataFrameWriter yılında

partitionBy (eğer write dediğimiz gibi en kısa sürede DateFrameDateFrameWriter ila taşımak). (Yazarın bölümüYalnızca yazılacak olan tabloya/parke dosyasına sütunlar atadığı için, böylelikle bölümlerin sayısı ile bir ilgisi yoktur. Bu biraz kafa karıştırıcıdır.)

Uzun hikaye kısa, sadece DataFrame'i yeniden bölümlendir onu bir yazara dönüştürmeden önce.

İlgili konular