2016-03-28 20 views
2

'da yığın akış hatası yaratıyor Apache Spark'in yeni başlamasından biriyim. Şu anda bir RDD'yi tekrar güncellemeyi ve daha sonra yaklaşık 10 KB'lık verileri sürücülerden sürücüye toplayan bir Makine Öğrenimi programı üzerinde çalışıyorum. Ne yazık ki, 600 yineleme çalıştırıldığında StackOverFlow hatası alıyorum! Aşağıdaki kodum. Yineleme sayısı 400'ün üzerinde olduğunda stackovers hatası işlevinde stackoverflow hatası oluştu! indexedDevF ve indexedData olan indexedRDDUzun seri RDD ile yinelemeli kod, Apache Spark

breakable{ 
    while(bLow > bHigh + 2*tolerance){ 
    indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues(x => (x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow))) 
    if (iteration % 50 == 0) { 
      indexedDevF.checkpoint() 
    } 
    indexedDevF.persist() // essential to get correct answer 

    val devFMap = indexedDevF.collectAsMap() //0.5s every time according to local:4040! here will stackoverflow 

    var min_value = Double.PositiveInfinity 
    var max_value = -min_value 
    var min_i = -1 
    var max_i = -1 

    i = 0 
    while(i < m){ 

     if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){ 
      if(devFMap(i) <= min_value){ 
       min_value = devFMap(i) 
       min_i = i 
      } 
     } 

     if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){ 
      if(devFMap(i) >= max_value){ 
       max_value = devFMap(i) 
       max_i = i 
      } 
     } 
     i = i+1 
    } 

    iHigh = min_i 
    iLow = max_i 
    bHigh = devFMap(iHigh) 
    bLow = devFMap(iLow) 

    dataiHigh = indexedData.get(iHigh.toLong).get 
    dataiLow = indexedData.get(iLow.toLong).get 

    eta = 2 - 2 * kernel(dataiHigh, dataiLow) 

    alphaHighOld = alpha(iHigh) 
    alphaLowOld = alpha(iLow) 
    var alphaDiff = alphaLowOld - alphaHighOld 
    var lowLabel = y(iLow) 
    var sign = y(iHigh) * lowLabel 

    var alphaLowLowerBound = 0D 
    var alphaLowUpperBound = 0D 

    if (sign < 0){ 
     if (alphaDiff < 0){ 
      alphaLowLowerBound = 0; 
      alphaLowUpperBound = cost + alphaDiff; 
     } 
     else{ 
      alphaLowLowerBound = alphaDiff; 
      alphaLowUpperBound = cost; 
     } 
    } 
    else{ 
     var alphaSum = alphaLowOld + alphaHighOld; 
     if (alphaSum < cost){ 
      alphaLowUpperBound = alphaSum; 
      alphaLowLowerBound = 0; 
     } 
     else{ 
      alphaLowLowerBound = alphaSum - cost; 
      alphaLowUpperBound = cost; 
     } 
    } 

    if (eta > 0){ 
     alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta; 
     if (alphaLowNew < alphaLowLowerBound) 
      alphaLowNew = alphaLowLowerBound; 
     else if (alphaLowNew > alphaLowUpperBound) 
      alphaLowNew = alphaLowUpperBound; 
    } 
    else{ 
     var slope = lowLabel * (bHigh - bLow); 
     var delta = slope * (alphaLowUpperBound - alphaLowLowerBound); 
     if (delta > 0){ 
      if (slope > 0) 
       alphaLowNew = alphaLowUpperBound; 
      else 
       alphaLowNew = alphaLowLowerBound; 
     } 
     else 
      alphaLowNew = alphaLowOld; 
    } 

    alphaLowDiff = alphaLowNew - alphaLowOld; 
    alphaHighDiff = -sign*(alphaLowDiff); 
    alpha(iLow) = alphaLowNew; 
    alpha(iHigh) = (alphaHighOld + alphaHighDiff); 


    if(iteration % 50 == 0) 
     print(".") 

    iteration = iteration + 1; 


} 

===================

orijinal soruyu (bir kütüphane https://github.com/amplab/spark-indexedrdd sağlanan AMPLab tarafından geliştirilen) Aşağıdaki gibidir, kontrol noktası işe yaramaz bulmak ve program stackoverflow errer ile sonucuna varacaksınız !! Sorunumu açıklamak için basit bir test kodu yazarım. Neyse ki, iyi bir adam problemi çözmeme yardım ediyor, cevabı aşağıda bulabilirsiniz! Ancak, hatta kontrol noktası gerçekten çalışıyor, hala RDD.checkpoint belgelerine bakıldığında benim programda :(

for(i <- 1 to 1000){ 
    a = a.map(x => x+1).persist 
    var b = a.collect() 
    if(i%100 == 0){ 
    a.checkpoint() 
    } 
    print(".") 
} 

cevap

0

ile stackoverflow hata alıyorum, diyor:

herhangi bir iş infaz edilmeden önce bu işlevi çağrılmalıdır Eğer biraz kodunuzu değiştirirseniz bu RDD üzerinde

gerçekten de, kontrol noktasıa toplama önce yapmış - çalışır StackOverflowError:

for(i <- 1 to 1000){ 
    a = a.map(x => x+1).persist 

    if(i%100 == 0){ 
    a.checkpoint() 
    } 

    var b = a.collect() 

    print(".") 
} 
+0

Nezaketiniz için teşekkür ederiz! Kontrol noktasının işe yaramaz olmasının nedenini açıklar. Ancak, kontrol noktası çalıştıktan sonra, programım hala stackoverflow hatası çekiyor! Kodumu soruya ekliyorum. Lütfen bir göz atabilir misiniz? –

+0

Sorunuzu güncellediğinizi görüyorum - belki de orijinal "örnek" kodunu da bırakmış olmalısınız (bu cevabın başkalarına yardımcı olması için) –

+1

@JiaruiFang Başka bir sorunuz ya da başka bir sorunuz varsa Sorun lütfen cevabı kabul edin ve yeni bir soru sorun! Soruyu yeni sorunla düzenleme! – eliasah

İlgili konular