Kıvılcım görevinde lambda ifadesini kullanmaya çalıştım ve "java.lang.IllegalArgumentException: Invalid lambda deserialization" istisnasını atar. Bu istisna, kod "dönüşüm (pRDD-> pRDD.map (t-> t._2))" gibi olduğunda atılır. Kod snippet'i aşağıda.Apache Spark Lambda Expression - Serialization Issue
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map(
(Function<Tuple2<String,Integer>,Integer>)t->t._2));
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map(
(Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2));
Yukarıdaki iki seçenek işe yaramadı. "T-> t_.2" lambda ifadesi yerine argüman olarak "f" nesnesinin altından geçiyormuşum gibi. İşe yarıyor.
Function f = new Function<Tuple2<String,Integer>,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
};
Bu işlevleri lambda ifadesi olarak ifade eden doğru formatın ne olduğunu bilir miyim?
public static void main(String[] args) {
Function f = new Function<Tuple2<String,Integer>,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
};
JavaStreamingContext ssc = JavaStreamingFactory.getInstance();
JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(s->{return Arrays.asList(s.split(" "));});
JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x->new Tuple2<String,Integer>(x,1));
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map(
(Function<Tuple2<String,Integer>,Integer>)t->t._2));
//JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works
con.print();
ssc.start();
ssc.awaitTermination();
}
sorabilir miyim ifadeler kesinlikle tavsiye edilmez. – eliasah
@eliasah Lambda ifadelerinin kıvılcım serileştirilmesi çok standart ve beklenen bir işlemdir. Uzaktan çalışmak için işleri seri hale getiren paralel yürütme motorudur. – whaleberg