mina源码分析——bind
关于Mina
mina是开源的NIO框架,其project地址:
http://mina.apache.org/mina-project/features.html
想快速了解mina就看user guide:
http://mina.apache.org/mina-project/userguide/user-guide-toc.html
mina给我的感觉:干净、利落的抽象,非常容易上手,使用mina你只需要写不需要超过10行code就可以搭建一个TCP服务器,就像mina自身带的例子:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MinaTimeServer {
/**
* @param args
*/
private static final int PORT = 9123;
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter());
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName( "UTF-8" ))));
acceptor.setHandler(new TimeServerHandler());
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10);
acceptor.bind(new InetSocketAddress(PORT) );
}
}
Mina源码分析之bind
mina好用只是它把复杂很好的进行了分解和隐藏,要想真正掌握它,还是要深入了解源码,所以从bind方法开始,bind方法是interface IoAcceptor声明的方法,先看看它的javadoc,了解bind方法的职责:
/**
* Binds to the default local address(es) and start to accept incoming
* connections.
*
* @throws IOException if failed to bind
*/
void bind() throws IOException;
/**
* Binds to the specified local address and start to accept incoming
* connections.
*
* @param localAddress The SocketAddress to bind to
*
* @throws IOException if failed to bind
*/
void bind(SocketAddress localAddress) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections. If no address is given, bind on the default local address.
*
* @param firstLocalAddresses The first address to bind to
* @param addresses The SocketAddresses to bind to
*
* @throws IOException if failed to bind
*/
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections. If no address is given, bind on the default local address.
*
* @param addresses The SocketAddresses to bind to
*
* @throws IOException if failed to bind
*/
void bind(SocketAddress... addresses) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections.
*
* @throws IOException if failed to bind
*/
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
bind方法职责
1.绑定到本地ip地址(如果没有指定就绑定到默认ip)
2.开始接受外面进来的请求。
通俗地说,一个Tcp的或则UDP的IO Server调用了bind方法后,就可以开始干活了,就可以接受请求了。顺便说一句,mina对IO的两端(Server,Client)都有很好的抽象和封装,IO Server由IoAcceptor代表,IO Client由IoConnector代表,而它们又都是IoService的直接子接口。关于mina的整体架构还是去看user guide,非常清晰。
明白了bind方法的职责,不妨先不要往下看,而是结合java NIO api 思考一下其大概的实现细节,然后再去翻code来验证你的推测。
对于一个基于NIO的server来说,要想能够开始接受外面的请求,有几件事必须做:
1.open一个SelectableChannel,SelectableChannel代表了可以支持非阻塞IO操作的channel
2.open一个Selector,Selector通过select方法来监控所有可以进行IO操作的SelectableChannel
3.完成SelectableChannel在Selector上的注册,这样Selector方可监控他。而类SelectionKey则代表一个成功的注册。
当然对于一个框架来说,肯定会做的更多,但以上3点是必须的,那接下来就去翻code吧
首先看bind方法所在的类层次:
从类图中可以看出,真正处理bind逻辑的是bindInternal方法,该方法在AbstractIoAcceptor抽象类中声明,在AbstractPollingIoAcceptor类中实现。正如javadoc的说明,AbstractPollingIoAcceptor是A base class for implementing transport using a polling strategy.
bindInternal方法
下面是bindInternal方法的code:
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
// Create a bind request as a Future operation. When the selector
// have handled the registration, it will signal this future.
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
// creates the Acceptor instance and has the local
// executor kick it off.
startupAcceptor();
// As we just started the acceptor, we have to unblock the select()
// in order to process the bind request we just have added to the
// registerQueue.
try {
lock.acquire();
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();
} finally {
lock.release();
}
// Now, we wait until this request is completed.
request.awaitUninterruptibly();
if (request.getException() != null) {
throw request.getException();
}
// Update the local addresses.
// setLocalAddresses() shouldn't be called from the worker thread
// because of deadlock.
Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
for (H handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}
1.创建一个bind请求:
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
当前你可以把AcceptorOperationFuture简单理解为支持异步处理和事件通知机制的一个request,后续再详细分析它。
2.把请求加到队列中:
registerQueue.add(request);
registerQueue保存所有新注册的请求,这些请求在registerHandles方法中被消费掉。registerHandles方法又被Acceptor.run方法调用。新注册成功的连接都被保存在:
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
3.创建Acceptor类,并且异步执行该类的run方法。
startupAcceptor();
Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求。下面会重点分析这个类。
4.等待10毫秒,然后调用wakeup方法来确保selector的select方法被唤醒。
try {
lock.acquire();
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();
} finally {
lock.release();
}
lock是一个信号量:
private final Semaphore lock = new Semaphore(1);由于可用值是1,因此相当于排他锁。
这里要重点解释一下为什么要sleep这10毫秒:要搞清楚这点,首先要看Acceptor.run方法的实现(下面介绍),该方法调用了
int selected = select();
而select方法是abstract的:
/**
* Check for acceptable connections, interrupt when at least a server is ready for accepting.
* All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
* @return The number of sockets having got incoming client
* @throws Exception any exception thrown by the underlying systems calls
*/
protected abstract int select() throws Exception;
大家能够猜到其子类:NioSocketAcceptor的实现就是调用Selector.select方法:
@Override
protected int select() throws Exception {
return selector.select();
}
而select方法在没有io事件也没有调用selector.wakeup方法时是阻塞的,反之就会被唤醒。所以才有了上面sleep10毫秒后调用wakeup方法(也是abstract方法,子类NioSocketAcceptor实现就是调用Selector.wakeUp),来唤醒select。但由于Accecptor是在独立线程运行的,因而sleep一会以保证那个线程已经启动,而不会错过唤醒select.
5.然后就等待请求被处理完毕,并判断是否有异常,没有异常就把绑定的本地地址返回。
Acceptor
Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求
接下来重点看Acceptor的run方法:
public void run() {
assert (acceptorRef.get() == this);
int nHandles = 0;
// Release the lock
lock.release();
while (selectable) {
try {
// Detect if we have some keys ready to be processed
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
int selected = select();
// this actually sets the selector to OP_ACCEPT,
// and binds to the port on which this class will
// listen on
nHandles += registerHandles();
// 这里推出while循环的逻辑 省略。。。
if (selected > 0) {
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
// check to see if any cancellation request has been made.
nHandles -= unregisterHandles();
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
break;
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
// Cleanup all the processors, and shutdown the acceptor.
// .....
}
}
run方法重点是while循环:
1.首先调用select方法来看是否有新io请求进来(由于Acceptor只被bind和unbind调用,因此此时的请求只可能是bind请求和unbind请求)
2.调用registerHandles处理bind请求,该方法返回新处理的bind请求数目,如果=0,则推出循环,Acceptor的处理结束。处理bind请求的大概过程是:
1)从registerQueue队列中拿出bind请求:AcceptorOperationFuture(之前放进去的)
2)从AcceptorOperationFuture中拿出要bind的SocketAddress(可能不止一个);
3)对于每一个SocketAddress,调用open方法来建立ServerSocketChannel通道。open方法是abstract,其子类NioSocketAcceptor给出了实现,没错正如我们之前设想的那样,NioSocketAcceptor.open主要就是完成ServerSocketChannel的注册,源码如下:
@Override
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
ServerSocketChannel channel = ServerSocketChannel.open();
boolean success = false;
try {
// This is a non blocking socket channel
channel.configureBlocking(false);
// Configure the server socket,
ServerSocket socket = channel.socket();
// Set the reuseAddress flag accordingly with the setting
socket.setReuseAddress(isReuseAddress());
// and bind.
socket.bind(localAddress, getBacklog());
// Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);
success = true;
} finally {
if (!success) {
close(channel);
}
}
return channel;
}
4)把第3)建立好的ServerSocketChannel放到boundHandles中。boundHandles是一个线程安全的map:
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
5)返回新注册的size。
3. 如果select方法返回值>0,就调用processHandles(selectedHandles());来处理。处理过程大致如下:
1)从selector中拿出有io发生的ServerSocketChannel集合;这个过程是在子类NioSocketAcceptor .selectedHandles方法中发生的。
2)对于每一个ServerSocketChannel,调用accept方法来建立会话:IoSession,IoSession是mina中非常重要的概念,以后再分析。accept方法也是abstract的,其子类NioSocketAccepor给出了实现:
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
SelectionKey key = handle.keyFor(selector);
if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
}
// accept the connection from the client
SocketChannel ch = handle.accept();
if (ch == null) {
return null;
}
return new NioSocketSession(this, processor, ch);
}
3)初始化session:initSession(session, null, null);
4)保存session到IoProcessor
4.处理unbind的逻辑:
nHandles -= unregisterHandles();
前面说过Acceptor的用户是bind和unbind,所以run方法里自然会有处理unbind的逻辑,不了解这一点看到这段code可能会有些困惑。
unbind的逻辑就很简单了,大致过程如下:
1)从cancelQueue中取出unbind的请求:AcceptorOperationFuture
2)从AcceptorOperationFuture中取出SocketAddress对象(可能不止一个)
3)对于每一个SocketAddress,把它从boundHandles中删除
4)对于每一个SocketAddress,调用close方法来关闭它,close方法也是abstract方法,子类NioSocketAcceptor给出了实现:
@Override
protected void close(ServerSocketChannel handle) throws Exception {
SelectionKey key = handle.keyFor(selector);
if (key != null) {
key.cancel();
}
handle.close();
}
5)调用AcceptorOperationFuture.setDone来做事件通知
AbstractPollingIoAcceptor.bindInternal
->startupAcceptor//启动Acceptor线程
-->Acceptor.run
--->registerHandles
主要逻辑:完成ServerSocketChannel在selector中的注册,感兴趣的事件是: SelectionKey.OP_ACCEPT,注册后ServerSocketChannel就可以在指定的ip和port上监听新连接。
1)从registerQueue中取出头元素,registerQueue保存待注册的请求
AcceptorOperationFuture future = registerQueue.poll();
1)创建ServerSocketChannel:ServerSocketChannel.open();
2)把ServerSocket绑定到指定的SocketAddress,SocketAddress来自
List<SocketAddress> localAddresses = future.getLocalAddresses();
3)把ServerSocketChannel注册到selector:channel.register(selector, SelectionKey.OP_ACCEPT);
4)返回channel并保存在boundHandles中
--->processHandles
主要逻辑:处理ServerSocketChannel上的新连接请求,把新连接请求构建新的IoSession并初始化,然后把IoSession实例交给IoProcessor来管理和处理io操作。
1)调用NioSocketAcceptor.accept方法返回NioSocketSession实例。构建NioSocketSession需要SocketChannel、IoProcessor等参数。
2)调用initSession方法
3)把当前session加到IoProcessor中:void add(S session)
--->unregisterHandles
主要逻辑:响应doUnbind方法,关闭ServerSocketChannel