2015-09-11 31 views
7

Verileri toplamak için bir Spark işi çalıştırıyorum. Temel olarak bir mutable.HashMap[Zone, Double] içeren bir Profil adı verilen özel bir veri yapısına sahibim.'spark.driver.maxResultSize' kapsamı

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

bariz çözüm artırmak için geçerli:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1} 
val aggregated = dailyProfiles 
    .aggregateByKey(new Profile(), 3200)(merge, merge).cache() 

İlginçtir ki, Kıvılcım aşağıdaki hata nedeniyle başarısız: Aşağıdaki kod ile, belirli bir tuşa (bir UUID) paylaşan tüm profilleri birleştirmek istediğiniz "spark.driver.maxResultSize", ama iki şey beni şaşırtıyor. Ben 1024,0 büyük

  • 1024,0 daha tüm belgeleri almak ve yardım Bu özel hata ve yapılandırma parametresi geri sürücüye bir değer alabilir fonksiyonlarını etkileyen gösterir googling bulduğumuz bir tesadüf Çok fazla

    1. . (take() veya collect() deyin), ancak sürücüye HERHANGİ BİR ŞEY veriyorum, sadece HDFS'den okuma, bir araya getirme, HDFS'ye geri kaydetme.

    Bu hatayı neden aldığımı bilen var mı?

  • +0

    sen benim cevap doğrulayabildi? – mrsrinivas

    +0

    Bunu size tavsiye edeceğim, fakat ne yazık ki artık bu koda (veya şirkete) erişimim yok ve cevabınız # 2'yi çözüyor, yani işlem ilk sırada olmamalıdır: -S –

    cevap

    1

    Yes, It's failing because The values we see in exception message are rounded off by one precision and comparison happening in bytes.

    That serialized output must be more than 1024.0 MB and less than 1024.1 MB.

    Eklendi Apache Spark kodu snippet'i, Bu hatayı almak çok ilginç ve çok nadirdir.

    Her ikisi de Uzun türleridir ve bayt cinsinden değeri içerir. Ancak msg, Utils.bytesToString()'dan yuvarlak değer taşır.

    //TaskSetManager.scala 
        def canFetchMoreResults(size: Long): Boolean = sched.synchronized { 
        totalResultSize += size 
        calculatedTasks += 1 
        if (maxResultSize > 0 && totalResultSize > maxResultSize) { 
         val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + 
         s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + 
         s"(${Utils.bytesToString(maxResultSize)})" 
         logError(msg) 
         abort(msg) 
         false 
        } else { 
         true 
        } 
        } 
    

    Apache Spark 1.3 - source


    //Utils.scala 
        def bytesToString(size: Long): String = { 
        val TB = 1L << 40 
        val GB = 1L << 30 
        val MB = 1L << 20 
        val KB = 1L << 10 
    
        val (value, unit) = { 
         if (size >= 2*TB) { 
         (size.asInstanceOf[Double]/TB, "TB") 
         } else if (size >= 2*GB) { 
         (size.asInstanceOf[Double]/GB, "GB") 
         } else if (size >= 2*MB) { 
         (size.asInstanceOf[Double]/MB, "MB") 
         } else if (size >= 2*KB) { 
         (size.asInstanceOf[Double]/KB, "KB") 
         } else { 
         (size.asInstanceOf[Double], "B") 
         } 
        } 
        "%.1f %s".formatLocal(Locale.US, value, unit) 
        } 
    

    Apache Spark 1.3 - source