mina源码阅读,nio相关知识介绍。
我们所有的网络操作都离不开套接字,网络的传输常见的就是tcp/IP协议,http协议等。当然网络操作属于是系统空间上的,非用户空间能直接操作的。所以存在内核态和用户态的数据传输与拷贝,这个是有性能损耗的,相关知识需自行了解,关于Zero copy,可自行了解。
1、先从底层上来说明几种网络IO的模型,如下:
阻塞io模型:
在缺省情形下,所有文件操作都是阻塞的,在进程空间中调用recvfrom,其系统调用直到数据报到达且被拷贝到应用进程的缓冲区中或者发生错误才返回,期间一直在等待。我们就说进程在从调用recvfrom开始到它返回的整段时间内是被阻塞的。
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段,准备数据,对于network IO, 很多时候数据在一开始还没有到达,比如还没有收到一个完整的UDP包,这个时候kernel就要等待足够的数据到来。而用户进程这边,整个进程会被阻塞,当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block状态,重新运行起来。
非阻塞IO模型
进程把一个套接口设置为非阻塞是在通知内核:当所请求的IO操作不能满足要求时,不把本进程投入睡眠,而是返回一个错误。也就是说当数据没有到达时并不等待,而是以一个错误返回。
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作,一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
IO复用模型
linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,select/poll会不断轮询所负责的所有socket,可以侦测许多fd是否就绪,但select和poll是顺序扫描fd是否就绪,并且支持的fd数量有限。linux还提供了epoll系统调用,它是基于事件驱动的方式,而不是顺序扫描,当某个socket有数据到达了,可以直接通知用户进程,而不需要顺序轮询扫描,提高了效率。
当进程调用了select,整个进程会被block,同时,kernel会监视所有select负责的socket,当任何一个socket的数据准备好了,select就会返回,这个图和阻塞IO的图其实并没有多大区别,事实上,还更差一点,因为这里需要使用两个System call,select和recvFrom,而blocking io只调用了一个system call(recvfrom),但是select的好处在与它可以同时处理多个connection,(如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
信号驱动异步IO模型
首先开启套接口信号驱动I/O功能, 并通过系统调用sigaction安装一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据报准备好被读时,就为该进程生成一个SIGIO信号。随即可以在信号处理程序中调用recvfrom来读数据报,井通知主循环数据已准备好被处理中。也可以通知主循环,让它来读数据报。
异步I/O模型
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核拷贝到用户自己的缓冲区)通知用户进程,这种模型和信号驱动模型的主要区别是:信号驱动IO:由内核通知我们何时可以启动一个IO操作,异步IO模型:由内核通知我们IO操作何时完成
用户进程发起read操作之后,立刻就可以开始去做其他的事了,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,不会对用户进程产生任何block,然后,kernel会等待数据准备完成,然后再将数据拷贝到用户进程内存,当着一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作已经完成。
小结 前面几种都是同步IO,在内核数据copy到用户空间都是阻塞的。最后一种是异步IO,通过API把IO操作交给操作系统处理,当前进程不关心具体IO的实现,通过回调函数或者信号量通知当前进程直接对IO返回结果进行处理。一个IO操作其实分成了两个步骤,发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二步是否阻塞,如果实际的IO读写阻塞请求进程,那就是同步IO,因此前四种都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那就是非阻塞IO.
以上来自网络,以及JAVA中的NIO说明,放在此处做一个记录,点此进入。
2、接下来说明mina这种框架是怎么构造的。
(1)、首先看下mina框架的整体流程,Mina 的执行流程如下所示:
主要采用责任链设计模式。
Mina的底层依赖的主要是Java NIO库,上层提供的是基于事件的异步接口。其整体的结构如下:
(2)、首先看下mina中的一个example例子:
此处主要初始化了一个NioSocketAcceptor类,这个类的初始化方法new NioSocketAcceptor()的时序图如下:
这个图中主要是涉及到AbstractPollingIoAcceptor和AbstractPollingIoProcessor,主要初始化两大线程池和session的默认配置。第一个线程池是acceptor的线程池,用于接收accept事件,第二个是processor的线程池,用于处理读写事件。如下图:
/**
* Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
* session configuration, a class of {@link IoProcessor} which will be instantiated in a
* {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
* systems.
*
* @see SimpleIoProcessorPool
*
* @param sessionConfig
* the default configuration for the managed {@link IoSession}
* @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
* type.
* @param processorCount the amount of processor to instantiate for the pool
*/
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
int processorCount) {
this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount)(此处是processor线程池), true, null);
}
下面是acceptor的连接池:
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
if (sessionConfig == null) {
throw new IllegalArgumentException("sessionConfig");
}
if (getTransportMetadata() == null) {
throw new IllegalArgumentException("TransportMetadata");
}
if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
+ getTransportMetadata().getSessionConfigType() + ")");
}
// Create the listeners, and add a first listener : a activation listener
// for this service, which will give information on the service state.
listeners = new IoServiceListenerSupport(this);
listeners.add(serviceActivationListener);
// Stores the given session configuration
this.sessionConfig = sessionConfig;
// Make JVM load the exception monitor before some transports
// change the thread context class loader.
ExceptionMonitor.getInstance();
if (executor == null) {
//此处创建了一个Acceptor的线程池
this.executor = Executors.newCachedThreadPool();
createdExecutor = true;
} else {
this.executor = executor;
createdExecutor = false;
}
threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
}
在startupAcceptor()会创建一个acceptor任务,放入线程池执行:
private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ? if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }
这个startupAcceptor()方法是在绑定网络地址acceptor.bind( new InetSocketAddress(PORT) )的时候创建的。也就会去监听相应的端口。
首先看下Acceptor的run方法,如下:
public void run() { assert acceptorRef.get() == this; int nHandles = 0; // Release the lock lock.release(); while (selectable) { try { System.out.println("test:"+System.currentTimeMillis()); // Process the bound sockets to this acceptor. // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on. We do that before the select because // the registerQueue containing the new handler is // already updated at this point. //此处是进行注册操作,在ACCEPT事件的选择器中注册通道 nHandles += registerHandles(); // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up int selected = select(); // Now, if the number of registered handles is 0, we can // quit the loop: we don't have any socket listening // for incoming connection. if (nHandles == 0) { acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert acceptorRef.get() != this; break; } if (!acceptorRef.compareAndSet(null, this)) { assert acceptorRef.get() != this; break; } assert acceptorRef.get() == this; } if (selected > 0) { // We have some connection request, let's process // them here. //此处是处理accept事件 processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } }
接下来查看registerHandles()方法,如下:
private int registerHandles() {
for (;;) {
// The register queue contains the list of services to manage
// in this acceptor.
AcceptorOperationFuture future = registerQueue.poll();
if (future == null) {
return 0;
}
// We create a temporary map to store the bound handles,
// as we may have to remove them all if there is an exception
// during the sockets opening.
Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>();
List<SocketAddress> localAddresses = future.getLocalAddresses();
try {
// Process all the addresses
for (SocketAddress a : localAddresses) {
//此处就是打开监听,这是一个抽象方法,在NioSocketAcceptor中实现
H handle = open(a);
newHandles.put(localAddress(handle), handle);
}
// Everything went ok, we can now update the map storing
// all the bound sockets.
boundHandles.putAll(newHandles);
// and notify.
future.setDone();
return newHandles.size();
} catch (Exception e) {
// We store the exception in the future
future.setException(e);
} finally {
// Roll back if failed to bind all addresses.
if (future.getException() != null) {
for (H handle : newHandles.values()) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
// Wake up the selector to be sure we will process the newly bound handle
// and not block forever in the select()
wakeup();
}
}
}
}
在NioSocketAcceptor类中的open方法如下:
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
ServerSocketChannel channel = null;
if (selectorProvider != null) {
channel = selectorProvider.openServerSocketChannel();
} else {
channel = ServerSocketChannel.open();
}
boolean success = false;
try {
// This is a non blocking socket channel
channel.configureBlocking(false);
// Configure the server socket,
ServerSocket socket = channel.socket();
// Set the reuseAddress flag accordingly with the setting
socket.setReuseAddress(isReuseAddress());
// and bind.
try {
socket.bind(localAddress, getBacklog());
} catch (IOException ioe) {
// Add some info regarding the address we try to bind to the
// message
String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
+ ioe.getMessage();
Exception e = new IOException(newMessage);
e.initCause(ioe.getCause());
// And close the channel
channel.close();
throw e;
}
// Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);
success = true;
} finally {
if (!success) {
close(channel);
}
}
return channel;
}
至此Acceptor介绍完毕,接下介绍processor,即上文中介绍的Acceptor的run方法中的 processHandles(selectedHandles()),进入该方法,如下:
private void processHandles(Iterator<H> handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session //将processor线程池绑定到session上 S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); System.out.println("add a session"); // add the session to the SocketIoProcessor //此处就是从session上获取processor的线程池,然后将session绑定到该线程池的某个processor上 session.getProcessor().add(session); } }
进入session.getProcessor().add(session)的add方法,先进入SimpleIoProcessorPool.add方法,如下:
/** * {@inheritDoc} */ @Override public final void add(S session) { getProcessor(session).add(session); }
先获取和该session绑定的线程,如果不存在,则从线程池中取一个线程出来处理这个session,并绑定。getProcessor(session)如下:
private IoProcessor<S> getProcessor(S session) { IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); if (processor == null) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed."); } processor = pool[Math.abs((int) session.getId()) % pool.length]; if (processor == null) { throw new IllegalStateException("A disposed processor cannot be accessed."); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; }
继续查看getProcessor(session).add(session)的add方法,该方法需要进入AbstractPollingIoProcessor.add方法,如下:
public final void add(S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker //这是一个异步操作,先放入队列,再启动processor线程去取出并绑定。 newSessions.add(session); //开启一个处理任务 startupProcessor(); }
startupProcessor()方法如下:
private void startupProcessor() {
Processor processor = processorRef.get();
if (processor == null) {
processor = new Processor();
if (processorRef.compareAndSet(null, processor)) {
executor.execute(new NamePreservingRunnable(processor, threadName));
}
}
// Just stop the select() and start it again, so that the processor
// can be activated immediately.
//唤醒selector
wakeup();
}
接下来,看下processor的run方法,如下:
public void run() { assert processorRef.get() == this; lastIdleCheckTime = System.currentTimeMillis(); int nbTries = 10; for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = t1 - t0; if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. if (isBrokenConnection()) { LOG.warn("Broken connection"); } else { // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. if (nbTries == 0) { LOG.warn("Create a new selector. Selected is 0, delta = " + delta); registerNewSelector(); nbTries = 10; } else { nbTries--; } } } else { nbTries = 10; } // Manage newly created session first //此处主要是在读事件的选择器上注册通道,另外是创建这个session的IoFilterChain,并将会话创建事件传播到责任链IoFilterChain上 if(handleNewSessions() == 0) { // Get a chance to exit the infinite loop if there are no // more sessions on this Processor if (allSessionsCount() == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert processorRef.get() != this; break; } assert processorRef.get() != this; if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert processorRef.get() != this; break; } assert processorRef.get() == this; } } updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... //处理读写事件 process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); // And manage removed sessions removeSessions(); // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. if (isDisposing()) { boolean hasKeys = false; for (Iterator<S> i = allSessions(); i.hasNext();) { IoSession session = i.next(); scheduleRemove((S) session); if (session.isActive()) { hasKeys = true; } } wakeup(); } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop // But first, dump a stack trace ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { doDispose(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } }
NioProcessor的init方法,注册通道,如下:
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
selectorLock.readLock().lock();
try {
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
} finally {
selectorLock.readLock().unlock();
}
}
在AbstractPollingIoProcessor中的process()方法,如下:
private void process() throws Exception { for (Iterator<S> i = selectedSessions(); i.hasNext();) { S session = i.next(); //处理对应的session process(session); i.remove(); } } /** * Deal with session ready for the read or write operations, or both. */ private void process(S session) { // Process Reads if (isReadable(session) && !session.isReadSuspended()) { read(session); } // Process writes if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { // add the session to the queue, if it's not already there flushingSessions.add(session); } }
自此,整个执行流程已经通过源码方式过完。
上述的时序图是在idea中装sequence diagram插件,这个可以自行安装。非常好用,帮助阅读代码,理解流程。