2015-07-29 19 views
7

Kıvılcım görevinde lambda ifadesini kullanmaya çalıştım ve "java.lang.IllegalArgumentException: Invalid lambda deserialization" istisnasını atar. Bu istisna, kod "dönüşüm (pRDD-> pRDD.map (t-> t._2))" gibi olduğunda atılır. Kod snippet'i aşağıda.Apache Spark Lambda Expression - Serialization Issue

JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y); 
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( 
(Function<Tuple2<String,Integer>,Integer>)t->t._2)); 


JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y); 
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map( 
(Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2)); 

Yukarıdaki iki seçenek işe yaramadı. "T-> t_.2" lambda ifadesi yerine argüman olarak "f" nesnesinin altından geçiyormuşum gibi. İşe yarıyor.

Function f = new Function<Tuple2<String,Integer>,Integer>(){ 
@Override 
public Integer call(Tuple2<String,Integer> paramT1) throws Exception { 
return paramT1._2; 
} 
}; 

Bu işlevleri lambda ifadesi olarak ifade eden doğru formatın ne olduğunu bilir miyim?

public static void main(String[] args) { 

      Function f = new Function<Tuple2<String,Integer>,Integer>(){ 

       @Override 
       public Integer call(Tuple2<String,Integer> paramT1) throws Exception { 
        return paramT1._2; 
       } 

      }; 

      JavaStreamingContext ssc = JavaStreamingFactory.getInstance(); 

      JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999); 
      JavaDStream<String> words = lines.flatMap(s->{return Arrays.asList(s.split(" "));}); 
      JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x->new Tuple2<String,Integer>(x,1)); 
      JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y); 
      JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( 
          (Function<Tuple2<String,Integer>,Integer>)t->t._2)); 
      //JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works 
      con.print(); 

      ssc.start(); 
      ssc.awaitTermination(); 


     } 
+0

sorabilir miyim ifadeler kesinlikle tavsiye edilmez. – eliasah

+0

@eliasah Lambda ifadelerinin kıvılcım serileştirilmesi çok standart ve beklenen bir işlemdir. Uzaktan çalışmak için işleri seri hale getiren paralel yürütme motorudur. – whaleberg

cevap

2

Neden lambda çalışmıyor bilmiyorum. Belki de sorun lambda içinde yuvalanmış bir lambda ile. Bu Spark belgeleri tarafından tanınan görünüyor.

Kontrast http://spark.apache.org/docs/latest/programming-guide.html#basics örnek: http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation örneğin ile

JavaRDD<String> lines = sc.textFile("data.txt"); 
JavaRDD<Integer> lineLengths = lines.map(s -> s.length()); 
int totalLength = lineLengths.reduce((a, b) -> a + b); 

:

import org.apache.spark.streaming.api.java.*; 
// RDD containing spam information 
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); 

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
    new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() { 
    @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception { 
     rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning 
     ... 
    } 
    }); 

ikinci örnek muhtemelen çünkü aynı sorun nedeniyle, yerine Lambda bir Function alt sınıfını kullanan sizi keşfetti.

Bunun sizin için yararlı olup olmadığını bilmiyorum, ama yuvalanmış lambdas kesinlikle Scala'da çalışır. Sorun Java lambda fonksiyonları (örneğin Arayüz İşlevi için, gerçekten paket java.util.function içinde bir arabirimi uygulayan bir "sınıf" olduğunu düşünüyorum

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information 

val cleanedDStream = wordCounts.transform(rdd => { 
    rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning 
    ... 
}) 
+0

(Açıkça) tam bir cevap değil, üzgünüm. İşe yaramazsa reddetmekten çekinmeyin! –

+0

Cevabınız için teşekkür ederiz. Evet iç içe geçmiş lambda Apache Spark – user1182253

+0

'da tanınmıyor Sadece anonim sınıfların statik sınıflar olmadığını unutmayın. Bunlar, ana nesneye, kıvılcım tarafından da serileştirilecek bir işaretleyiciye sahipler; Adlandırılmış bir statik iç sınıf kullanmak muhtemelen daha güvenlidir. – whaleberg

0

: Önceki örneğin Scala versiyonunu düşünün https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html). Gördüğüm kadarıyla bu arayüzler seri hale getirilemez ... ve işte bu noktada ...

... bir Spark işlevinde lambda kullanırken ... Spark, lambda "class" serisini serileştirmeye çalışıyor .. ve seri hale getirilemez.

Böyle bir şeyle Serializable zorlamak için deneyebilirsiniz:

Runnable r = (Runnable & Serializable)() -> System.out.println("Serializable!"); 
0

Sen Göndermeye çalıştığınız dönüş değeri boks deneyebilirsiniz: Yeni Tamsayı (paramT1._2) return ;

Ben int en seri hale getirilebilir olmadığını düşündüren çünkü kaynağın bu öneriyorum: Eğer olduğu gibi, iç sınıflar için olduğu gibi, lambda seri lambda ifadeleri seri hale getirmek isteyen neden http://mindprod.com/jgloss/intvsinteger.html

İlgili konular