2016-05-01 24 views

cevap

3

Bu basit bir şey değil, çünkü yalnızca bireysel aracılar belirli bir konu/bölüm için en yeni ofset bilgisinin ne olduğunu biliyorlar.

OffsetRequest'u yapabilirsiniz. Aşağıdakiler, bir konu/bölüm için en erken ve en son uzaklıkları döndürecektir (Scala'dır, ancak Scala'yı kullanmıyorsanız bu fikri elde edebilmelisiniz).

İstenen bölümün lideri olan aracıya bağlı bir SimpleConsumer kullanmak zorunda olduğunuzu unutmayın. Genelde yaptığım şey, her bir brokerim için bir SimpleConsumer yaratıyorum. Sonra bir meta veri isteği yapmak ve lider eşleştirme bölümü olsun, bunu daha sonra foreach bölüm:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { 
    val time = kafka.api.OffsetRequest.LatestTime 
    val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))) 
    val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq") 
    val resp = consumer.getOffsetsBefore(req) 
    val offsets = resp.offsets(topic, partition) 
    if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0)) 
    else (0, -1) 
} 

Umut bu yardımcı olur.

+0

Büyük iş @David, bunun tam bir örneğiniz var mı? Ve .. aynı adı taşıyan Scala nesnesi varsa neden 'kafka.javaapi.OffsetRequest' kullanıyorsunuz? – salvob

İlgili konular