2015-06-17 23 views
20

Kafka okuyan yeni bir öğrenciyim ve çok sayıda tüketiciyi anlatarak makaleler, belgeler, vb. Çok fazla yardımcı olmamış bazı temel sorunlarla karşılaştım.Kafka'da birden fazla tüketiciyi nasıl kullanırım?

Yapmaya çalıştığım bir şey kendi üst düzey Kafka yapımcı ve tüketicimi yazmak ve bunları aynı anda çalıştırmak, bir konuya 100 basit mesaj yayınlamak ve tüketicilerimin onları alması. Bunu başarılı bir şekilde başardım, ancak mesajların yayınlandığı aynı konuyu tüketmek için ikinci bir tüketiciyi tanıtmaya çalıştığımda hiç mesaj almıyor.

Her bir konu için, ayrı tüketici gruplarından tüketicileriniz olabilir ve bu tüketici gruplarının her biri, bir konuya üretilen mesajların tam bir kopyasını alacaktı. Bu doğru mu? Değilse, birden fazla tüketici kurmak için uygun yol ne olurdu?

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

Dahası, ben aslen sadece tek bir bölüm ile bir konu 'test' için yukarıda tüketimini test olduğunu fark ettim: Bu şimdiye kadar yazdım tüketici sınıftır. Mevcut bir tüketici grubuna başka bir tüketici eklediğimde 'testGroup' dediğimizde, tüketimimin gecikme süresini önemli ölçüde azaltan bir Kafka yeniden dengelemesini saniye cinsinden tahmin ettim. Bunun tek bir bölümden beri yeniden dengeleme ile ilgili bir sorun olduğunu düşündüm, ancak 6 bölümlü yeni bir 'çoklu bölüm' oluşturduğumda, aynı tüketici grubuna daha fazla tüketici ekleyerek gecikme sorunlarına neden olan benzer sorunlar ortaya çıktı. Etrafa baktım ve insanlar bana çok iş parçacıklı bir tüketici kullanmam gerektiğini söylüyor - kimseye ışık tutabilir mi?

+0

Kafesin 0.8.1' için üst düzey bir tüketici [burada] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) harika bir örneği var. – chrsblck

+0

@chrsblck link için teşekkürler.Aslında bunu daha önce inceledim ve muhtemelen anlayabildiğim kadarıyla anlamadım - belki de bu örneğin dişlerden nasıl yararlandığını biraz açıklayabilir miydiniz? Şu an ne yaptığını tam olarak anlamadım. –

+0

Bir yol, belirli bir konu için bölümlerin aynı sayıda iş parçacığına sahip olmasıdır. Makaleden - Akımların bir listesini tut "Listenin > akışları = customersMap.get (konu);' ... Sonra her bir thread bir execition execubed.submit (new ConsumerTest (akış, threadNumber)). – chrsblck

cevap

17

Sorununuzun auto.offset.reset özelliği ile ilgili olduğunu düşünüyorum. Yeni bir tüketici bir bölümden okuduğunda ve önceden işlenen ofset olmadığında, auto.offset.reset özelliği başlangıç ​​ofsetinin ne olması gerektiğine karar vermek için kullanılır. "En büyük" (varsayılan) olarak ayarlarsanız, en son (son) mesajda okumaya başlarsınız. "En küçük" olarak ayarlarsanız, ilk kullanılabilir iletiyi alırsınız.

properties.put("auto.offset.reset", "smallest"); 

ve yeniden deneyin:

Yani ekleyin.

+1

Bu geç bir cevap ama teşekkürler Chris! Çözümleriniz doğrudur ve bazı belgelere daha yakından baktıktan sonra, yeni bir tüketici piyasaya sürüldüğünde, yalnızca en yeni gönderilen mesajları tüketecek şekilde ayarlanmış olmalıdır - Yukarıdaki özellikler ayarlanmadıkça önceden mevcut olanları DEĞİL. –

4

here numaralı belgede, "konuyla ilgili bölümler olduğundan daha fazla iş parçacığı sağlıyorsanız, bazı parçacıkların hiçbir zaman bir ileti görmeyeceğini" söylüyor. Konunuza bölüm ekleyebilir misiniz? Tüketici grubu iş parçacığı sayımın, başlığımdaki bölümlerin sayısına eşit olması ve her bir iletinin ileti alması.

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

Ve Tüketici:

Eğer (yayın gibi) aynı mesajları tüketmek çok tüketiciye istiyorsanız
package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

Yukarıdaki örnekte kullanılan sınıfların çoğunun kullanımdan kaldırıldığından Kafka 1.0 sürümü için örnek paylaşabilirsiniz. –

+0

O zamanlar dışarıda olduğuna inanmıyorum, kodumu çok yakında güncellemekten alamayacağım özür dilerim. –

4

, farklı tüketici grubuyla bunları iletebilir

İşte benim konu yapılandırma var ve auto.offset.reset'i tüketici yapılandırmasında en küçüğe ayarlıyor. Birden fazla tüketicinin paralel olarak tüketmeyi bitirmesini isterseniz (aralarındaki işi bölünüz), bölüm sayısı oluşturmalısınız> = tüketici sayısı. Bir bölüm sadece en fazla bir tüketici işlemi tarafından tüketilebilir. Ancak Bir tüketici birden fazla bölüm tüketebilir.

İlgili konular