2016-04-03 19 views
0

Im ofset ama olsun onaylama işlemi hatası ayarlayarak bir konuya veri tüketmeye çalışıyor -AssertionError: Atanmayan'ı bölüm

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

Hata: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

cevap

1

Sen (consumer.assign aramak zorunda) aramak çağırmadan önce TopicPartitions listesiyle. Ayrıca aramak için ilk argüman aynı zamanda bir TopicPartition olduğuna dikkat. Kafka 0.9 ve kafka-python ile Benim durumumda KafkaConsumer API

0

Bkz bölüm atama for message in consumer sırasında meydana edilir. Yani, opration yineleme sonra gerektiği ararlar.

import kafka 

ps = [] 
for i in xrange(topic_partition_number): 
    ps.append(kafka.TopicPartition(topic, i)) 

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group) 
for msg in consumer: 
    print msg 
    consumer.seek_to_beginning(*ps) 
    consumer.commit() 
    break 
: Benim grubu aşağıdaki kodu ile dengeleniyor sıfırlama