2014-10-10 26 views
9

S3'deki json log verilerini S3'teki Parke dosyalarına aktarmak için Apache Spark SQL'i kullanmaya çalışıyorum. Kodum temelde:Spark SQL, Parşömen verilerini çok sayıda parça ile birlikte yazdıramıyor

import org.apache.spark._ 
val sqlContext = sql.SQLContext(sc) 
val data = sqlContext.jsonFile("s3n://...", 10e-6) 
data.saveAsParquetFile("s3n://...") 

Bu kod ben 2000 bölümüne kadar olduğunda çalışır ve ne olursa olsun verilerin hacminin, 5000 veya daha fazlası için başarısız olur. Normalde şey sadece bir kabul edilebilir sayıda için bölümleri coalesce olabilir ama bu çok büyük bir veri seti ve 2000 bölümleri de ben Sorun kıvılcım-1.1.0 bu çalıştırıyorum bu question

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s 
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s 
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ... 
java.io.IOException: Could not read footer: java.lang.NullPointerException 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190) 
     at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203) 
     at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319) 
     at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409) 
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409) 
     at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77) 
     at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52) 
     at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54) 
     at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56) 
     at $line37.$read$$iwC$$iwC.<init>(<console>:58) 
     at $line37.$read$$iwC.<init>(<console>:60) 
     at $line37.$read.<init>(<console>:62) 
     at $line37.$read$.<init>(<console>:66) 
     at $line37.$read$.<clinit>(<console>) 
     at $line37.$eval$.<init>(<console>:7) 
     at $line37.$eval$.<clinit>(<console>) 
     at $line37.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
     at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NullPointerException 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106) 
     at java.io.BufferedInputStream.close(BufferedInputStream.java:472) 
     at java.io.FilterInputStream.close(FilterInputStream.java:181) 
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180) 
     at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

tarif vurmak ec2'de bir R3.xlarge üzerinde. Yukarıdaki kodu çalıştırmak için kıvılcım kabuğu konsolunu kullanıyorum. Daha sonra data SchemaRDD nesnesinde önemsiz olmayan sorgular gerçekleştirebiliyorum, bu yüzden bir kaynak sorunu gibi görünmüyor. Ayrıca ortaya çıkan Parquet dosyasını okumak ve sorgulamak da mümkündür, sadece özet dosyaları eksikliğinden dolayı oldukça uzun sürmektedir.

+1

Bu konuda hata başvurusunda ediyorum. https://issues.apache.org/jira/browse/SPARK/ –

cevap

0

deneyin false ile bu özelliği ayarlamak için:

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");