Belirli bir veri boyutundan geçen bir kümede sparkJob çalıştırırken (~ 2,5 gb) ya "SparkContext kapatıldı çünkü iş iptal edildi" veya "executor lost" ". İplik gui'ye baktığımda öldürülen işin başarılı olduğunu görüyorum. 500mb olan veriler üzerinde çalışırken sorun yoktur. Bir çözüm arıyordum ve şu sonuca varmıştım: - "ipliğin bazıları, beklenenden daha fazla bellek istediği için, bazı idarecileri öldürüyor."Büyük bir veri kümesinde kıvılcım çıkarırken "sparkContext kapatıldı"
Herhangi bir önerinizde nasıl hata ayıklanır? Ben ile benim kıvılcım işi teslim
komut:
/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments
ve sparkContext ayarlarında bu
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())
val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser))
val allWords= featuresRdd
.flatMap(line => line.split(" "))
.count
val wordQuantiles= featuresRdd
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.map(pair => (pair._2 , pair._2))
.reduceByKey(_+_)
.sortBy(_._1)
.collect
.scanLeft((0,0.0)) ((res,add) => (add._1, res._2+add._2))
.map(entry => (entry._1,entry._2/allWords))
val dictionary = featuresRdd
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _) // here I have Rdd of word,count tuples
.filter(_._2 >= moreThan)
.filter(_._2 <= lessThan)
.filter(_._1.trim!=(""))
.map(_._1)
.zipWithIndex
.collect
.toMap
Ve Hata yığını gibi görünüyor başarısız
val sparkConf = (new SparkConf()
.set("spark.driver.maxResultSize", "21g")
.set("spark.akka.frameSize", "2011")
.set("spark.eventLog.enabled", "true")
.set("spark.eventLog.enabled", "true")
.set("spark.eventLog.dir", configVar.sparkLogDir)
)
Basitleştirilmiş kodu
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
at sparkTesting.Runner$.main(Runner.scala:133)
at sparkTesting.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Benim durumumda bu neredeyse her zaman OOM istisnalarından kaynaklanıyor. Münferit yönetici makinelerindeki günlük dosyalarına bakmaya çalışın. –
İşinle gelen printstacktrace ve bazı Java'nın util araçlarıyla JVM Heap boyutu izleyecek: sınırlaması hakkında daha fazla bilgi edinmek için jstat, jstatd, jconsole .... Hala fiziksel belleğiniz varsa, uygulamanızı başlatmadan önce JVM bellek boyutunu artırabilirsiniz! Koleksiyonlarınızı optimize edilmiş Yığın boyutunuza göre yeniden boyutlandırabilirsiniz. –