卡夫卡消费者调查消息与python

问题描述:

我在消费者组中的卡夫卡轮询消息有问题。 我的消费对象分配给一个给定的分区与卡夫卡消费者调查消息与python

self.ps = TopicPartition(topic, partition) 

之后,消费者分配到该分区:

self.consumer.assign([self.ps]) 

之后,我能够与

算分区内部消息
self.consumer.seek_to_beginning(self.ps) 
pos = self.consumer.position(self.ps) 

and self.consumer.seek_to_end(self.ps) .....

在我的tpoic中有超过30000条消息。 问题是我只收到一条消息。

使用者配置有: max_poll_records= 200 AUTO_OFFSET_RESET是最早

这里是我的功能与此我想获得消息:

def poll_messages(self): 


    data = [] 

    messages = self.consumer.poll(timeout_ms=6000) 


    for partition, msgs in six.iteritems(messages): 

     for msg in msgs: 

      data.append(msg) 

    return data 

即使我去了第一个可用的前偏移开始轮询邮件 我只收到一封邮件。

self.consumer.seek(self.ps, self.get_first_offset()) 

我希望有人能解释我做错了什么。 在此先感谢。

祝 乔恩

我相信你是误解max_poll_records - 这并不意味着你会得到200个调查,只是限制在最你可能会得到。您需要多次调用轮询。我是指你的文档进行简单的例子:http://kafka-python.readthedocs.io/en/master/usage.html

我相信一个更标准的实现是:

for message in self.consumer: 
    # do stuff like: 
    print(msg) 
+0

不幸的是尼克,我相信你的例子是阻塞调用 – cs94njw

+0

同意。你为什么提到这个?它不影响轮询机制吗?我没有审查源代码。 – Nick

+0

会发生什么情况是,如果队列中没有消息(无法读取),则for循环不会移动。 这不是一个问题,但它会降低灵活性。 上面的“soa”代码使用轮询,它将在队列中等待几秒钟,然后执行其他操作。 我认为“soa”正在寻找投票解决方案。 – cs94njw