八、RocketMQ事务消息全流程解析
分类:
文章
•
2025-04-18 17:03:22
- 1.交互流程
-

- 解决了Producer端的本地事务和消息发送的原子性问题。并没有解决Producer端本地事务、消息发送、消息消费三者的一致性问题。消息消费目前仅是靠MQ本身的消息可靠性来保证。
- 2.类设计
-

- 1.事务消息是普通消息的一种特例,基本的流程和普通消息是一样的。所以在实现的过程中进行了分层抽象,从而避免了对RocketMQ原有存储机制的修改。
- 2.Producer侧,使用者只需要实现本地事务执行逻辑和本地事务状态查询逻辑。其他的都不用考虑。
- 3.Server侧:
- 1.对二阶段提交进行了抽象。
- 2.对存储进行了再次封装,操作两个系统TOPIC。主要就是2个类,TransactionalMessageService和TransactionalMessageBridge。待会儿我们看代码也主要就是这两个类。
- 3.实现了对超时事务的回查逻辑,通过不断扫描当前事务进度,反向查询Producer端事务状态。
- 4.RocektMQ也支持通过其他的存储介质来实现自己的Service,通过ServiceProvider来加载进来。细心的同学会发现在transaction包下有jdbc子包。说明RocketMQ有过打算使用数据库来实现事务消息。用数据库来实现的话,代码逻辑会比现在更加清晰简单。只不过RocketMQ依赖于数据库中间件,架构上来讲可能比较重。大企业有专业的DBA,可以考虑这种方式。
- 3.类详细设计
-

- 1.prepare消息发送、落盘
- 2.客户端执行本地事务和发送END_TRANSACTION请求
- 发起END_TRANSACTION请求:
- 如果本地事务状态为COMMIT_MESSAGE
- 1.还原half message真实的topic、queueid;
- 2.将还原后的消息重新落盘,随后就会调用CommitLogDispatcherBuildConsumeQueue.dispatch更新原始topic的consumerQueue,即该消息对消费者可见;
- 3.删除half消息(并非从磁盘上真正删除):构建一个消息放入另一个RMQ_SYS_TRANS_OP_HALF_TOPIC队列,queueId为对应half消息queueId,消息内容为对应half消息的queueOffset,并将tags设置为"d",表示该事务消息已经提交,状态已经确认。
- 如果为ROLLBACK_MESSAGE则回滚
- 还原half message真实的topic、queueid;
- 删除half消息(并非从磁盘上真正删除):构建一个消息放入另一个RMQ_SYS_TRANS_OP_HALF_TOPIC队列,queueId为对应half消息queueId,消息内容为对应half消息的queueOffset,并将tags设置为"d",表示该事务消息已经提交,状态已经确认。
- 如果UNKOWN则什么都不做;
- 3.Broker端回查逻辑
- 4.客户端回查逻辑