Kafka——SocketServer分析
上文讲到Sender 线程把消息封装成 ClientRequest 之后放入 selector 的 send 字段在由其 poll 方法进行发送消息。那么在 KafkaServer 中则是通过 SocketServer 接受消息并且进行处理。并采用了 Reactor 模式
现在我们来看一下 SocketServer 它的模型是一个 endpoint 绑定一个 Acceptor 对应多个 Processor,每个 Processor 对应多个 Handler 线程之间用 requestChannel 进行消息传输。每个 Processor 都有一个属于自己的 Selector
初始化方法 startUp():
ConnectionQuotas对象是控制每个 ip 上最大的连接数的,如果超出的话会报TooManyConnectionsException
创建 Acceptor 和 Processor
如果startupProcessors这个标志是 false 的话那就延缓加载等到被调用的时候在加载 processor 否则就直接开启 processor 线程
接下来来看一下 Acceptor 和 Processor 他们都继承AbstractServerThread
首先Acceptor:核心方法是 run()方法处理op_accept 事件创建 socketChannel 并调用 accept方法,accept 方法则是把该事件交给 Processor.accept 处理
Processor:核心方法是 run 方法,调用 poll 方法发送成功后会加入 completedReceives,conpletedSends,disConnected 中等待处理,其流程和 sender 线程发送消息一样