2016-03-19 18 views
2

java.lang.Math.max ile Spark reduce işlevini kullanırken beklenmedik davranışlarla karşılaşıyorum. İşte örnek kod var:Apache Spark java.lang.Math.max ile beklenmedik davranış azaltın

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

Kod yukarıdaki birden çok kez denir ve çoğu zaman bu gibi beklenmeyen bir sonuç üretir: Eğer redüktör değerini üretir görebileceğiniz gibi

[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect 
After Reduce: -2392513 //value produced by reducer 

-2392513 ancak bu değer bile değil RDD'nin basılı değerleri ile karşılaştırırken RDD'de. Neden o? collect()reduce()'u etkiliyor mu? İlk reddetme ve daha sonra orijinal RDD'yi toplama konusunda da başka bir şekilde denedim ve hala bu garip davranışı gözlemliyorum. java.Math kütüphanesinden statik yöntemin geçirilmesinin serileştirme sırasında sorun yaratabileceğini ancak Spark Quick Start Tutorial'a atıfta bulunabileceğini düşünerek reducer numaralı telefondan Math.max kullanıyor ve görünüşe göre çalışması gerekiyor.

Herhangi bir fikrin var mı?

teşekkür ederiz

DÜZENLEME:

ek bilgiler: Bu pasajı birden tekrarlamalar vardır büyük bir programın parçası olduğunu ve her tekrarında deniyor. İlk yineleme doğru değer ancak diğer tüm yineleme garip sonuçlar

üretiyoruz maxValue reducer üretilen doğru sonucu üretir

EDIT2:

Böyle bir satırda populationWithFitness.values().collect().toString() üç kez yazdırırken:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

Eğer fi içinde

Generation 1 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
After Reduce: -3054649 
Generation 2 
[-3084310, -3931687, -3508984, -3054649] 
[-3084310, -3847178, -3508984, -2701881] 
[-3148206, -3984035, -2806859, -2989184] 
After Reduce: -2949478 
Generation 3 
[-3187591, -3984035, -3696853, -3054649] 
[-3187591, -3984035, -3178920, -3015411] 
[-3148206, -3804759, -3657984, -2701881] 
After Reduce: -2710313 
Generation 4 
[-3187591, -2982220, -3310753, -3054649] 
[-3148206, -2985628, -3657984, -2701881] 
[-3148206, -2706580, -3451228, -2989184] 
After Reduce: -2692651 
. 
. 
. 

görebileceğiniz gibi: şöyle çıktı almak İlk iterasyon her şey iyi çalışıyor ama sonraki iterasyonlarda tuhaf bir çıktı üretiyor. Sorun şu ki tembel değerlendirme ile bir ilgisi olduğunu ve topladığımda dönüşüm uygulanmadığı ancak emin olmadığım.

Ben de JavaDoubleRDD ile reduce(Math::max) yerine çalıştı ve bu JavaDoubleRDD üzerinde max denilen ama sonuç aynıydı:

JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2()); 
long currentMaxFitness = stats.max().longValue(); 

Ben parametrelerle çalışan yerel modunda bu kodu test ediyorum diğer önemli nokta:

spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt 
+0

Gerçekten bu gerçek kod mu? –

+0

Evet, uyguladığım sampleRdd ifadesi dışında map() ', eşlemeden sonra' Long' değerleri üreten bazı nesnelerin toplanmasıdır. Diğer parçalar, daha kolay başvuru için değiştirilen isimlerle kodumdan kod parçacıklarıdır. Sağlanan örnek çıktı da aldığım gerçek çıktı. – MichaelDD

+0

rdd'inizin kaynağı nedir? –

cevap

1

Bu büyük olasılıkla (% 99) evaluateFitness(isl, fitnessCalculator)'un içinde bir yerde meydana gelir. Bir çeşit yeniden üretilemez kaynak kullanıyor gibi görünüyor ve bu nedenle farklı bir farklı çalışma olan geri sonuçları gönderiyor. Spark'in tembel olduğunu ve uygulamanın her bir ardışık eylemde tekrar çalışacağını unutmayın. Buna yardımcı olmak için önbelleğe alma özelliğini kullanabilirsiniz, ancak başarısız bile olsa (düğüm başarısız/veri önbellekten düşüyor). En iyi bahistir, burada kontrol noktasını kullanmaktır, ama moreso bile, icraatın kendisini değiştirecek şekilde değiştirmelisiniz.

+0

Çok teşekkür ederim. Önbellek benim için çalışıyor. Sen deterministik olmayan 'harita' fonksiyonu ile haklısın, ancak "değerlendirmeFitness" değil, daha sonra "seçim" ve "crossover" fonksiyonu (Ben Genetik Algoritmalar hakkında konuşuyorum değişken adlandırma sonucuna varmış sanırım) bazı rasgelelik faktörü gerektiren Çalışabilmek için uygulama bitini değiştiremiyorum. "Harita" aşamalarını zincirliyor ve en iyi çözümü elde etmek için yalnızca maksimum fitness ve son nüfus için "azalt" ı çağırıyorum. Hadoop'da uyguladıkları bazı araştırma makalelerini takip ettim, ama kıvılcımsal değerlendirme, onu biraz daha zorlaştırıyor. – MichaelDD

İlgili konular