2015-09-28 14 views
6

Belirli bir veri boyutundan geçen bir kümede sparkJob çalıştırırken (~ 2,5 gb) ya "SparkContext kapatıldı çünkü iş iptal edildi" veya "executor lost" ". İplik gui'ye baktığımda öldürülen işin başarılı olduğunu görüyorum. 500mb olan veriler üzerinde çalışırken sorun yoktur. Bir çözüm arıyordum ve şu sonuca varmıştım: - "ipliğin bazıları, beklenenden daha fazla bellek istediği için, bazı idarecileri öldürüyor."Büyük bir veri kümesinde kıvılcım çıkarırken "sparkContext kapatıldı"

Herhangi bir önerinizde nasıl hata ayıklanır? Ben ile benim kıvılcım işi teslim

komut:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments 

ve sparkContext ayarlarında bu

val hc = new org.apache.spark.sql.hive.HiveContext(sc) 
val broadcastParser = sc.broadcast(new Parser()) 

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles) 
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser)) 

val allWords= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .count 

val wordQuantiles= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) 
    .map(pair => (pair._2 , pair._2)) 
    .reduceByKey(_+_) 
    .sortBy(_._1) 
    .collect 
    .scanLeft((0,0.0)) ((res,add) => (add._1, res._2+add._2)) 
    .map(entry => (entry._1,entry._2/allWords)) 

val dictionary = featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // here I have Rdd of word,count tuples 
    .filter(_._2 >= moreThan) 
    .filter(_._2 <= lessThan) 
    .filter(_._1.trim!=("")) 
    .map(_._1) 
    .zipWithIndex 
    .collect 
    .toMap 

Ve Hata yığını gibi görünüyor başarısız

val sparkConf = (new SparkConf() 
    .set("spark.driver.maxResultSize", "21g") 
    .set("spark.akka.frameSize", "2011") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.dir", configVar.sparkLogDir) 
    ) 

Basitleştirilmiş kodu

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) 
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511) 
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435) 
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715) 
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185) 
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714) 
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) 
at org.apache.spark.rdd.RDD.count(RDD.scala:1121) 
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50) 
at sparkTesting.Runner$.main(Runner.scala:133) 
at sparkTesting.Runner.main(Runner.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
+4

Benim durumumda bu neredeyse her zaman OOM istisnalarından kaynaklanıyor. Münferit yönetici makinelerindeki günlük dosyalarına bakmaya çalışın. –

+1

İşinle gelen printstacktrace ve bazı Java'nın util araçlarıyla JVM Heap boyutu izleyecek: sınırlaması hakkında daha fazla bilgi edinmek için jstat, jstatd, jconsole .... Hala fiziksel belleğiniz varsa, uygulamanızı başlatmadan önce JVM bellek boyutunu artırabilirsiniz! Koleksiyonlarınızı optimize edilmiş Yığın boyutunuza göre yeniden boyutlandırabilirsiniz. –

cevap

4

Cevabı buldu.

Masam 20gb avro dosyası olarak kaydedildi. Yöneticiler açmaya çalıştığında. Her biri 20gb belleğe yüklemek zorunda kaldı.

1

Belirtiler, yürütme görevlerinde bir OutOfMemory hatası için tipiktir. İş başındayken yürütücü için belleği artırmayı deneyin. Saprk-submit, spark-shell parametresinin --executor-memory parametresine bakınız. Varsayılan değer 1G

1

başka olanaklı neden csv kullanarak yerine avro bunu çözüldü hata bazı başka kod değerlendirdikten sonra bir kavanoz dosyasını içe olmasıdır "SparkContext kapatma". ,

sorunu gidermek için (Bu yalnızca. Kıvılcım Notebook meydana gelebilecek) dosyanızın başlangıcına tüm :cp myjar.jar ifadeleri taşıyın.

İlgili konular