RocketMQ源码解析之消息生产者(顺序消息)


前言

本篇主要是介绍下RocketMQ顺序消息的实现,包括全局有序与分区有序的介绍,我们想要使用它的顺序消息功能,那么我们消息生产者与消息消费者应该怎样子编程,以及从发送到消费整个运作流程。

1. 关于全局有序与分区有序

这里我摘官方文档上面的一段话:

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

什么意思呢?就是我们一个topic 可以分成多个MessageQueue,轮询发送与多queue消费的情况不能保证消息的顺序性,如果我们topic就一个MessageQueue,然后就可以怎样顺序发送的怎样顺序消费,这个就是全局有序,一个topic 用一个MessageQueue,因为就一个MessageQueue,系统只能将消息发送到这个MessageQueue上面,分区有序的话,就是一个topic 有多个MessageQueue,你发送的时候,按照你自己的规则(注意这里就不是系统轮询分配MessageQueue了),将需要保持顺序的一类消息分到同一个MessageQueue上面去,这样消费的时候就能保证这个MessageQueue里的消息是有序的,这个其实就跟kafka的partition分区保证有序性差不多

2. 我们应该怎样发送和消费

如果你是采用全局有序的话,需要创建topic的时候指定一个write跟一个read就可以了
如果你是采用分区有序的话,在发送的消息的时候需要自己实现一个selector用来自己指定这个消息发送到哪个MessageQueue中。
消费的时候使用顺序消费就可以了。
先来看下消息生产者(这里我把官方example拿过来):
RocketMQ源码解析之消息生产者(顺序消息)
其实这里跟发送普通消息差不多,唯一需要关注的就是我们要实现一个MessageQueueSelector ,重写这个选择MessageQueue的方法,我们自己选择MessageQueue的话,就可以控制将某一类消息发送到这个MessageQueue中了,比如说 按照用户id的来选择MessageQueue,这样就能保证某个用户的消息都能在一个MessageQueue中。还有是send最后一个参数,到时候是会被传到这个select方法中的。

消息消费者:
RocketMQ源码解析之消息生产者(顺序消息)
消费者保证顺序消费的话其实就是 在注册消息监听器(其实就是写消费逻辑的)的时候,实现一个orderly的,这样就能保证顺序消费,与之对应的就是MessageListenerConcurrently 这个监听器了,这个是并发消费的。

3. 整个运作流程

消息生产者:上面我们说了,其实就是将需要保证顺序的按照某个属性来选择MessageQueue,相同属性的分到同一个MessageQueue 上面,然后它又采取同步发送的方式发送,这个时候也能保证第一个消息发送成功后返回响应后,又发送第二个。

broker : 因为消息生产者采用的同步发送的方式,然后broker 会先接收到消息1,然后将消息1追加写到commitLog中,然后记录下这个消息在queue里面的offset,注意这个offset是有顺序的,然后刷盘操作,最后返回消息1的响应,然后消息生产者才能发送消息2.

消息消费者:消费者是这个样子的,它会将一个MessageQueue对应生成一个processQueue,然后它会向broker 拿一批消息过来,比如说,告诉broker 我要从offset是1的消息开始拿,拿32个,这个时候broker就会给他从offset 是1的开始给它取,取过来的时候也是顺序排列的,然后消息消费者将 这批消息就放到 processQueue中的一个数据结构中,这个数据结构就是TreeMap,都知道这个TreeMap是有序的,然后调用你实现的MessageListenerOrderly的类对象进行单线程消费。

4. 生产者源码解析

本篇其实我们主要是介绍生产者的,但是顺序消息不光是生产者的事,还有broker,消费者也需要保证各个流程消息的顺序,所以我们会插入了点broker 与消息消费者的介绍。
这里我们主要解析下消息生产者是怎样调用我们写的这个selector的与看看消息发送方式,同时也要看看send最后一个参数去了哪。
RocketMQ源码解析之消息生产者(顺序消息)
这里调用了defaultMQProducerImpl的send方法
RocketMQ源码解析之消息生产者(顺序消息)
添加一个调用超时时间,默认是3s。
RocketMQ源码解析之消息生产者(顺序消息)
其实从这个方法中就可以看到是使用的同步发送的方法。
RocketMQ源码解析之消息生产者(顺序消息)
在sendSelectImpl 这个方法中我们看到了它先是根据topic获取topic info信息,然后调用我们实现的selector里面select方法来选择一个MessageQueue,注意这个arg参数,被传到了我们的select方法中。后面就是发送请求了,这些我们之前文章都有解析过,这里就不展开了,需要注意的是,它没用重试功能。

总结

好了,到这我们生产者端顺序消费就介绍完了,我们先是介绍了一下全局有序与分区有序,接着又介绍了分区有序我们的消息生产者该怎样编码与消息消费者该怎样编码,接着又介绍了 从 消息生产者到broker到消息消费者消息被消费,RocketMQ是怎样保证我们消息顺序的,最后就是看了一下消息生产者关于顺序消息的源码实现,在消息生产者其实就是一句话,我们根据业务指定消息被分到同一个MessageQueue上。