2016-10-06 23 views
5

Verilen belgelere göre here, same documentation'da belirtildiği gibi bir dinleyiciye ileti almak için bir POC üzerinde çalışıyorum, aşağıda nasıl yapılandırmayı yazdım.Yay Bütünleşmesi Kafka Tüketici Dinleyicisi iletileri almıyor

@Configuration 
public class KafkaConsumerConfig { 

    public static final String TEST_TOPIC_ID = "record-stream"; 

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}") 
    private String topic; 

    @Value("${kafka.address:localhost:9092}") 
    private String brokerAddress; 


    /* 
     @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
     KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> 
     kafkaMessageDrivenChannelAdapter = new 
     KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return 
     kafkaMessageDrivenChannelAdapter; } 

     @Bean public QueueChannel received() { return new QueueChannel(); } 
    */ 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(3); 
     factory.getContainerProperties().setPollTimeout(30000); 
     return factory; 

    } 

    /* 
    * @Bean public KafkaMessageListenerContainer<String, String> container() 
    * throws Exception { ContainerProperties properties = new 
    * ContainerProperties(this.topic); // set more properties return new 
    * KafkaMessageListenerContainer<>(consumerFactory(), properties); } 
    */ 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest 
                     // smallest 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

} 

ve Dinleyici olarak altındadır,

@Service 
public class Listener { 

    private Logger log = Logger.getLogger(Listener.class); 


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") 
    public void process(String message/* , Acknowledgment ack */) { 
     Gson gson = new Gson(); 
     Record record = gson.fromJson(message, Record.class); 

     log.info(record.getId() + " " + record.getName()); 
     // ack.acknowledge(); 
    } 

} 

Aynı konusuna mesajlar üretiyorum ve bu tüketici aynı konu üzerinde çalışıyor olsa da, Dinleyici yürütülmüyor.

Kafka 0.10.0.1 çalıştırıyorum ve işte benim mevcut pom. Bu tüketici birçok komut satırı örneğinden farklı olarak bir önyükleme web uygulaması olarak çalışıyor.

<dependencies> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-kafka</artifactId> 
      <version>2.1.0.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>com.google.code.gson</groupId> 
      <artifactId>gson</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

    </dependencies> 

bunu ben yanlış yapıyorum ne, konu mesajları olduğunda bu dinleyici kazaya değil neden anlamaya zaman iyi bir miktar geçirdim.

İletileri bir kanal kullanarak alabildiğimi biliyorum (kodun kendisiyle ilgili yapılandırma bölümünü yorumladım), Ama burada eşzamanlılık temiz tutuluyor.

Bu tür bir uyumsuzluk mesaj tüketimi ile gerçekleştirilebilir.

cevap

3

@EnableKafka ürününü @Configuration ile birlikte eklemelisiniz.

add Bazı açıklamalar yakında. Bu arada

:

Ben EnableIntegration benim Producer'daki, o ilgileniyor düşünce
@Configuration 
@EnableKafka 
public class KafkaConsumerConfig { 
+0

, ben EnableKafka yok. Bunu denememe ve size geri dönelim –

+0

'@ EnableIntegration 'Bahar Entegrasyonu için, ama burada Bahar Kafka hakkında konuşuyoruz. Tamamen farklı projelerdir. Bakın @ KafkaListener, Bahar Entegrasyon Kafka'nın dışında. Unutma gibi @ EnableJms', '@ EnableRabbit' ve diğer birçok' @Enable… 'vardır. Tek '@ EnableIntegration' hepsi ile ilgilenemez. Bu onun sorumluluğu değil. –

+0

İçgörü için çok teşekkürler, senin bir cazibe gibi çalıştı. Bunu akılda tutacak. –

İlgili konular