Kafka生产者架构 - Selector


前言

本文解析生产者流程图中的Selector部分的处理逻辑9、10。
Kafka生产者架构 - Selector


源码分析

本文基于Kafka定义的Selector的send方法和poll方法,展开源码分析。

Kafka定义的Selector是Kafka对于java原生的Selector的功能增强,用于满足Kafka的具体需求。核心是利用NetworkSend、NetworkReceive与KafkaChannel这些Kafka定义的模型,完成I/O读写操作。归根结底,使用nio的Channel与ByteBuffer交互完成更底层的读写操作。


send方法

Kafka生产者架构 - Selector

Send,本身是个接口。
官方给予的注释:This interface models the in-progress sending of data to a specific destination.
也就是说它代表一个发送数据到指定的目的地的一个模型。

Kafka生产者架构 - Selector

首先从channels获取给定id的KafkaChannel。如果不为空,直接返回。
如果为空,从closingChannels获取给定id的KafkaChannel,此时代表这个KafkaChannel即将关闭。
如果还为空,表示KafkaChannel已经断开连接。会抛出异常。

Kafka生产者架构 - Selector

设置send属性,然后监听SelectionKey.OP_WRITE事件。

由此可以看出,正常而言,Send实例(实际上是NetworkSend)的目的地是KafkaChannel。


poll方法

Selector实现了Selectable接口。而Selectable接口,对于poll(…)的声明如下:
Kafka生产者架构 - Selector

也就是承担实际的I/O读写,建立连接等工作。

poll(…)方法的内部实现篇幅比较长,这里拆分为四个部分,分开解析。核心的是第三部分。


Part One - 准备

先来看下poll(…)方法的第一个部分。主要是清除上一次poll调用的结果。

Kafka生产者架构 - Selector

  • madeReadProgressLastPoll:表示上次的poll调用在读取数据方面是否取得进展。这个参数被用来避免当内存容量不足以读取更多的数据时出现小循环。默认是true。
  • keysWithBufferedRead:Set#SelectionKey。
  • isOutOfMemory():判断内存池当前是否没有内存可以使用。
  • hasStagedReceives():判断stagedReceives列表中是否没有isMute()状态的通道。
  • stagedReceives:暂存一次OP_READ事件处理过程中读取到的所有请求。当一次OP_READ事件处理完成之后,会将stagedReceives集合中的请求保存到completeReceives集合中。

clear() - 清除上一次poll调用的结果
Kafka生产者架构 - Selector

  1. 清空已完成发送的列表、已完成接收的列表、已建立连接的列表、断开连接的列表。
  2. 遍历closingChannels,获取value - KafkaChannel。
  3. stagedReceives获取KafkaChannel对应的Deque。
  4. 从发送失败的列表里删除该KafkaChannel的id。
  5. 如果Deque不存在、Deque没有内容、删除成功,则关闭该KafkaChannel。
  6. 遍历发送失败的列表,将每一个元素与FAILED_SEND状态加入到已断开连接的集合中。
  7. 清空发送失败的列表。
  8. 将madeReadProgressLastPoll设置为false,表示上次的poll调用在读取数据方面没有取得进展。

isInMutableState() - 判断KafkaChannel是否是处于mutable的状态

所谓mutable状态,我们可以在Selector的接口-Seletable找到一些蛛丝马迹。mute(…)方法,用来禁止从给定的连接中读取数据。unmute(…)方法,与之相反,用来开启从给定的连接中读取数据。

Kafka生产者架构 - Selector

  • receive:NetworkReceive类型。
  • memoryAllocated():判断是否有调用内存。实际上是判断ByteBuffer#buffer属性是否不为空。
  • transportLayer:TransportLayer接口类型。
  • ready():对于SslTransportLayer来说,是判断它的state属性是否是READY。对于PlaintextTransportLayer来说,始终是true。

maybeUnmute() - 可能修改KafkaChannel的状态为not_muted

Kafka生产者架构 - Selector


Part Two - 检查

接着看第二部分。核心是调用nio原生的Selector#select(long timeoutMs)或者是selectNow()方法。获取对应的通道准备就绪的键的数量。
Kafka生产者架构 - Selector

Kafka生产者架构 - Selector

根据timeoutMs的设置,调用java.nio.channels.Selector的selectNow()或者select(timeoutMs)方法。这个是java nio原生的Selector,而select(…)方法返回准备就绪的键的个数。


Part Three - I/O处理

