'da yığın akış hatası yaratıyor Apache Spark'in yeni başlamasından biriyim. Şu anda bir RDD'yi tekrar güncellemeyi ve daha sonra yaklaşık 10 KB'lık verileri sürücülerden sürücüye toplayan bir Makine Öğrenimi programı üzerinde çalışıyorum. Ne yazık ki, 600 yineleme çalıştırıldığında StackOverFlow hatası alıyorum! Aşağıdaki kodum. Yineleme sayısı 400'ün üzerinde olduğunda stackovers hatası işlevinde stackoverflow hatası oluştu! indexedDevF ve indexedData olan indexedRDDUzun seri RDD ile yinelemeli kod, Apache Spark
breakable{
while(bLow > bHigh + 2*tolerance){
indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues(x => (x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow)))
if (iteration % 50 == 0) {
indexedDevF.checkpoint()
}
indexedDevF.persist() // essential to get correct answer
val devFMap = indexedDevF.collectAsMap() //0.5s every time according to local:4040! here will stackoverflow
var min_value = Double.PositiveInfinity
var max_value = -min_value
var min_i = -1
var max_i = -1
i = 0
while(i < m){
if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){
if(devFMap(i) <= min_value){
min_value = devFMap(i)
min_i = i
}
}
if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){
if(devFMap(i) >= max_value){
max_value = devFMap(i)
max_i = i
}
}
i = i+1
}
iHigh = min_i
iLow = max_i
bHigh = devFMap(iHigh)
bLow = devFMap(iLow)
dataiHigh = indexedData.get(iHigh.toLong).get
dataiLow = indexedData.get(iLow.toLong).get
eta = 2 - 2 * kernel(dataiHigh, dataiLow)
alphaHighOld = alpha(iHigh)
alphaLowOld = alpha(iLow)
var alphaDiff = alphaLowOld - alphaHighOld
var lowLabel = y(iLow)
var sign = y(iHigh) * lowLabel
var alphaLowLowerBound = 0D
var alphaLowUpperBound = 0D
if (sign < 0){
if (alphaDiff < 0){
alphaLowLowerBound = 0;
alphaLowUpperBound = cost + alphaDiff;
}
else{
alphaLowLowerBound = alphaDiff;
alphaLowUpperBound = cost;
}
}
else{
var alphaSum = alphaLowOld + alphaHighOld;
if (alphaSum < cost){
alphaLowUpperBound = alphaSum;
alphaLowLowerBound = 0;
}
else{
alphaLowLowerBound = alphaSum - cost;
alphaLowUpperBound = cost;
}
}
if (eta > 0){
alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta;
if (alphaLowNew < alphaLowLowerBound)
alphaLowNew = alphaLowLowerBound;
else if (alphaLowNew > alphaLowUpperBound)
alphaLowNew = alphaLowUpperBound;
}
else{
var slope = lowLabel * (bHigh - bLow);
var delta = slope * (alphaLowUpperBound - alphaLowLowerBound);
if (delta > 0){
if (slope > 0)
alphaLowNew = alphaLowUpperBound;
else
alphaLowNew = alphaLowLowerBound;
}
else
alphaLowNew = alphaLowOld;
}
alphaLowDiff = alphaLowNew - alphaLowOld;
alphaHighDiff = -sign*(alphaLowDiff);
alpha(iLow) = alphaLowNew;
alpha(iHigh) = (alphaHighOld + alphaHighDiff);
if(iteration % 50 == 0)
print(".")
iteration = iteration + 1;
}
===================
orijinal soruyu (bir kütüphane https://github.com/amplab/spark-indexedrdd sağlanan AMPLab tarafından geliştirilen) Aşağıdaki gibidir, kontrol noktası işe yaramaz bulmak ve program stackoverflow errer ile sonucuna varacaksınız !! Sorunumu açıklamak için basit bir test kodu yazarım. Neyse ki, iyi bir adam problemi çözmeme yardım ediyor, cevabı aşağıda bulabilirsiniz! Ancak, hatta kontrol noktası gerçekten çalışıyor, hala RDD.checkpoint
belgelerine bakıldığında benim programda :(
for(i <- 1 to 1000){
a = a.map(x => x+1).persist
var b = a.collect()
if(i%100 == 0){
a.checkpoint()
}
print(".")
}
Nezaketiniz için teşekkür ederiz! Kontrol noktasının işe yaramaz olmasının nedenini açıklar. Ancak, kontrol noktası çalıştıktan sonra, programım hala stackoverflow hatası çekiyor! Kodumu soruya ekliyorum. Lütfen bir göz atabilir misiniz? –
Sorunuzu güncellediğinizi görüyorum - belki de orijinal "örnek" kodunu da bırakmış olmalısınız (bu cevabın başkalarına yardımcı olması için) –
@JiaruiFang Başka bir sorunuz ya da başka bir sorunuz varsa Sorun lütfen cevabı kabul edin ve yeni bir soru sorun! Soruyu yeni sorunla düzenleme! – eliasah