五、RocketMQ消息消费重试

五、RocketMQ消息消费重试

  • 1.consumer prepare:消费者启动,订阅正常的topic(包含tag)以及一个%RETRY%consumergroup的topic,并将订阅关系(SubscriptionData)以心跳方式上传给broker。consumer会与所有的broker建立连接发送心跳。
  • 2.consumer经过RebalanceService.doRebalance后,开始pull message。当拉取消息时,不需上传订阅关系,只需传queueId、topic、consumeQueueOffset就可获取消息。
  • 3.consumer sendback:若pullResult.pullStatus是返回RECONSUME_LATER,会将1号(初始消息)消息复制为2消息写回到(consume sendMessageBack)broker中。
  • 4.broker backup:备份原主题至PROPERTY_RETRY_TOPIC;设置DelayLV;
  • 5.broker change:设置topic变为SCHEDULE_TOPIC_XXXX;设置 queueId=delayLV-1;
  • 6.broker perisitence:将消息持久化到commitlog后,会被reputmessageService监测到,commitlog的最大值有变动,会dispatch到consumeQueue中。
  • 7.broker schedule delay message:ScheduleMessageService线程,启动时候给所有的延迟级别(delayLV)创建一个调度任务,每个延迟级别其实对应SCHEDULE_ TOPIC_XXXX主题下的一个消息消费队列。详见定时消息。
  • 8.broker Re-delivery从consumeQueue中取出2号消息,从commitlog中获取消息,清除delay信息,把topic更换为real_topic,再次组成3号消息投递到commitlog中。然后再被consumer的pull message所接收到,进行新一轮的消费。
  • 9.重试队列的消息,消费时,打印的主题仍然时首次订阅主题,而不是重试主题,即当重试消息消费逻辑中,获取topic时,会对topci的名称由retry还原为真正的topic,还原逻辑在?
    • ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); 当消息为重试消息,还原Topic为原始topic, "%RETRYconsumeGroup%" >> originalTopic。