Kafka生产者架构 - Selector
前言
本文解析生产者流程图中的Selector部分的处理逻辑9、10。
源码分析
本文基于Kafka定义的Selector的send方法和poll方法,展开源码分析。
Kafka定义的Selector是Kafka对于java原生的Selector的功能增强,用于满足Kafka的具体需求。核心是利用NetworkSend、NetworkReceive与KafkaChannel这些Kafka定义的模型,完成I/O读写操作。归根结底,使用nio的Channel与ByteBuffer交互完成更底层的读写操作。
send方法
Send,本身是个接口。
官方给予的注释:This interface models the in-progress sending of data to a specific destination.
也就是说它代表一个发送数据到指定的目的地的一个模型。
首先从channels获取给定id的KafkaChannel。如果不为空,直接返回。
如果为空,从closingChannels获取给定id的KafkaChannel,此时代表这个KafkaChannel即将关闭。
如果还为空,表示KafkaChannel已经断开连接。会抛出异常。
设置send属性,然后监听SelectionKey.OP_WRITE事件。
由此可以看出,正常而言,Send实例(实际上是NetworkSend)的目的地是KafkaChannel。
poll方法
Selector实现了Selectable接口。而Selectable接口,对于poll(…)的声明如下:
也就是承担实际的I/O读写,建立连接等工作。
poll(…)方法的内部实现篇幅比较长,这里拆分为四个部分,分开解析。核心的是第三部分。
Part One - 准备
先来看下poll(…)方法的第一个部分。主要是清除上一次poll调用的结果。
- madeReadProgressLastPoll:表示上次的poll调用在读取数据方面是否取得进展。这个参数被用来避免当内存容量不足以读取更多的数据时出现小循环。默认是true。
- keysWithBufferedRead:Set#SelectionKey。
- isOutOfMemory():判断内存池当前是否没有内存可以使用。
- hasStagedReceives():判断stagedReceives列表中是否没有isMute()状态的通道。
- stagedReceives:暂存一次OP_READ事件处理过程中读取到的所有请求。当一次OP_READ事件处理完成之后,会将stagedReceives集合中的请求保存到completeReceives集合中。
clear() - 清除上一次poll调用的结果
- 清空已完成发送的列表、已完成接收的列表、已建立连接的列表、断开连接的列表。
- 遍历closingChannels,获取value - KafkaChannel。
- stagedReceives获取KafkaChannel对应的Deque。
- 从发送失败的列表里删除该KafkaChannel的id。
- 如果Deque不存在、Deque没有内容、删除成功,则关闭该KafkaChannel。
- 遍历发送失败的列表,将每一个元素与FAILED_SEND状态加入到已断开连接的集合中。
- 清空发送失败的列表。
- 将madeReadProgressLastPoll设置为false,表示上次的poll调用在读取数据方面没有取得进展。
isInMutableState() - 判断KafkaChannel是否是处于mutable的状态
所谓mutable状态,我们可以在Selector的接口-Seletable找到一些蛛丝马迹。mute(…)方法,用来禁止从给定的连接中读取数据。unmute(…)方法,与之相反,用来开启从给定的连接中读取数据。
- receive:NetworkReceive类型。
- memoryAllocated():判断是否有调用内存。实际上是判断
ByteBuffer#buffer
属性是否不为空。 - transportLayer:TransportLayer接口类型。
- ready():对于SslTransportLayer来说,是判断它的state属性是否是READY。对于PlaintextTransportLayer来说,始终是true。
maybeUnmute() - 可能修改KafkaChannel的状态为not_muted
Part Two - 检查
接着看第二部分。核心是调用nio原生的Selector#select(long timeoutMs)或者是selectNow()方法。获取对应的通道准备就绪的键的数量。
根据timeoutMs的设置,调用java.nio.channels.Selector
的selectNow()或者select(timeoutMs)方法。这个是java nio原生的Selector,而select(…)方法返回准备就绪的键的个数。
Part Three - I/O处理
第三部分核心是围绕pollSelectionKeys(…)方法,做实际的I/O读写操作。
- numReadyKeys:准备就绪的键的个数。
- immediatelyConnectedKeys:SeletionKey集合,其调用SocketChannel#connect(…)方法,立刻连接的。
- dataInBuffers:表示keysWithBufferedRead集合是否有元素。keysWithBufferedRead表示使用缓冲读取的SelectionKey集合。
- pollSelectionKeys(…):处理准备就绪的键。
pollSelectionKeys(…) - 对准备就绪的SelectionKey做了一些I/O处理。
- determineHandlingOrder(…):对于
Selector#outOfMemory
属性为false,并且Selector#memoryPool
的可用内存大小小于lowMemThreshold,就会对SelectionKey集合调用Collections.Shufffle(...)
进行重排序。 - channel(…):获取SelectionKey.attachment(),然后类型转换为KafkaChannel。也就是获取与该SelectionKey关联的KafkaChannel。
- update(…):将channel的id和当前时间放入到
Selector#lruConnections
这个LinkedHashMap里。
对于调用SocketChannel#connect(…)就立刻完成连接的、SelectionKey是可连接状态,调用NIO原生的SocketChannel#finishConnect()方法完成连接。确保后续的操作中,SocketChannel一定处于连接状态。
- ready():判断Authenticator验证是否完成,并且state是否处于READY状态(对于SslTransportLayer)。
- prepare():核心是调用
SslTransportLayer#handshake()
方法,完成SSL握手。
KafkaChannel的读处理。(这里不再展开更底层的源码)
KafkaChannel的写处理。(这里不再展开更底层的源码)
Part Four - 收尾
不难看出大部分都是I/O操作的收尾工作。
completeDelayedChannelClose(…) - 完成设置为延迟关闭的通道的关闭动作。
handleCloseOnAuthenticationFailure(…)方法是即使验证失败,也会完成关闭。
对验证失败的处理。
完成关闭。(根据CloseMode,分为优雅关闭和强制关闭)
- stagedReceives:接收完成,但是还没有暴露给用户的NetworkReceive列表。
TransportLayer#disconnect(),实际上对于它的实现类-SslTransportLayer来说,调用SelectionKey.cancel()方法,也就是取消Channel在Selector上的注册。
关闭的核心方法。
除了nio的SelectionKey#cancel()、attach(null)以及KafkaChannel#close()设置外,还将KafkaChannel从immediatelyConnectedKeys、keysWithBufferedRead、stagedReceives、explicitlyMutedChannels进行删除。将KafkaChannel的id、state作为key、value加入到disconnected中。
接下来看下KafkaChannel是如何关闭的。如下:
对TransportLayer、Authenticator、NetworkReceive分别调用close()方法进行关闭。这里不展开详细分析了。
maybeCloseOldestConnection(…) - 关闭最久的失效连接。(可以理解为关闭闲置时间过久的连接)
- idleExpiryManager:IdleExpiryManager类型。
从IdleExpiryManager#lruConnections
中取出连接,进行当前时间与nextIdleCloseCheckTime的比较,从而判定连接是否失效。
addToCompletedReceives() - 对于满足条件的stagedReceives集合的元素,从队列中取出队首的NetworkReceive,添加到completedReceives列表中。
- 从stagedReceives集合中迭代元素。
- 如果元素的key,也就是KafkaChannel,不在explicitlyMutedChannels集合中,就会从队列(元素的value)中取出队首的NetworkReceive,添加到completedReceives列表中。
关于这个方法,官方给予了如**释:
不难看出将NetworkReceive从原来的stagedDeque转移到completedReceives列表中的原因。