如何知道Kafka-Storm中的重试次数
我知道SpoutConfig有retryLimit
来设置消息可以重新处理的次数。如何知道Kafka-Storm中的重试次数
关于retryLimit
,这是我在SpoutConfig.class找到的消息:
指数回退重试设置。这些被 ExponentialBackoffMsgRetryManager用于在螺栓 调用OutputCollector.fail()后重试消息。
我想知道是否有任何方法可以知道当我的代码中的任何给定螺栓处理Tuple时重试的确切数量。
例如,如果我设置retryLimit=5
和失败(调用OutputCollector.fail()
)第一次,当它被重新处理,第二次我想知道这个元组已经失败1次。
我将不胜感激您对此的帮助。
谢谢。
这里没有内置的支持。由卡夫卡记录生成的元组仅由https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java取决于卡夫卡记录,而不取决于重放次数。
默认的RecordTranslator会将主题,分区和偏移量作为元组的一部分发出,因此您可能能够使用它们来检查您的螺栓是否曾经看过元组(假设您有某种状态存储) 。为什么这些螺栓需要知道元组失败了多少次?
编辑:
我认为我们没有添加失败计数作为发射元组选项的原因之一是它不可靠。由于元组失败的次数只存在于内存中,所以可能会遇到元组失败,喷口崩溃,以及您从不会看到失败次数超过0的元组的情况。
即使我们有喷口中的持久状态存储仍然会有失败的元组没有标记的情况,例如如果喷口首先崩溃并且以前发射的元组失败。重新启动的喷口无法识别之前发出的元组,因此不会将其标记为失败。
在我看来,你实际需要追踪的是喷口是否不止一次地发射了一个元组,而不是喷口是否认为它以前失败了。
您可能可以使用https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java和onEmit
来跟踪哪些偏移量已被多次发射。由于它是作为喷口的一部分运行的,因此当元组被占用时清理状态应该非常简单。仍有可能遗漏失败的元组,因为onEmit
在喷口发出元组之后运行,所以如果喷口在喷射后立即崩溃,则可能会错过失败。也许想想你是否可以先以某种方式设计这个需求。
斯蒂格,谢谢你的回复。在我的情况下,这些螺栓需要知道这个数字,因为业务人员想要给出那些失败的元组进行特殊处理(插入错误表中以重新处理它们)。 – cricardo84
编辑的答案,因为评论太大 –
感谢您的回答Stig,我会试一试,看看会发生什么。 – cricardo84