Kafka okuyan yeni bir öğrenciyim ve çok sayıda tüketiciyi anlatarak makaleler, belgeler, vb. Çok fazla yardımcı olmamış bazı temel sorunlarla karşılaştım.Kafka'da birden fazla tüketiciyi nasıl kullanırım?
Yapmaya çalıştığım bir şey kendi üst düzey Kafka yapımcı ve tüketicimi yazmak ve bunları aynı anda çalıştırmak, bir konuya 100 basit mesaj yayınlamak ve tüketicilerimin onları alması. Bunu başarılı bir şekilde başardım, ancak mesajların yayınlandığı aynı konuyu tüketmek için ikinci bir tüketiciyi tanıtmaya çalıştığımda hiç mesaj almıyor.
Her bir konu için, ayrı tüketici gruplarından tüketicileriniz olabilir ve bu tüketici gruplarının her biri, bir konuya üretilen mesajların tam bir kopyasını alacaktı. Bu doğru mu? Değilse, birden fazla tüketici kurmak için uygun yol ne olurdu?
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Dahası, ben aslen sadece tek bir bölüm ile bir konu 'test' için yukarıda tüketimini test olduğunu fark ettim: Bu şimdiye kadar yazdım tüketici sınıftır. Mevcut bir tüketici grubuna başka bir tüketici eklediğimde 'testGroup' dediğimizde, tüketimimin gecikme süresini önemli ölçüde azaltan bir Kafka yeniden dengelemesini saniye cinsinden tahmin ettim. Bunun tek bir bölümden beri yeniden dengeleme ile ilgili bir sorun olduğunu düşündüm, ancak 6 bölümlü yeni bir 'çoklu bölüm' oluşturduğumda, aynı tüketici grubuna daha fazla tüketici ekleyerek gecikme sorunlarına neden olan benzer sorunlar ortaya çıktı. Etrafa baktım ve insanlar bana çok iş parçacıklı bir tüketici kullanmam gerektiğini söylüyor - kimseye ışık tutabilir mi?
Kafesin 0.8.1' için üst düzey bir tüketici [burada] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) harika bir örneği var. – chrsblck
@chrsblck link için teşekkürler.Aslında bunu daha önce inceledim ve muhtemelen anlayabildiğim kadarıyla anlamadım - belki de bu örneğin dişlerden nasıl yararlandığını biraz açıklayabilir miydiniz? Şu an ne yaptığını tam olarak anlamadım. –
Bir yol, belirli bir konu için bölümlerin aynı sayıda iş parçacığına sahip olmasıdır. Makaleden - Akımların bir listesini tut "Listenin> akışları = customersMap.get (konu);' ... Sonra her bir thread bir execition execubed.submit (new ConsumerTest (akış, threadNumber)). –
chrsblck