3

Kafka'nın doc bir yaklaşım vermek: Konu BaşınaKafka 0.9.0'da çok parçalı tüketici nasıl kullanılır? tanımlamaktadır aşağıdaki yaklaşık

Bir Tüketiciyi: Basit bir seçenek her biri kendi tüketici> örneğini iplik vermektir.

Kodum:

public class KafkaConsumerRunner implements Runnable { 

    private final AtomicBoolean closed = new AtomicBoolean(false); 
    private final CloudKafkaConsumer consumer; 
    private final String topicName; 

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { 
     this.consumer = consumer; 
     this.topicName = topicName; 
    } 

    @Override 
    public void run() { 
     try { 
      this.consumer.subscribe(topicName); 
      ConsumerRecords<String, String> records; 
      while (!closed.get()) { 
       synchronized (consumer) { 
        records = consumer.poll(100); 
       } 
       for (ConsumerRecord<String, String> tmp : records) { 
        System.out.println(tmp.value()); 
       } 
      } 
     } catch (WakeupException e) { 
      // Ignore exception if closing 
      System.out.println(e); 
      //if (!closed.get()) throw e; 
     } 
    } 

    // Shutdown hook which can be called from a separate thread 
    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

    public static void main(String[] args) { 
     CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder() 
       .withBootstrapServers("172.31.1.159:9092") 
       .withGroupId("test") 
       .build(); 
     ExecutorService executorService = Executors.newFixedThreadPool(5); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log")); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info")); 
     executorService.shutdown(); 
    } 
} 

ama işe ve bir istisna atar değildir:

java.util.ConcurrentModificationException: KafkaConsumer çok kanallı erişim

için güvenli değil

Ayrıca, Flink'in kaynağını okudum (dağıtılmış akış için açık kaynak platformu d parti veri işleme. Çok iş parçacıklı tüketici kullanarak flört benimkiyle benzer.

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); 
pollLoop: while (running) { 
    ConsumerRecords<byte[], byte[]> records; 
    //noinspection SynchronizeOnNonFinalField 
    synchronized (flinkKafkaConsumer.consumer) { 
     try { 
      records = flinkKafkaConsumer.consumer.poll(pollTimeout); 
     } catch (WakeupException we) { 
      if (running) { 
       throw we; 
      } 
      // leave loop 
      continue; 
     } 
    } 

flink code of mutli-thread

nesi var ?

cevap

6

Kafka Tüketici ürün güvenli değildir.. Eğer söz konusu belirttiği gibi, belge

Basit seçenek

Ama kodunuzda

, farklı tarafından sarılmış aynı tüketici örneği var kendi tüketici instance'a her konuyu vermek olduğunu belirtti KafkaConsumerRunner örnekleri. Böylece, birden çok ileti aynı tüketici örneğine erişiyor. Kafka dokümantasyonu açıkça belirtilmiştir. Kafka tüketicisi iş parçacığı için güvenli DEĞİLDİR. Tüm ağ G/Ç, aramayı yapan uygulamanın iş parçacığında gerçekleşir. Çok iş parçacıklı erişimin senkronize olduğundan emin olmak kullanıcının sorumluluğundadır. Senkronize olmayan erişim, ConcurrentModificationException ile sonuçlanacaktır. Bu, aldığınız istisna tam olarak budur.

2

Abone olmak için çağrınıza istisna atıyor. Böyle bir senkronize bloğuna blok this.consumer.subscribe(topicName);

taşı:

@Override 
public void run() { 
    try { 
     synchronized (consumer) { 
      this.consumer.subscribe(topicName); 
     } 
     ConsumerRecords<String, String> records; 
     while (!closed.get()) { 
      synchronized (consumer) { 
       records = consumer.poll(100); 
      } 
      for (ConsumerRecord<String, String> tmp : records) { 
       System.out.println(tmp.value()); 
      } 
     } 
    } catch (WakeupException e) { 
     // Ignore exception if closing 
     System.out.println(e); 
     //if (!closed.get()) throw e; 
    } 
} 
+0

Benim için çalışıyor. – Prasath

2

Belki böyle değildir, ancak Serveral konuların verilerin işlenmesini Mergin eğer, o zaman ile birden konulardan verileri okuyabilir aynı tüketici. Değilse, her konuyu tüketen ayrı işler oluşturmak için tercih edilir.

İlgili konular