RocktMQ源码解析之broker消息存储流程(最最最硬核一)
原创不易,转载请注明出处
前言
本文主要是解析下broker 的消息存储部分,看下broker是怎样存储消息的,比如说我消息生产者发送过来一个消息,然后broker 是怎样存储的,需要注意的是在RocketMQ集群架构中只有master角色的broker是可以接收消息生产者消息的,slave角色的broker只能是master同步给它,但是消息消费者是有机会从slave角色上面拉取消息的,这个是要看slave同步情况与master负载情况来定,然后在拉取消息回来的时候,master会建议你去哪个broker上面拉取,扯远了,由于消息存储流程比较复杂,我将最最最硬核,最最最重要的分成三篇来介绍,第一篇也就是本篇主要是介绍消息存储主流程与解析commitLog写入,其他的篇章介绍 ConsumeQueue写入与创建索引。
1. 消息存储流程
首先我们先来回顾下消息发送流程,当我们创建一个消息,然后调用RocketMQ client api进行send消息,这个时候RocketMQ client会先去本地缓存中查找你发送那个topic的一个详细信息,这个详细信息里面其实就是你这个topic分成了几个MessageQueue,然后每个MessageQueue都有自己的queueId,有自己对应的broker name ,也就是你这个MessageQueue里面的消息要发到哪个broker上面,如果你本地没有这个topic信息的话,它就会向nameserv去请求更新下,如果nameserv也没有你这个topic信息,说明你这个topic没有创建(其实没创建也是可以发送的,就是向nameserv把所有broker name 信息拉下来,然后每个broker name 对应4个MessageQueue,然后发给对应的broker 就可以了,然后broker根据配置,就会帮你自动创建这个topic,这块生产建议关闭自动创建topic,只能通过可视化运维平台创建就可以了),接下来就是按照规则进行选择MessageQueue了,默认的一个规则就是线程内轮询的,就是在每个线程中维护着一个计数器,然后每次%MessageQueue的数量,然后计数器加一,这个样子就能实现一个线程内的轮询效果。最后就是根据你选择的通信方式进行发送,比如说同步发送,异步发送,单向发送,发送的时候先是根据MessageQueue里面对应的broker name 去找对应的broker地址,需要注意的是找的是master的地址,然后根据地址信息去获取对应的channel,如果channel不存在或者是断开了就创建,将消息写入channel中,如果是同步,异步发送就要等着sendResult的到来,同步的话sendResult会回到你发送消息的那个线程上去,然后如果是异步的话就会回调你的callback实现。
到这一个普通消息的发送流程就介绍完成了,如果需要源码级别认知,可以看我关于RocktMQ源码解析之消息生产者系列的文章,这里说下 broker name这个东西, broker name 可以理解为代表的是一组broker,比如说我有master 然后为了容灾在它下面挂了2个slave角色的broker ,这个时候 这三个的broker name 是需要配置成相同的,然后用broker id来区分你是master 还是slave ,broker id 是0的就是master ,0以上的是slave。
接下来就是介绍broker 收到这个消息是怎样处理的了,broker 收到消息后,有一个proccesor会对消息进行封装,然后判断是不是事务消息,事务消息的话就会交给TransactionalMessageService 来处理,其实就是替换一下topic 与queueId,然后找到存储器存起来,如果不是事务消息的话,直接向存储器存储。存储器会校验一堆东西,然后交给commitLog来存储,这里要介绍一下这个commitLog 了,一个broker逻辑上对应着一个commitLog,你可以把它看作一个大文件,然后这个broker收到的所有消息都写到这个里面,但是物理上ROCKET_HOME/commitlog/00000000000000000000
这个路径存储的,它是由若干个文件组成的,每个文件默认大小是1G,然后每个文件都对应这个一个MappedFile,00000000000000000000 就是第一个MappedFile对应的物理文件,每个文件的文件名就是在commitLog里面的一个其实offset,第二个文件名就是00000000001073741824,也就是上一个MappedFile文件起始offset加上每个文件的大小,这个MappedFile就是RocketMQ的黑科技,使用了内存映射技术来提高文件的访问速度与写入速度,然后都是采用追加写的方式提高写入速度,这个是commitLog的一个简单介绍,与此同时后台还有一个线程,专门做reput操作与buildIndex操作,这个reput操作其实就是将消息在commitLog中的offset按照topic,queueId写入不同的ConsumeQueue中,这个ConsumeQueue就是供消息消费者消费使用的,它是与消息生产者中的MessageQueue是一一对应的,这个ConsumeQueue在也是使用内存映射技术来读写文件的,在物理上它会按照 ROCKET_HOME/store/consumeQueue/主题/queueId/00000000000000000000
这个路径来存储,其中00000000000000000000就是那个文件,每个文件存储30w条信息,然后每条信息占20字节,一个文件就是600w字节,第二个文件的文件名就是00000000000006000000,这个文件名就是这个文件在这个ConsumeQueue中的起始offset,同一个topic下的queueId是由多个文件组成。buildIndex这个就是创建索引的意思,它的索引构建原理就是根据topic加uniqKey与topic加key生成key,然后根据key的hash,计算出来所在的hash槽,从而计算出那个hash槽在那个索引文件的offset,从hash槽中的获取index的offset,在根据index的offset获取到具体的索引内容,索引内容里面有这个消息在commitLog中的一个offset,还会有下一个index的offset,通过这个offset又能找到下一个索引内容,到这里,其实它这个索引构建可以看作是一个jdk1.7的HashMap,它这个hash槽可以看作是HashMap里面那个数组,真正索引内容就是数据每个元素,如果有hash碰撞就会生成一个链表,在一个索引文件中默认有500w个hash槽,每个hash槽是4个字节, 2000w个index,每个index是20个字节,算算一个indexFile有400m多点。
最后我这里画了一张RocketMQ存储器逻辑存储与物理存储的对应关系图(原图地址:链接),关于存储器更细节的东西我们会通过扣源码的过程介绍出来。
2. commitLog写入消息源码解析
接下来我们将以贴源码的形式介绍消息是怎样写入commitLog的。
broker在启动的时候,会将某个code对应的processor注册到server上,啥意思呢?就是不同类型的消息交给不同的处理器去处理,比如说SEND_MESSAGE
这个code的消息就会交给SendMessageProcessor
处理,会调用对应processor的 processRequest 方法来处理,我们消息生产者发送消息的时候,就是使用的SEND_MESSAGE
这个code ,接下来我们看下SendMessageProcessor 的processRequest 方法,
咱们这里直接走的default,不要问我为什么,因为我们的code是SEND_MESSAGE
,首先是解析请求头,然后看看是不是批量消息,批量消息的话交给sendBatchMessage 方法处理,不是批量消息的话给sendMessage 方法处理,sendMessage 方法开始都是一些封装响应和校验封装请求消息的,我们直接看下方法后面部分
这里普通消息的话就是找到存储器,然后调用存储器的putMessage方法进行存储,这里这个存储器是默认存储器DefaultMessageStore
我们看下它的putMessage方法的前半部分
前半部分都是一些校验性的东西,判断broker的角色,判断服务状态,判断topic长度,判断propreties长度,判断pageCache是否繁忙,这个判断os pageCache是否繁忙我们需要看下
是这样子,在commitlog写入消息前会获取一个锁来保证顺序写入,获取到锁就会更新下这个获取锁的时间,最后写入完成会释放锁,将这个时间设置成0 ,如果按照正常情况的话,如果你现在这个时间戳减去0的话绝对会大于这个10000000的,也就是不满足,然后当前时间减去那个上次获取锁的时间,如果现在与获取锁时间差大于1s的话就说明某个追加写入已经持有锁超过1s了,所以它会认为os page cache繁忙
这里就非常重要了,调用了commitlog的putMessage方法,我们来看下,由于太长我们一部分一部分的看。
这一部分其实就是从msg中获取一些信息,判断处理一下这个延时消息
这一部分就比较重要了,首先是从mappedFileQueue中获取最后一个MappedFile,这个就是拿集合最后一个元素,因为都是有序的,最后一个元素就是最后一个MappedFile,接着就是获取锁了,这个锁也是比较有讲究的,可以设置使用ReentrantLock 也可以设置使用cas,默认是使用cas,接着就是设置beginTimeInLock这个变量了,这个变量我们在判断os page cache繁忙的时候说过,就是获取到锁的一个时间戳,在释放锁之前会重置成0,接着就是判断mappedFile是不是null或者是不是满了,如果是的话就要新建一个了。
接着就是最最最重要的了 往mappedFile中追加消息,
这里首先获取了一下这个mappedFile写到哪个位置了,它这个位置是从0开始的,然后判断一下当前位置与文件大小做对比,要是大于的就超了文件大小了,接着是获取writerbuffer因为这里是没有开启transientStorePool的,所以它是个空的,就会使用mmapedByteBuffer,接着就是调用回调的doAppend追加消息了,我们看下它的参数, 第一个是开始offset,这个offset是commitlog的一个offset,举个例子,第一个MappedFile的开始offset是0,然后一个MappedFile 的大小是1g,然后第二个MappedFile就得从1073741824(1g)开始了,第二个参数是bytebuffer,这个不用多说,第三个是这个MappedFile还空多少字节没用,第四个就是消息了。
我们来看下这个doAppend方法,这个也有点长,我们需要分开看下:
这一部分主要就是 计算了一下这个消息写在commitlog中的一个offset,接着就是生成一个msgId,然后根据topic 与queueId从缓存中获取了一下这个queueId对应的一个queue的offset,这个其实就是添加一个消息加1,然后就是事务的东西了,如果有事务,然后还在准备阶段或者回滚阶段,就将queue offset 设置成0,再往下其实就是处理消息,然后写到buffer中了。
这里首先是获取了一下消息里面的properties,将它转成字节数组,计算了一下长度,接着就是将topic转成字节数据,计算了一下长度,获取了一下body的长度,就是你往Message塞得内容长度,重点来,计算 消息的总长度,然后判断一下长度是否超长。
接着就是判断剩下的空间能不能放开,如果放不开的话,就塞上一个结束的东西,8个字节是正经的,剩下的随意,然后返回文件满了的状态。
这个就是封装消息了,最后将消息放到byteBuffer中
最后就是封装追加消息的结果是put_ok,然后更新queue offset ,其实就是+1。接下来我们回过头来看下appendMessagesInner的后半部分,
这里其实就是更新了一下 这个MappedFile 写到哪个地方了,更新了下写入时间。在回到commitLog的putMessage方法
这里追加完成了,就需要判断追加状态了,如果是那种MappedFile放不开消息的情况,它会重新获取一个MappedFile,然后重新追加,在释放锁之前,它还会将beginTimeInLock这个字段重置为0;
判断了一下耗时,如果是大于500的话,打印警告,封装put消息的结果,统计store,可以看到后面调用了2个方法,一个是刷盘的,一个是同步消息的,我们这里要看下这个刷盘动作
如果broker配置的SYNC_FLUSH 并且是个同步消息,这个时候就会创建一个刷盘请求,然后提交刷盘请求,这个时候会等着刷盘完成,默认就是5s。
接着就是到存储器的putMessage方法的后半部分了
commitlog存入消息之后,咱们这块也就算是完成了,最后就是回到那个processor,然后将put结果写入对应的channel给返回去,告诉消息生产者消息写入结果 。消息存储其实就是找对应的MappedFile,按照一定的格式往文件里面写入,需要注意的是内存映射文件。
这里附一张消息存储字段存储顺序与字段长度的图:
总结
本文我们主要是对回顾了一下消息生产者发送消息的流程,介绍了一下broker 收到消息进行存储的核心流程,同时解析了commitlog写入消息的源码,这里我们需要明白的是 commitlog是一个逻辑上的大文件,然后它有一个offset,通过每个MappedFile文件大小就能找这个offset在哪个MappedFile上面,我们在消息追加写的时候,有个一个PHYSICALOFFSET 字段,其实就是存储的commitlog的一个offset,包括我们不管是构建索引还是ConsumeQueue,存储的都是这个commitlog的offset,然后commitlog是由若干个MappedFile组成的,可以对照着上面的RocketMQ存储器逻辑存储与物理存储的对应关系图来看下