Java读源码之Netty深入剖析----3.Netty服务端启动
Java读源码之Netty深入剖析----3.Netty服务端启动
分析服务端启动流程,包括服务端Channel的创建,初始化,以及注册到selector
3-1 服务端启动demo
3-2 服务端Channel的创建
3-3 服务端Channel的初始化
3-4 注册selector
3-5 服务端口的绑定
3-6 服务端启动总结
netty源码阅读之服务端启动
netty服务端启动分为以下几个过程:
1、服务端channel的创建
2、服务端channel的初始化
3、注册selector
4、服务端端口的绑定
我将通过以下用户的代码,后面分几篇文章为大家讲解:
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//..
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
----------------------------------------------------------------------------------------------------------------------------------------------------
netty源码阅读之服务器启动之服务端channel的创建
首先是我们一段用户的代码
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//..
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
我们分以下步骤分析源码:
1、通过什么方式创建服务端channel。
2、 创建服务端channel的时候做了什么事情。
通过什么方式创建服务端channel
从bind方法进入:层层深入,到了doBind方法,里面有一个initAndRegister()方法(在AbstractBootstrap类里面):
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if(channel != null) {
channel.unsafe().closeForcibly();
}
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
...
}
也就是有一个channelFactory类工厂,产生我们的Channel。具体这个channelFactory的实现是什么?我们目前先猜测是ReflectiveChannelFactory,进入ReflectiveChannelFactory的newChannel()方法,明显是使用反射的实现:
public T newChannel() {
try {
return (Channel)this.clazz.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
}
}
那么这个clazz是什么?带着这两个疑问,来到用户代码的这一行:
.channel(NioServerSocketChannel.class)
这一行好像和clazz有关。进入bind方法:
public B channel(Class<? extends C> channelClass) {
if(channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
}
}
看channelFactory方法,一直点进去,就是把传入的ReflectiveChannelFactory传递给AbstractBootstrap类,作为他的其中一个成员变量,这里就解释了前面的channelFactory的实现是什么,就是ReflectiveChannelFactory。
那么我们进入new ReflectiveChannelFactory(channelClass)看看它做了什么:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if(clazz == null) {
throw new NullPointerException("clazz");
} else {
this.clazz = clazz;
}
}
很明显,刚刚的clazz就是我们用户传进去的NioServerSocketChannel.class
这里,创建服务端channel也就是NioServerSocketChannel的代码已经清晰了
创建服务端channel的时候做了什么事情
我们点进去NioServerSocketChannel这个类,看它的构造方法,也就是在构造的时候做了什么事情:
无参构造器:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
DEFAULT_SELECTOR_PROVIDER的来源:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
newSocket的来源:
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
这些其实都是使用jdk底层构造ServerSocketChannel的方法创建的。
接下去看无参构造器调用一个参数的构造器:
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
super((Channel)null, channel, 16);
this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
这里主要做了两件事情:
1、调用父类的构造器
2、初始化一个NioServerSocketChannelConfig,以后可以使用这个tcp参数设置类。
第二点可以不用解释了,第一点层层进入,到了AbstractNioChannel(这个AbstractNioChannel是服务端和客户端channel的基类,一些公共的东西都在这个类里面创建)的这个构造方法:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
首先把ch传入,然后把readInterestOp也传入,然后是一个nio编程很常用的设置,设置为非阻塞模式:
ch.configureBlocking(false);
最后我们查看第一行super(parent);
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
也就是为这个服务端channel添加一个id,一个unsafe、一个pipeline。
自此,我们创建服务端channel的源码已经分析完,记住这些东西,以后会串起来。
-------------------------------------------------------------------------------------------------------------------------------------------
netty源码阅读之服务器启动之服务端channel的初始化
服务端channel的初始化大致分为以下几个步骤:
1、设置channelOptions,channelAttrs
2、设置childOptions,childAttrs
3、配置服务端pipeline:config handler
4、添加连接器,add serverBootstrapAcceptor,以后新的请求的都通过这个连接器处理。
首先,我们在上一篇文章的AbstractBootstrap的initAndRegister方法里面找到init(channel)这个函数,从这里开始,
现在分析的是服务端的,所以实现是ServerBootstrap的实现:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
设置channelOptions,channelAttrs
这里的实现很简单,就是把传进来的数据options 和attrs放到map里面:
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
这这里需要说明的是,我们的用户代码没有设置options。
设置childOptions,childAttrs
childOptions和childAttrs是新的链接进来之后,我们需要给她设置的属性,而上面一个设置是服务端自己的属性。
用户的代码在这里设置:
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
点进去看其中一个的实现:
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
if (childKey == null) {
throw new NullPointerException("childKey");
}
if (value == null) {
childAttrs.remove(childKey);
} else {
childAttrs.put(childKey, value);
}
return this;
}
而这个childAttrs又是一个hashMap:
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
在ServerBootstrap里面,我们这一步也是简单的复制了一下这两个hashMap:
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
简单。
配置服务端pipeline:config handler
后面有一段代码:
ChannelHandler handler = config.handler();
就是配置服务端pipeline。
添加连接器,add ServerBootstrapAcceptor
这一步是最重要的,以后我们新的请求都会经过它来初始化,就是这一句:
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
我们一个个分析参数:
currentChildGroup:
这是从childGroup来的,然后我们用户端代码有一段:b.group(bossGroup, workerGroup),点进去看,这个workerGroup就是我们ServerBootstrap的childGroup:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
currentChildHandler:
这个其实也是从下面的用户代码进来的:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//..
}
});
currentChildOptions, currentChildAttrs:
这个就是我们上面第二步添加的childOptions和childAttrs
初始化比较简单,重点在最后一步添加连接器ServerBootstrapAcceptor。
------------------------------------------------------------------------------------------------------------------------------------------------------------
netty源码阅读之服务器启动之注册selector
注册selector主要做以下两件事情:
1、绑定eventLoop,这个eventLoop就是服务端的event,不是childEventLoop
2、registor做实际的注册
回到《netty源码阅读之服务器启动之服务端channel的创建》这盘文章的AbstractBootstrap的initAndRegister方法,里面有一段代码
ChannelFuture regFuture = config().group().register(channel);
这个register方法层层追溯,可以到达AbstractChannel里面内部类的AbstractUnsafe的register方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
绑定线程用的就是AbstractChannel.this.eventLoop = eventLoop。
从用户代码的b.group(bossGroup, workerGroup)追溯,这个eventLoop就是和bossGroup有千丝万缕的关系。
然后我们看register0(),点进去,在AbstractChannl里面有个这样的函数:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
在这里又做了三件事:
1、doRegister(),调用jdk底层注册
2、invokeHandlerAddedIfNeeded()
3、fireChannelRegistered(),传播事件
选择AbstractNioChannel的doregister()
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
这里就是调用jdk地层的register方法了,把自己作为attachment给注册进去。
在定义用户代码Handler,有这三个回调:
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
有两个回调,就是在这里被调用了。至于channelActive这个回调,是在端口绑定之后被回调的。此时的isActive()还是false。只有最终绑定端口之后,才会回调这个方法。下一篇文章我们会介绍。
----------------------------------------------------------------------------------------------------------------------------------------------
netty源码阅读之服务器启动之端口绑定
端口绑定,大部分是在AbstractChannel的内部类AbstractUnsafe的bind()这个方法里面完成的。主要完成了两件事情:
1、调用jdk底层绑定端口
2、传播channelActive事件。
在AbstractBoostrap的
doBind()这个方法的initAndRegister()调用后面,有一个doBind0(),一层一层进去,到达AbstractChannelHeadContext的这个方法:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
点击HeadContext的实现,再进去一层就能看到AbstractUnsafe的bind的实现:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
看NioServerSocketChannel的doBInd的实现:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
就是调用jdk底层实现端口绑定了。
观察到这一段:
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
wasActive实在doBind()之前赋值,这段代码的意思是:如果doBInd之前isActive为false,之后isActive()为true,那么就传播事件。也就是,之前没有传播过,后面绑定端口完成之后,就开始传播。
一样的道理,我们层层进入fireChannelActive最后到达HeadContext的channelAcitve方法:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
这里又做了两件事:
1、传播事件
2、把在用户代码注册的读事件注册上去。
在传播事件这件事情上,我们一层一层点进去,在AbstractChannelContextHandler的这个方法里面:
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
channelActive这个方法,找我们用户代码自己实现的channelActive,就能看到这个:
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
是不是很神奇!!
在把在用户代码注册的读事件注册上去这里,点击进去readIfIsAutoRead()这个方法,还是实现HeadContext的read方法,AbstractNioChannel的doBegainRead()方法:
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
可以很容易知道,readInterestOps就把读事件传递进去了。