第三部分核心是围绕pollSelectionKeys(…)方法,做实际的I/O读写操作。
Kafka生产者架构 - Selector

  • numReadyKeys:准备就绪的键的个数。
  • immediatelyConnectedKeys:SeletionKey集合,其调用SocketChannel#connect(…)方法,立刻连接的。
  • dataInBuffers:表示keysWithBufferedRead集合是否有元素。keysWithBufferedRead表示使用缓冲读取的SelectionKey集合。
  • pollSelectionKeys(…):处理准备就绪的键。

pollSelectionKeys(…) - 对准备就绪的SelectionKey做了一些I/O处理。

Kafka生产者架构 - Selector

  • determineHandlingOrder(…):对于Selector#outOfMemory属性为false,并且Selector#memoryPool的可用内存大小小于lowMemThreshold,就会对SelectionKey集合调用Collections.Shufffle(...)进行重排序。
  • channel(…):获取SelectionKey.attachment(),然后类型转换为KafkaChannel。也就是获取与该SelectionKey关联的KafkaChannel。
  • update(…):将channel的id和当前时间放入到Selector#lruConnections这个LinkedHashMap里。

Kafka生产者架构 - Selector

对于调用SocketChannel#connect(…)就立刻完成连接的、SelectionKey是可连接状态,调用NIO原生的SocketChannel#finishConnect()方法完成连接。确保后续的操作中,SocketChannel一定处于连接状态。

Kafka生产者架构 - Selector

  • ready():判断Authenticator验证是否完成,并且state是否处于READY状态(对于SslTransportLayer)。
  • prepare():核心是调用SslTransportLayer#handshake()方法,完成SSL握手。

Kafka生产者架构 - Selector
Kafka生产者架构 - Selector

KafkaChannel的读处理。(这里不再展开更底层的源码)

Kafka生产者架构 - Selector
Kafka生产者架构 - Selector
Kafka生产者架构 - Selector
KafkaChannel的写处理。(这里不再展开更底层的源码)


Part Four - 收尾

不难看出大部分都是I/O操作的收尾工作。
Kafka生产者架构 - Selector


completeDelayedChannelClose(…) - 完成设置为延迟关闭的通道的关闭动作。
Kafka生产者架构 - Selector
Kafka生产者架构 - Selector

handleCloseOnAuthenticationFailure(…)方法是即使验证失败,也会完成关闭。
Kafka生产者架构 - Selector

对验证失败的处理。
Kafka生产者架构 - Selector

完成关闭。(根据CloseMode,分为优雅关闭和强制关闭)
Kafka生产者架构 - Selector

  • stagedReceives:接收完成,但是还没有暴露给用户的NetworkReceive列表。

Kafka生产者架构 - Selector

TransportLayer#disconnect(),实际上对于它的实现类-SslTransportLayer来说,调用SelectionKey.cancel()方法,也就是取消Channel在Selector上的注册。

关闭的核心方法。
Kafka生产者架构 - Selector

除了nio的SelectionKey#cancel()、attach(null)以及KafkaChannel#close()设置外,还将KafkaChannel从immediatelyConnectedKeys、keysWithBufferedRead、stagedReceives、explicitlyMutedChannels进行删除。将KafkaChannel的id、state作为key、value加入到disconnected中。

接下来看下KafkaChannel是如何关闭的。如下:
Kafka生产者架构 - Selector

对TransportLayer、Authenticator、NetworkReceive分别调用close()方法进行关闭。这里不展开详细分析了。


maybeCloseOldestConnection(…) - 关闭最久的失效连接。(可以理解为关闭闲置时间过久的连接)

Kafka生产者架构 - Selector

  • idleExpiryManager:IdleExpiryManager类型。

Kafka生产者架构 - Selector

IdleExpiryManager#lruConnections中取出连接,进行当前时间与nextIdleCloseCheckTime的比较,从而判定连接是否失效。

Kafka生产者架构 - Selector


addToCompletedReceives() - 对于满足条件的stagedReceives集合的元素,从队列中取出队首的NetworkReceive,添加到completedReceives列表中。
Kafka生产者架构 - Selector

  1. 从stagedReceives集合中迭代元素。
  2. 如果元素的key,也就是KafkaChannel,不在explicitlyMutedChannels集合中,就会从队列(元素的value)中取出队首的NetworkReceive,添加到completedReceives列表中。

Kafka生产者架构 - Selector

关于这个方法,官方给予了如**释:
Kafka生产者架构 - Selector

不难看出将NetworkReceive从原来的stagedDeque转移到completedReceives列表中的原因。