2016-04-08 19 views
2
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(config.getKafkaTopics().split(","))); 
HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
kafkaParams.put("metadata.broker.list", config.getKafkaBrokers()); 

// Create direct KAFKA stream with brokers and topics 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, 
     StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); 

Yukarıda gösterildiği gibi, KafkaUtils işlevinden createDirectStream işlevini kullanarak Kafka akışı oluşturuyorum. Bence oldukça standart ve sanırım Spark-1.5.1 ile çalıştı. KafkaUtils.createDirectStream neden bir NoSuchMethodError atar?

Ben Spark-1.6.1 geçti ve bu çünkü sürümü ise değil eminim rağmen, aşağıdaki hatayı atar: Bu tam olarak ne sorun hakkında çok az bilgi verir

Exception in thread "main" java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58) 
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) 
Caused by: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; 
    at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39) 
    at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala) 
    at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53) 
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122) 
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) 
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) 
    at com.analytics.kafka.consumer.SystemUserAnalyticsConsumer.main(SystemUserAnalyticsConsumer.java:59) 
    ... 6 more 

olduğunu.

Burada sorun nedir? Sen ve çalıştırma inşa/derleme sırasında kullanılan Scala sürümleri için spark-streaming-kafka-0-10 uyumsuz sürümlerini kullanıyorsanız

cevap

1

, yani

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1" 

emin Scala versiyonları birbirine benzemez olun (ve bunu dikkat çekmek yüzde iki işaretleri dikkat ve scalaVersion güveniyor.

İlgili konular