netty源码分析2-3-server启动-动态分析
分享内容如下
- client请求连接后,sever端的处理
- 网络标识位的变化
- 子Reactor启动
1.client请求连接后,sever端的处理
client连接时创建NioSocketChannel,触发DefaultChannelPipeline.read()
AbstractNioMessageChannel$NioMessageUnsafe.read()
@Override
public void read() {
for (;;) {
//创建
int localRead = doReadMessages(readBuf);
}
//。。。。。。。。。。。。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
//触发ServerBootstrapAcceptor channelRead 为 新创建的NioSocketChannel注册read事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
//触发DefaultChannelPipeline.read() ,最终去修改网络标识位,还没有改?
pipeline.fireChannelReadComplete();
//。。。。。。。。。。。
pipeline.fireExceptionCaught(exception);
}
}
//使用childEventGroup创建NioSocketChannel,装入buf中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
//。。。。。。
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
//。。。。。。。
return 0;
}
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
//又执行 NioSocketChannel(AbstractNioChannel).doBeginRead() 的原因?
if (channel.config().isAutoRead()) {
read();
}
return this;
}
ServerBootstrapAcceptor-channelRead
//添加childHandler到pipeline执行链中,设置参数,执行register
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);//
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//执行register
child.unsafe().register(child.newPromise());
}
开篇图中的过程伴随着网络标志位interestOps的变化,搞懂变化的过程,对理解netty server,client启动的过程有很大帮助。
此部分中sever端 负责IO操作的selector是新的 与server端负责连接请求的selector不同, 此处的selector eventLoop().selector获取的。
2.网络标识位的变化
server端第一次执行register(eventLoop().selector, 0, this); 此处interestOps=0,什么时候 interestOps=SelectionKey.OP_ACCEPT?
NioServerSocketChannel 初始化时 传入 SelectionKey.OP_ACCEPT 。
ServerBootstrap 执行bind(),调用register()执行fireChannelActive ,更新了 interestOps=SelectionKey.OP_ACCEPT
NioServerSocketChannel更新interestOps调用链如下
NioServerSocketChannel(AbstractNioChannel).doBeginRead() line: 330
AbstractNioMessageChannel$NioMessageUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 585
DefaultChannelPipeline$HeadHandler.read(ChannelHandlerContext) line: 1055
ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 132
DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 263
DefaultChannelHandlerContext.read() line: 403
DefaultChannelPipeline.read() line: 923
NioServerSocketChannel(AbstractChannel).read() line: 214
DefaultChannelPipeline.fireChannelActive() line: 819
AbstractChannel$AbstractUnsafe$2.run() line: 474
NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 318 ???
NioEventLoop.run() line: 353
SingleThreadEventExecutor$5.run() line: 794
Thread.run() line: 745
更新 interestOps=SelectionKey.OP_ACCEPT的代码如下
AbstractNioChannel
//更新interestOps(网络标识位)
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
chilldGroup执行register后 什么时候更改网络标志位为读即interestOps=SelectionKey.OP_READ?
客户端发起连接请求后更改的
调用链如下
NioSocketChannel(AbstractNioChannel).doBeginRead() line: 331
AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 585
DefaultChannelPipeline$HeadHandler.read(ChannelHandlerContext) line: 1055
ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 132
DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 263
DefaultChannelHandlerContext.read() line: 403
DefaultChannelPipeline.read() line: 923
NioSocketChannel(AbstractChannel).read() line: 214
DefaultChannelPipeline.fireChannelActive() line: 819
AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).register0(ChannelPromise) line: 429
AbstractChannel$AbstractUnsafe.access$100(AbstractChannel$AbstractUnsafe, ChannelPromise) line: 369
AbstractChannel$AbstractUnsafe$1.run() line: 403
NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 318
NioEventLoop.run() line: 353
SingleThreadEventExecutor$5.run() line: 794
Thread.run() line: 745
更新网络标准位的处理与server处理相同。
ServerBootstrapAcceptor-channelRead 执行了 child.unsafe().register(child.newPromise());
重点关注:AbstractNioByteChannel$NioByteUnsafe(AbstractChannel$AbstractUnsafe).register0(ChannelPromise) line: 429
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
// 此时isActive()==true,执行了ChannelActive事件,更新了interestOps=SelectionKey.OP_READ
//注意NioServerSocketChannel执行register执行到了这里,但是isActive()=fase
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
//。。。。。
}
}
io.netty.channel.AbstractNioChannel
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
//初始化注册 网络标识位为0
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
}
isActive()的不同实现
//NioSocketChannel类的判断,即是否连接
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
NioServerSocketChannel类的判断,即是否已经绑定,注意 执行accpet后,这个判断就等于true了
public boolean isActive() {
return javaChannel().socket().isBound();
}
创建NioSocketChannel时设置网络标识位为 SelectionKey.OP_READ
protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
super(parent, eventLoop, ch, SelectionKey.OP_READ);
}
只有 isActive()等于true才能执行 pipeline.fireChannelActive(),创建服务端channel(NioServerSocketChannel)时执行 bind后才能触发,创建客户端连接(NioSocketChannel)的时候会立执行accept , 然后isActive()等于true, pipeline.fireChannelActive() 立即被触发。
pipeline.fireChannelActive(); 触发了创建了 网络标识位的更新, 这个值是在 创建 SocketChannel创建的,NioServerSocketChannel的值是 SelectionKey.OP_ACCEPT,NioSocketChannel的值是 SelectionKey.OP_READ。
综上所述,服务端的在执行完bind后 才会将网络位更新成SelectionKey.OP_ACCEPT,而客户端连接会直接更新成SelectionKey.OP_READ(这里面跳过了执行注册,并设置网络标识位为0的步骤)
子Reactor启动
子Reactor也是在执行 register过程中启动的
第一次执行register
pipeline.fireChannelRegistered(); -- ChannelInitializer channelRegistered channel add ServerBootStrapAccpter
第二次执行register
pipeline.fireChannelRegistered(); child channel add childHandler
fireChannelRead
客户端连接触发,最后调用ServerBootStrapAccpter channelRead()
执行bind触发
pipeline.fireChannelActive(); 修改网络标志位 sever channel update SelectionKey.OP_ACCEPT
ServerBootStrapAccpter channelRead() 执行 register 触发
pipeline.fireChannelActive(); 修改网络标志位 sever channel update SelectionKey.OP_ACCEPT