2015-12-07 22 views
5

Kıvılcım akışı ile kafka'dan okumaya çalışırken bazı sorunlar yaşıyorum. Spark Akış Kafka akışı

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 

val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "localhost:2181", 
    "group.id" -> "consumergroup", 
    "metadata.broker.list" -> "localhost:9092", 
    "zookeeper.connection.timeout.ms" -> "10000" 
    //"kafka.auto.offset.reset" -> "smallest" 
) 

val topics = Set("test") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

Daha önce limanda 9092. limanından 2181 ve Kafka sunucusunda 0.9.0.0 de zookeeper başladı Ama Kıvılcım sürücüsünde aşağıdaki hatayı alıyorum:

Benim kodudur

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) 
at scala.Option.map(Option.scala:145) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87) 

hayvan bakıcısı günlüğü:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) 

Herhangi bir ipucu?

size sorun yanlış kıvılcım-akış-kafka versiyonunu ilgiliydi

cevap

14

çok teşekkür ederim. documentation

Kafka açıklandığı üzere

: Spark 1.5.2 Akış benim pom.xml'Bu

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 

dahil 0.8.2.1

Yani Kafka ile uyumludur (0.9.0.0 sürümü yerine) sorunu çözdü.

Umut bu

0

Kafka10 akışı/kıvılcım Bu konuda bütün gün geçirdi ve Bu postayı yüzlerce kez okumak gerekir 2.1.0/DKG/Mezosfer

Ugg yardımcı olur. Kıvılcım 2.0.0, 2.0.1, Kafka 8, Kafka 10'u denedim. Kafka 8'den uzak dur, kıvılcım 2.0.x ve bağımlılıklar her şeydir. Aşağıdan başla. İşe yarıyor.

SBT:

"org.apache.hadoop" % "hadoop-aws" % "2.7.3" excludeAll ExclusionRule(organization = "org.apache.hadoop", name = "hadoop-common"), 
"org.apache.spark" %% "spark-core" % "2.1.0", 
"org.apache.spark" %% "spark-sql" % "2.1.0" , 
"org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0", 
"org.apache.spark" % "spark-streaming_2.11" % "2.1.0" 

Çalışma Kafka/Kıvılcım Akış kodu:

val spark = SparkSession 
    .builder() 
    .appName("ingest") 
    .master("local[4]") 
    .getOrCreate() 

import spark.implicits._ 
val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) 

val topics = Set("water2").toSet 

val kafkaParams = Map[String, String](
    "metadata.broker.list"  -> "broker:port,broker:port", 
    "bootstrap.servers"   -> "broker:port,broker:port", 
    "group.id"     -> "somegroup", 
    "auto.commit.interval.ms"  -> "1000", 
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer", 
    "value.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer", 
    "auto.offset.reset"   -> "earliest", 
    "enable.auto.commit"   -> "true" 
) 

val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) 

messages.foreachRDD(rdd => { 
    if (rdd.count() >= 1) { 
    rdd.map(record => (record.key, record.value)) 
     .toDS() 
     .withColumnRenamed("_2", "value") 
     .drop("_1") 
     .show(5, false) 
    println(rdd.getClass) 
    } 
}) 
ssc.start() 
ssc.awaitTermination() 

Bunu görmek eğer gibi pek alabilirim Lütfen bazı itibar puanları. :)

İlgili konular