2016-03-24 17 views
0

Kafka komisyoncusu ve kafesteki bir kafka üreticisi ile gerçekten basit bir uygulama yapmaya çalışan günler için uğraşıyorum. google'daki benzer sorunların yanıtları, ancak yine de çalışamadım. kafka.producer.async.DefaultEventHandler - Konular için istek gönderilemedi

Bu

alıyorum hatadır:

INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] ERROR kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: clicks 
[ProducerSendThread-] INFO kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3 

Ve bu benim kodudur: Zaten bir Yaupon örneği doğru localhost üzerinde çalışan var

Properties properties= new Properties(); 
properties.put("broker.id", "1"); 
properties.put("advertised.host.name", "localhost"); 
properties.put("advertised.port", "9092"); 
properties.put("host.name", "localhost"); 
properties.put("auto.create.topics.enable","true"); 
properties.put("zookeeper.connect", zookeeperConnectString); 
properties.put("port","9092"); 
properties.setProperty("num.partitions", "1"); 
properties.setProperty("log.dirs", newPath(KAFKA_LOG_DIR).toString()); 

KafkaConfig kafkaConfig = new KafkaConfig(properties); 
KafkaServerStartable kafkaServer = new KafkaServerStartable(kafkaConfig); 
kafkaServer.startup(); 

String topic = clicks; 

ZkClient zookeeper = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); 
if (!AdminUtils.topicExists(zookeeper, topic)) { 
    AdminUtils.createTopic(new ZkClient(zookeeperConnectString), topic, 1, 1, new Properties()); 
    } 
zookeeper.close(); 

Properties producerProps = new Properties(); 
producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.put("key.serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.setProperty("producer.type", "async"); 
producerProps.put("metadata.broker.list", "localhost:9092"); 
producerProps.put("request.required.acks","0"); 

Producer producer = new Producer(new ProducerConfig(producerProps)); 

String click = "exampleMessage"; 
producer.send(ImmutableList.of(new KeyedMessage(topic, click))); 
producer.close(); 

: 2181.

aşağıdaki kafka sürümlerini ve zookeeper kullanıyorum:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.zookeeper</groupId> 
     <artifactId>zookeeper</artifactId> 
     <version>3.4.6</version> 
    </dependency> 

Teşekkür Herhangi bir yardım için veya yorum :) durumda biri ise

+0

_describe_ komutunu kullanarak konu meta bilgisini komut satırından almayı denediniz mi? 'bin/kafka-topics.sh --describe --zookeeper localhost: 2181 - tıklatma tıklamaları' – avr

+0

Kafka komut satırını kullanamıyorum. Ancak küratörle zookeeper'e erişirsem konu için bir düğüm oluşturulduğunu görüyorum:/broker/topic/cliks. Bunun yerine, düğüm/broker/kimlikleri boş. – nicola

+0

Yani aktif kafka brokeri yok. Bu yüzden en az bir kafka komisyoncunun çalışır durumda olduğundan emin olun. – avr

cevap

0

aynı sorunu yaşamıştır, ben iş Thread.sleep(500); ekleyerek yapılan konunun oluşturulmasından sonra. Tam olarak nedenini bilmiyorum, neden başlangıç ​​için zaman alan localhost üzerinde çalışan Zookeeper örneğinden kaynaklanıyor olabilir.

İlgili konular