2015-03-17 24 views
6

Kafka'yı tek bir düğüm üzerinde kurdum ve kafo sunucusunun yanı sıra zookeeper'i de başlattım. Konsolu dahili yapımcı ve tüketici için test ettim ve iyi çalışıyor.Ama konsolda bir iç kafka tüketici çalıştırıyorum ve özel yapımcı işe yaramıyor. Aşağıda Kafka kullanarak kafka sunucusu ile iletişim kurulamıyor Üretici API

benim Üretici sınıfı kontrol producer.send (ulaşır

Properties props = new Properties(); 

    props.put("metadata.broker.list", "xx.xx.xx.xx:9092"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("partitioner.class", "com.example.producer.SimplePartitioner"); 
    props.put("request.required.acks", "1"); 

    ProducerConfig config = new ProducerConfig(props); 

    Producer<String, String> producer = new Producer<String, String>(config); 
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
      "mails", "xxxx"); 
    producer.send(data); 

), ben kafka bağlanmaya çalışıyordu şu istisna

java.nio.channels.ClosedChannelException 
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) 
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) 
at kafka.producer.SyncProducer.send(SyncProducer.scala:113) 
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) 
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) 
at kafka.utils.Utils$.swallow(Utils.scala:172) 
at kafka.utils.Logging$class.swallowError(Logging.scala:106) 
at kafka.utils.Utils$.swallowError(Utils.scala:45) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) 
at kafka.producer.Producer.send(Producer.scala:77) 
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
+0

"props.put (" partitioner.class "," com.example.producer.SimplePartitioner ")' bölümünün – user2720864

cevap

3

ile 3 denemeden sonra durağı Harici VM'den tutulma durumunda bir Producer sınıfından sunucu. localhost ile ip adresini config/kafka'daki producer.properties içinde değiştirmek zorunda kaldım.

+0

numaralı kısmını da/etc/hosts dosyasında da – Shams

İlgili konular