Netty 源码分析 05 EventLoop
在看 EventLoop 的具体实现之前,我们先来对 Reactor 模型做个简单的了解
我们来看看 Reactor 模型的核心思想:
初步一看,Java NIO 符合 Reactor 模型啊?因为 Reactor 有 3 种模型实现:
- 单 Reactor 单线程模型
- 单 Reactor 多线程模型
- 多 Reactor 多线程模型
第三种模型比起第二种模型,是将 Reactor 分成两部分:
- mainReactor 负责监听 ServerSocketChannel ,用来处理客户端新连接的建立,并将建立的客户端的 SocketChannel 指定注册给 subReactor 。
- subReactor 维护自己的 Selector ,基于 mainReactor 建立的客户端的 SocketChannel 多路分离 IO 读写事件,读写网络数据。对于业务处理的功能,另外扔给 worker 线程池来完成。
- mainReactor 主要用来处理网络 IO 连接建立操作,通常,mainReactor 只需要一个,因为它一个线程就可以处理。
- subReactor 主要和建立起来的客户端的 SocketChannel 做数据交互和事件业务处理操作。通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理。
此种模式中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大大的提升,支持的可并发客户端数量可达到上百万级别
- 对于 Netty NIO 客户端来说,仅创建一个 EventLoopGroup 。
- 一个 EventLoop 可以对应一个 Reactor 。因为 EventLoopGroup 是 EventLoop 的分组,所以对等理解,EventLoopGroup 是一种 Reactor 的分组。
那么 Netty NIO 客户端是否能够使用【多 Reactor 多线程模型】呢????? 创建多个 Netty NIO 客户端,连接同一个服务端。那么多个 Netty 客户端就可以认为符合多 Reactor 多线程模型了
对于 Netty NIO 服务端来说,创建两个 EventLoopGroup 。
-
bossGroup
对应 Reactor 模式的 mainReactor ,用于服务端接受客户端的连接。比较特殊的是,传入了方法参数nThreads = 1
,表示只使用一个 EventLoop ,即只使用一个 Reactor 。这个也符合我们上面提到的,“通常,mainReactor 只需要一个,因为它一个线程就可以处理”。 -
workerGroup
对应 Reactor 模式的 subReactor ,用于进行 SocketChannel 的数据读写。对于 EventLoopGroup ,如果未传递方法参数nThreads
,表示使用 CPU 个数 Reactor 。这个也符合我们上面提到的,“通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理”。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(二)之 EventLoopGroup
6. EventLoopGroup
io.netty.channel.EventExecutorGroup
,继承 EventExecutorGroup 接口,EventLoop 的分组接口
7. MultithreadEventLoopGroup
io.netty.channel.MultithreadEventLoopGroup
,实现 EventLoopGroup 接口,继承 MultithreadEventExecutorGroup 抽象类,基于多线程的 EventLoop 的分组抽象类。
7.5 register
#register()
方法,注册 Channel 到 EventLoopGroup 中。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册
8. NioEventLoopGroup
io.netty.channel.nio.NioEventLoopGroup
,继承 MultithreadEventLoopGroup 抽象类,NioEventLoop 的分组实现类。
调用rebuildSelector,重建selector对象
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(三)之 EventLoop 初始化
7. AbstractScheduledEventExecutor
io.netty.util.concurrent.AbstractScheduledEventExecutor
,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。
8. SingleThreadEventExecutor
io.netty.util.concurrent.SingleThreadEventExecutor
,实现 OrderedEventExecutor 接口,继承 AbstractScheduledEventExecutor 抽象类,基于单线程的 EventExecutor 抽象类,即一个 EventExecutor 对应一个线程。
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(四)之 EventLoop 运行
2. NioEventLoop
io.netty.channel.nio.NioEventLoop
,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。
ioRatio
属性,在 NioEventLoop 中,会三种类型的任务:1) Channel 的就绪的 IO 事件;2) 普通任务;3) 定时任务。而 ioRatio
属性,处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例。
2.9 run
#run()
方法,NioEventLoop 运行,处理任务。这是本文最重要的方法
我们知道 SelectStrategy#calculateStrategy(...)
方法,有 3 种返回的情况
第一种,SelectStrategy.CONTINUE
,默认实现下,不存在这个情况。 值为-2 代表要进行重试
第二种,SelectStrategy.SELECT
,进行 Selector 阻塞 select 。 值为 -1
第三种,>= 0
,已经有可以处理的任务,直接向下
总的来说,#run()
的执行过程,就是如下一张图:
2.12 select
#select(boolean oldWakenUp)
方法,选择( 查询 )任务。这是本文最重要的方法
获得 select 操作的计数器。主要用于记录 Selector 空轮询次数,所以每次在正在轮询完成( 例如:轮询超时 ),则重置 selectCnt
为 1 。
第 74 至 93 行:不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建。这就是 Netty 判断发生 NIO Selector 空轮询的方式,N ( 默认 512 )次 select 并未阻塞超时这么长,那么就认为发生 NIO Selector 空轮询。过多的 NIO Selector 将会导致 CPU 100%
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(五)之 EventLoop 处理 IO 事件
3. openSelector
#openSelector()
方法,创建 Selector 对象
4. SelectedSelectionKeySet
io.netty.channel.nio.SelectedSelectionKeySet
,继承 AbstractSet 抽象类,已 select 的 NIO SelectionKey 集合。代码如下:
5. SelectedSelectionKeySetSelector
io.netty.channel.nio.SelectedSelectionKeySetSelector
,基于 Netty SelectedSelectionKeySet 作为 selectionKeys
的 Selector 实现类
select 相关的 3 个方法,在调用对应的 Java NIO Selector 方法之前,会调用 SelectedSelectionKeySet#reset()
方法,重置 selectionKeys
。从而实现,每次 select 之后,都是新的 select 的 NIO SelectionKey 集合。
6. rebuildSelector
#rebuildSelector()
方法,重建 Selector 对象
总的来说,#rebuildSelector()
方法,相比 #openSelector()
方法,主要是需要将老的 Selector 对象的“数据”复制到新的 Selector 对象上,并关闭老的 Selector 对象。
7. processSelectedKeys
在 #run()
方法中,会调用 #processSelectedKeys()
方法,处理 Channel 新增就绪的 IO 事件
7.1 processSelectedKeysOptimized
#processSelectedKeysOptimized()
方法,基于 Netty SelectedSelectionKeySetSelector ,处理 Channel 新增就绪的 IO 事件
7.2 processSelectedKeysPlain
#processSelectedKeysOptimized()
方法,基于 Java NIO 原生 Selecotr ,处理 Channel 新增就绪的 IO 事件
7.3 processSelectedKey
#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件
8.1 register
#register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
方法,注册 Java NIO Channel ( 不一定需要通过 Netty 创建的 Channel )到 Selector 上,相当于说,也注册到了 EventLoop 上
8.2 invokeChannelUnregistered
#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
方法,执行 Channel 取消注册
8.3 processSelectedKey
#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask ,自定义实现 Channel 处理 Channel IO 就绪的事件
代码比较简单,胖友自己看中文注释。主要是看懂 state
有 3 种情况:
-
0
:未执行。 -
1
:执行成功。 -
2
:执行异常。
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(六)之 EventLoop 处理普通任务
2. runAllTasks 带超时
在 #run()
方法中,会调用 #runAllTasks(long timeoutNanos)
方法,执行所有任务直到完成所有,或者超过执行时间上限
3. runAllTasks
在 #run()
方法中,会调用 #runAllTasks()
方法,执行所有任务直到完成所有
4. pollTask
#pollTask()
方法,获得队头的任务
5. afterRunningAllTasks
在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 的 「9.10 afterRunningAllTasks」 中,#afterRunningAllTasks()
方法,执行所有任务完成的后续方法
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(七)之 EventLoop 处理定时任务
2. ScheduledFutureTask
io.netty.util.concurrent.ScheduledFutureTask
,实现 ScheduledFuture、PriorityQueueNode 接口,继承 PromiseTask 抽象类,Netty 定时任务。
3. AbstractScheduledEventExecutor
io.netty.util.concurrent.AbstractScheduledEventExecutor
,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。
3.2 scheduledTaskQueue
#scheduledTaskQueue()
方法,获得定时任务队列。若未初始化,则进行创建
3.4 schedule
#schedule(final ScheduledFutureTask<V> task)
方法,提交定时任务
4. SingleThreadEventExecutor
在 《精尽 Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 中,有个 #fetchFromScheduledTaskQueue()
方法,将定时任务队列 scheduledTaskQueue
到达可执行的任务,添加到任务队列 taskQueue
中
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
EventLoop(八)之 EventLoop 优雅关闭
我们总结一下,调用shutdown()方法从循环跳出的条件有:
(1).执行完普通任务
(2).没有普通任务,执行完shutdownHook任务
(3).既没有普通任务也没有shutdownHook任务
调用shutdownGracefully()方法从循环跳出的条件有:
(1).执行完普通任务且静默时间为0
(2).没有普通任务,执行完shutdownHook任务且静默时间为0
(3).静默期间没有任务提交
(4).优雅关闭截止时间已到
可知,Netty默认的shutdownGracefully()机制为:在2秒的静默时间内如果没有任务,则关闭;否则15秒截止时间到达时关闭。换句话说,在15秒时间段内,如果有超过2秒的时间段没有任务则关闭。至此,我们明白了从EvnetLoop循环中跳出的机制,最后,我们抵达终点站:线程结束机制。这一部分的代码实现在线程工厂的生成方法中: