Kafka——SocketServer分析

上文讲到Sender 线程把消息封装成 ClientRequest 之后放入 selector 的 send 字段在由其 poll 方法进行发送消息。那么在 KafkaServer 中则是通过 SocketServer 接受消息并且进行处理。并采用了 Reactor 模式
Kafka——SocketServer分析
现在我们来看一下 SocketServer 它的模型是一个 endpoint 绑定一个 Acceptor 对应多个 Processor,每个 Processor 对应多个 Handler 线程之间用 requestChannel 进行消息传输。每个 Processor 都有一个属于自己的 Selector
Kafka——SocketServer分析
Kafka——SocketServer分析
初始化方法 startUp():
ConnectionQuotas对象是控制每个 ip 上最大的连接数的,如果超出的话会报TooManyConnectionsException
创建 Acceptor 和 Processor
如果startupProcessors这个标志是 false 的话那就延缓加载等到被调用的时候在加载 processor 否则就直接开启 processor 线程
Kafka——SocketServer分析
接下来来看一下 Acceptor 和 Processor 他们都继承AbstractServerThread
首先Acceptor:核心方法是 run()方法处理op_accept 事件创建 socketChannel 并调用 accept方法,accept 方法则是把该事件交给 Processor.accept 处理
Kafka——SocketServer分析
Kafka——SocketServer分析
Kafka——SocketServer分析
Kafka——SocketServer分析
Kafka——SocketServer分析
Processor:核心方法是 run 方法,调用 poll 方法发送成功后会加入 completedReceives,conpletedSends,disConnected 中等待处理,其流程和 sender 线程发送消息一样
Kafka——SocketServer分析
Kafka——SocketServer分析