2016-04-01 24 views
3

Bir dosyayı Spark kullanarak HDFS'ye yazarken, bu bölümleme kullanılmadığında oldukça hızlıdır. Bunun yerine, dosyayı yazmak için bölümleme kullandığımda, yazma gecikmesi faktörü ~ 24 artar.Dosya yazmak için kıvılcım ayırma çok yavaş

Aynı dosya için, bölümsüz yazma yaklaşık 600 ms sürer. Kimliği ile bölümle yazma (dosyada 1.000 kimlik olduğu için tam olarak 1.000 bölüm oluşturur) yaklaşık 14 saniye sürer.

Bazılarınız bölümlenmiş bir dosya yazmanın çok uzun sürdüğü deneyime sahip misiniz? Bunun temel nedeni nedir, belki de Spark'un her bölüm için 1.000 klasör ve dosya oluşturması gerekiyor mu? Bunun nasıl hızlandırılabileceğine dair bir fikriniz var mı?

val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) } 

val df = myRdd.toDF 

df.write.mode(SaveMode.Append) 
.partitionBy("id") 
.parquet(path) 
+0

Kullandığınız kodu ekleyebilir misiniz? – zero323

cevap

0

Kıvılcım executors sizin veri bölümleme sonra küme arasında nasıl dağıldığına bağlı, sahip oldukları verileri yazmak için HDF'ler ile iletişim kurar.

Görünüşe göre daha küçük veri kümeleri için, birden çok yürütücü düğümden HDFS'ye bağlantı kurma ve yazma işlemi, tüm dosyayı sırayla yazmakla karşılaştırıldığında, daha fazla olacaktır. Bunu önlemek için nasıl

: Varsayılan kıvılcım By

, örnek parçacıkları (anahtar ve aynı karma ile anahtar aynı düğüme gider Karmaların) Menzil bölümleyici belirtmeyi deneyin bulabilirsiniz Hash bölümleme kullanarak veri bölmeler aşağıda:

Aşağıdaki parçacık, Hash bölümleyici sizinRdd.groupByKey() kullanır. saveAsTextFile ("HDFS PATH");

Aşağıdaki snippet, numaralı özel aralık bölümleyicimizi kullanıyor. RangePartitioner(8, yourRdd) numaralı belgede belirtildiği gibi 8 bölüm oluşturur ve 8 bağlantı üzerinden yazma, daha sonra 1000 bağlantı üzerinden yazarak daha iyi bir seçim olacaktır.

val tunedPartitioner = new RangePartitioner(8, yourRdd) 
val partitioned = yourRdd.partitionBy(tunedPartitioner).saveAsTextFile("HDFS PATH"); 

Yine bu yazmak için veriler arasında bir değiş tokuş ve oluşturduğunuz bölümleri sayısıdır.

+1

Fikir güzel, ancak veri çerçeveleriyle çalışmayacak. Yeniden bölümlendirme verilerinin bir örneğini gösterebilir ve parke içine kaydedebilir misiniz? – alexeipab

+0

@alexeipab Şu anda özel bir Partitioner kullanamazsınız. Yapabileceğiniz tek şey, yeniden bölümlemeyi kullanarak sütuna göre bölümdür. Alternatif olarak, veri çerçevenizi temel alan RDD'yi bölümlemek için myDF.rdd.partitionBy() öğesini kullanabilirsiniz. – Vektor88