消息队列学习笔记4——JMQ Broker接收消息流程
设计了 6 组线程,将一个大的流程拆成了 6 个小流程。并且整个过程完全是异步化的。
流程的入口在图中的左上角,Broker 在收到来自生产者的发消息请求后,会在一个 Handler 中处理这些请求,这和我们在普通的业务系统中,用 Handler 接收 HTTP 请求是一样的,执行 Handler 中业务逻辑使用的是 Netty 的 IO 线程。
收到请求后,我们在 Handler 中不做过多的处理,执行必要的检查后,将请求放到一个内存队列中,也就是图中的 Requests Queue。请求被放入队列后,Handler 的方法就结束了。所以即使是处理海量的请求,也不会过多的占用 IO 线程。
由于要保证消息的有序性,整个流程的大部分过程是不能并发的,只能单线程执行。所以,接下来我们使用一个线程 WriteThread 从请求队列中按照顺序来获取请求,依次进行解析请求等其他的处理逻辑,最后将消息序列化并写入存储。序列化后的消息会被写入到一个内存缓存中,就是图中的 JournalCache,等待后续的处理。
执行到这里,一条一条的消息已经被转换成一个连续的字节流,每一条消息都在这个字节流中有一个全局唯一起止位置,也就是这条消息的 Offset。后续的处理就不用关心字节流中的内容了,只要确保这个字节流能快速正确的被保存和复制就可以了。
这里面还有一个工作需要完成,就是给生产者回响应,但在这一步,消息既没有落盘,也没有完成复制,还不能给客户端返回响应,所以我们把待返回的响应按照顺序放到一个内存的链表 Pending Callbacks 中,并记录每个请求中的消息对应的 Offset。
然后,我们有 2 个线程,FlushThread 和 ReplicationThread,这两个线程是并行执行的,分别负责批量异步进行刷盘和复制,刷盘和复制又分别是 2 个比较复杂的流程,我们暂时不展开讲。刷盘线程不停地将新写入 Journal Cache 的字节流写到磁盘上,完成一批数据的刷盘,它就会更新一个刷盘位置的内存变量,确保这个刷盘位置之前数据都已经安全的写入磁盘中。复制线程的逻辑也是类似的,同样维护了一个复制位置的内存变量。
最后,我们设计了一组专门用于发送响应的线程 ReponseThreads,在刷盘位置或者复制位置更新后,去检查待返回的响应链表 Pending Callbacks,根据 QOS 级别的设置(因为不同 QOS 基本对发送成功的定义不一样,有的设置需要消息写入磁盘才算成功,有的需要复制完成才算成功),将刷盘位置或者复制位置之前所有响应,以及已经超时的响应,利用这组线程 ReponseThreads 异步并行的发送给各个客户端。
这样就完成了消息生产这个流程。整个流程中,除了 JournalCache 的加载和卸载需要对文件加锁以外,没有用到其他的锁。每个小流程都不会等待其他流程的共享资源,也就不用互相等待资源(没有数据需要处理时等待上游流程提供数据的情况除外),并且只要有数据就能第一时间处理。
这个流程中,最核心的部分在于 WriteThread 执行处理的这个步骤,对每条消息进行处理的这些业务逻辑,都只能在 WriteThread 中单线程执行,虽然这里面干了很多的事儿,但是我们确保这些逻辑中,没有缓慢的磁盘和网络 IO,也没有使用任何的锁来等待资源,全部都是内存操作,这样即使单线程可以非常快速地执行所有的业务逻辑。
这个里面有很重要的几点优化:
- 把刷盘和复制这两部分比较慢的操作从这个流程中分离出去异步执行;
- 我们使用了一个写缓存 Journal Cache 将一个写磁盘的操作,转换成了一个写内存的操作,来提升数据写入的性能。
- 这个处理的全流程是近乎无锁的设计,避免了线程因为等待锁导致的阻塞;
- 我们把回复响应这个需要等待资源的操作,也异步放到其他的线程中去执行。
参考资料:李玥——消息队列高手课