2015-08-11 14 views
9

Apache Spark'un DataFrameReader.json() gzipli JSONlines dosyalarını otomatik olarak işleyebilir ancak sıkıştırılmış JSONline dosyaları yazmak için DataFrameWriter.json() yolunu bulmanın bir yolu yoktur. Ekstra ağ G/Ç'si bulutta çok pahalıdır.Spark: DataFrame'i sıkıştırılmış JSON olarak yazmak

Bu soruna bir çözüm var mı?

+0

Json çıkışını sıkıştırmanın bir yolunu mu keşfettiniz? Ben de bir çözüm arıyorum. –

+0

Henüz bunu yapmanın bir yolunu bulamadım. – Sim

cevap

11

Aşağıdaki çözümler pyspark kullanmaktadır, ancak Scala'daki kodun benzer olacağını varsayıyorum. Eğer sparkContext otomatik gzip kullanarak sıkıştırılmış olduğunu kullanılarak üretmek herhangi bir dosya üzerinde kod ile

conf = SparkConf() 
conf.set("spark.hadoop.mapred.output.compress", "true") 
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK") 

:

İlk seçenek SparkConf initialize zaman aşağıdaki ayarlamaktır.

İkinci seçenek, yalnızca içeriğinizde seçilen dosyaları sıkıştırmak istiyorsanız. sıkıştırılmış JSON, gerektirmez yazmak için daha basit bir yolu var

df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec") 
+0

Scala RDD API'si 'def saveAsTextFile'dır (yol: String, codec: Class [_ <: CompressionCodec])' dır, böylece kod sınıfı doğrudan bir dizge olarak değil de geçirilmelidir. – Sim

+0

Verileri bir dosyaya kaydederken dolandırıcılık biçiminden kaçınmanın mümkün olup olmadığını öğrenmek. Dizini '_SUCCES' ve' part- * 'dosyası ile kullanamıyorum. Tek bir dosyaya ihtiyacım var ... – lisak

+0

Diriliş için özür dilerim, ama buna inanmak zor buluyorum '' 'conf.set (" spark.hadoop.mapred.output.compression.codec "," true ") '' gerekli – tarzan

7
Kıvılcım 2.x ile

(ve belki daha erken, ben test etmedi): "df" senin dataframe olduğunu söylemek ve hedefinizi filename Lets yapılandırmasını değiştirme: Bu da CSV için ve Parke işleri

val df: DataFrame = ... 
df.write.option("compression", "gzip").json("/foo/bar") 

sadece sıkıştırma seçeneğini ayarladıktan sonra dosyası yazmak yerine .json() ait() ve .parquet() .csv kullanın.

Olası kodekler şunlardır: yok, bzip2, deflate, gzip, lz4 ve snappy.