Kafka'yı tek bir düğüm üzerinde kurdum ve kafo sunucusunun yanı sıra zookeeper'i de başlattım. Konsolu dahili yapımcı ve tüketici için test ettim ve iyi çalışıyor.Ama konsolda bir iç kafka tüketici çalıştırıyorum ve özel yapımcı işe yaramıyor. Aşağıda Kafka kullanarak kafka sunucusu ile iletişim kurulamıyor Üretici API
benim Üretici sınıfı kontrol producer.send (ulaşırProperties props = new Properties();
props.put("metadata.broker.list", "xx.xx.xx.xx:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(
"mails", "xxxx");
producer.send(data);
), ben kafka bağlanmaya çalışıyordu şu istisna
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
"props.put (" partitioner.class "," com.example.producer.SimplePartitioner ")' bölümünün – user2720864