如何使用Java从卡夫卡获取最近5天的消息

问题描述:

我已经为卡夫卡中的主题设置了TTL为7天,我从Kafka获取数据并将其存储在数据库中,但是从最近5天我的数据库服务器已关闭,现在我必须从Kafka获取最近5天的消息并将它们存储在数据库中 注意:从过去5天起,Kafka没有问题。如何使用Java从卡夫卡获取最近5天的消息

+0

您需要借助偏移值进行消耗。举个例子,如果你上一次读取的偏移量为100,那么你需要从偏移量101中消耗它。 –

+0

如何在Java中使用这个偏移量概念,以及如何知道存储消息的最后偏移值,因为我没有存储任何偏移值 – Sat

首先调用consumer.partitionsFor()方法来获得分区你的主题

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)

然后调用consumer.offsetsForTimes()来获得每个分区的时间戳的偏移量3天前,当最后一条消息已成功处理。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)

然后调用consumer.seek(),以目前消费者在这一点偏移位置的时间,并继续呼吁调查()和处理消息,你通常会。

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

到上一个不错的答案,我想补充一点,通话partitionsFor方法来获得分区你的主题,然后做的@Hans说。

+1

谢谢。我更新了我的答案,包括适当的第一步。 –