2014-11-20 22 views
7

Kafka konularına aboneliklerin eklenmesini ve dinamik olarak kaldırılmasını sağlayan bir uygulama yapıyorum. Bir konu aboneliği eklendiğinde, her yeni iletiyi alan ve onları başka bir veri deposuna gönderen bir toplu iş çalıştırmak istedim.Kafka - En Son Ofset Almanın En Kolay Yolu

Anlamak istediğim konuya ilişkin mevcut ofsetin nasıl alınacağıdır. Bir abonelik eklendiğinde, bir sonraki toplu işin, aboneliğin yaklaşık zamanından bu yana tüm mesajları almasını istiyorum. Örnek olarak, sürekli olarak mesaj alan "TopicA" adlı bir konu olduğunu düşünün. 7.15pm'de bir abonelik eklediğimde, toplu iş 8'de çalıştığı zaman, 7.15pm'den beri tüm iletilerin toplulaştırılmasını istiyorum. Yaklaşık zaman için mutluyum - 7.10, 7.20 vs. 5 ya da 10 dakika her iki taraf da endişelenmeme sebep oluyor.

Bu nedenle, benim amacım çözümümün bir abonelik eklendiği andaki güncel ofsetini elde etmektir. Basit tüketiciye baktım, ancak bu temel kullanım durumu için tüm küme yönetimi yönlerine dahil olmak istemiyorum.

Ayrıca üst düzey tüketicilere de baktım. Böyle elimden bir şey:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

Ne bu yaklaşımla beni ilgilendiren çağrı için "kafa" aslında bir akımdır. Yani bir sonraki mesajı beklemeyi engelleyeceğine inanıyorum. Engelleme sorunludur çünkü bir sonraki mesaj gelene kadar diğer aboneliklerin sıraya alınmasına neden olabilir.

İkinci yaklaşımı uygulamak için biraz zaman ayırmaktan mutluluk duyarım, ancak hata eğilimli eşzamanlı kod yazmamı gerektirmeyen daha basit bir yol varsa, zamanımı boşa harcamam.

Ayrıca, bu ofsetten bu yana tüm günlükleri almanın bir yoluna da ihtiyacım var.

cevap

2

Bir getirme isteğine yapılan her yanıt, şu anda tüketilen bölümün günlüğündeki en son kaymayı temsil eden bir "HighWaterMark" döndürür. Yani teoride, en erken mesajı veya gerçekten herhangi bir iletiyi (varsayarak) belirli bir konu için getirebilir ve HighWaterMark'ı yanıttan çekebilirsiniz. fazla detay için HighWaterMark üzerinde var: yanıtından HighWaterMarkOffset alabilmenin sağladığı Tabii https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

, bağlıdır istemci üzerinde yapım o kendi Kafka API yoluyla mevcut veriler.

+0

Bu, belirli bir bölüm için yüksek su işareti olabilir. Sanırım "son mesajın" {partitionId, offsetId} bilgisini soruyor. – arviman

+1

Sanırım küresel bir "en son mesaj" diye bir şey yok. Kafka, küresel bir senkronizasyon mekanizması olsaydı ölmezdi ... –

İlgili konular