2015-03-13 14 views
8

Kafka-net eklentisi ile Red Hat VM üzerinde kafka 0.8.1.1 kullanıyorum. Tüketicimi kafka'dan daha önceki iletileri almayı durdurmak için nasıl yapılandırabilirim?Son mesajları göndermeyi durdurmak için kafka-net'i yapılandır

Benim tüketici kodu:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092")); 

Stopwatch sp = new Stopwatch(); 
var router = new BrokerRouter(options); 
var consumer = new Consumer(new ConsumerOptions("Test", router)); 

ThreadStart start2 =() => 
{ 
    while (true) 
    { 
     sp.Start(); 
     foreach (var message in consumer.Consume()) 
     { 
      if (MessageDecoderReceiver.MessageBase(message.Value) != null) 
      { 
       PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString()); 
      } 
      else 
      { 
       Console.WriteLine(message.Value); 
      } 
     } 
     sp.Stop(); 
    } 
}; 
var thread2 = new Thread(start2); 
thread2.Start(); 
+0

Kullanmakta olduğunuz dil için bir etiket ekledim ve daha iyi okunabilmesi için gövde ve başlığı değiştirdim. Ayrıca [başlıktan bir etiket çıkardım] (http://meta.stackexchange.com/questions/19190/should-questions-include-tags-in-their-titles). –

cevap

11

Kafka-net Tüketici anda otomatik uzaklıklar tüketildiğini takip etmez. Ofset izlemeyi manuel olarak uygulamanız gerekecektir. Yukarıdaki kod tüketiciyi ayarlayacaktır

var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result 
        .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray(); 

var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets); 

Not:

var commit = new OffsetCommitRequest 
      { 
       ConsumerGroup = consumerGroup, 
       OffsetCommits = new List<OffsetCommit> 
          { 
           new OffsetCommit 
            { 
             PartitionId = partitionId, 
             Topic = IntegrationConfig.IntegrationTopic, 
             Offset = offset, 
             Metadata = metadata 
            } 
          } 
      }; 

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault(); 

belirli ofset noktada ithalatına başlamak tüketiciyi ayarlamak için:

kafka sürümü 0.8.1 de ofset Mağaza Günlüğün en sonunda tüketmeye başlamak, sadece yeni mesajları almak.

+0

Harika, ihtiyacım olan şey bu, çok teşekkür ederim! –

+0

İşlediğiniz ofsetin nasıl tüketileceğine biraz ışık tutabilir misiniz? İstemcinin GetTopicOffsetAsync işleminin gerçekleştirilmesi, söz konusu offsetleri iade etmez. Sadece 2 değer döndürür: 0 (muhtemelen ilk) ve son ofset. Ayrıca, diğer okuyucular için, sen KafkaOptions' KafkaConnectionFactory yöntemini kullanarak bağlantı alabilirsiniz: '_options.KafkaConnectionFactory.Create (uri, _options.ResponseTimeoutMs, _options.Log)' – arviman

+0

Nvm ben kaynak üzerinden çıkardı ve OffsetFetchRequest bulundu \ Response sınıfı. Bu nedenle, GetTopicOffset'inizi kullanarak tüm bölümleri almamız ve sonra OffsetFetchRequest'i kullanarak bölümlerin her biri için saklı ofseti getirmemiz gerektiği görülüyor, ki bu yanıtı daha sonra ConsumerOptions'da tüketiciye aktarılıyor. – arviman

İlgili konular