卡夫卡消费者调查消息与python
问题描述:
我在消费者组中的卡夫卡轮询消息有问题。 我的消费对象分配给一个给定的分区与卡夫卡消费者调查消息与python
self.ps = TopicPartition(topic, partition)
之后,消费者分配到该分区:
self.consumer.assign([self.ps])
之后,我能够与
算分区内部消息self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
and self.consumer.seek_to_end(self.ps)
.....
在我的tpoic中有超过30000条消息。 问题是我只收到一条消息。
使用者配置有: max_poll_records= 200
AUTO_OFFSET_RESET
是最早
这里是我的功能与此我想获得消息:
def poll_messages(self):
data = []
messages = self.consumer.poll(timeout_ms=6000)
for partition, msgs in six.iteritems(messages):
for msg in msgs:
data.append(msg)
return data
即使我去了第一个可用的前偏移开始轮询邮件 我只收到一封邮件。
self.consumer.seek(self.ps, self.get_first_offset())
我希望有人能解释我做错了什么。 在此先感谢。
祝 乔恩
答
我相信你是误解max_poll_records - 这并不意味着你会得到200个调查,只是限制在最你可能会得到。您需要多次调用轮询。我是指你的文档进行简单的例子:http://kafka-python.readthedocs.io/en/master/usage.html
我相信一个更标准的实现是:
for message in self.consumer:
# do stuff like:
print(msg)
不幸的是尼克,我相信你的例子是阻塞调用 – cs94njw
同意。你为什么提到这个?它不影响轮询机制吗?我没有审查源代码。 – Nick
会发生什么情况是,如果队列中没有消息(无法读取),则for循环不会移动。 这不是一个问题,但它会降低灵活性。 上面的“soa”代码使用轮询,它将在队列中等待几秒钟,然后执行其他操作。 我认为“soa”正在寻找投票解决方案。 – cs94njw