Tomcat源码分析之:ServletOutputStream的实现
转自:https://blog.****.net/fjslovejhl/article/details/23939017打开
貌似很久都没有写博客了,tomcat8的代码已经看了很多,主体部分的代码也都看得差不多了,发现在tomcat8中已经完全支持非阻塞的方式接收以及发送数据了。。。。但是比较遗憾的是,以前遗留下来的太多的老代码都不支持这种新的方式来发送数据。。。木有办法。。。
这里来看看Tomcat中是如何实现ServletOutputStream的吧。。。。
在具体的来看它之前,这里先来一张图来描述一下Tomcat的数据发送时候的流动。。。
这张图形已经比较好的展现成了Tomcat中对IO这一块封装的层次关系吧,首先是最上层的
ServletOutputStream对象,它是用户代码可以接触到的对象,它自带了自己的OutputBuffer,这里写数据其实是通过这个这个buffer完成的,数据是写到这个OutputBuffer里面
接下来的一层就是Tomcat内部的Response类型了,每一个ServletResponse对象都对应着一个唯一的Tomcat内部的response类型,当然了,他也有自己的buffer,InternalNioOutputBuffer类型的对象,这里上层ServletOutputStream写数据将会流动到这里。。。
最下层就是与channel关联的部分了,它也有自己的buffer,这里一般就是java类库中的niobuffer,上层的数据将会流动到这里。。。
最后才是将数据通过channel发送出去。。。
嗯,虽然好像层次稍微多了些,而且刚刚开始看代码就觉得稍微有点繁琐。。不过当将这个层次关系理清楚了之后还是蛮简单的。。。
好啦,接下来来看代码了。。。
这里就通过一次write来跟踪整个代码的执行轨迹吧,首先来看看CoyoteOutputStream类型的write方法:
- //发送一个byte数组的一部分数据
- public void write(byte[] b, int off, int len) throws IOException {
- boolean nonBlocking = checkNonBlockingWrite(); //判断是否支持非阻塞的发送数据,这里判断的标准其实是用户代码是否加入了WriteListener
- ob.write(b, off, len); //将数据写到outputbuffer里面
- if (nonBlocking) {
- checkRegisterForWrite(); //如果是非阻塞的发送数据的话,需要确保channel有注册写事件
- }
- }
这里首先要判断是否支持非阻塞的发送数据,这里就不细讲这部分的类容了,,,来看看这里的outputbuffer的write行为吧:
- //写入一个字节数组的一部分
- public void write(byte b[], int off, int len) throws IOException {
- if (suspended) {
- return;
- }
- writeBytes(b, off, len);
- }
- private void writeBytes(byte b[], int off, int len)
- throws IOException {
- if (closed) { //如果已经关闭了,那么直接返回吧
- return;
- }
- //这里其实是写到buffer里,如果数据过大,也有可能调用realWriteBytes方法真正的调用底层写数据
- bb.append(b, off, len);
- bytesWritten += len;
- // if called from within flush(), then immediately flush
- // remaining bytes
- if (doFlush) {
- bb.flushBuffer();
- }
- }
这里其实就是将数据直接放到当年的bytechunk就好了。。。但是这里如果数据比较多的话,会将数据直接写到下一层,也就是tomcat内置的response。。。来看看bytechunk的append方法吧:
- //加入一个字节数组的一部分数据
- public void append( byte src[], int off, int len )
- throws IOException
- {
- // will grow, up to limit
- makeSpace( len ); //确保有这么多空间可以写
- // if we don't have limit: makeSpace can grow as it wants
- if( limit < 0 ) { //表示没有空间限制
- // assert: makeSpace made enough space
- System.arraycopy( src, off, buff, end, len ); //将数据复制过来
- end+=len; //将end偏移加上len
- return;
- }
- // Optimize on a common case.
- // If the buffer is empty and the source is going to fill up all the
- // space in buffer, may as well write it directly to the output,
- // and avoid an extra copy
- //如果一次就填满了,那么还是写出去好了
- if ( len == limit && end == start && out != null ) {
- out.realWriteBytes( src, off, len ); //因为要写的数据比较大,所以直接写到更下层去
- return;
- }
- // if we have limit and we're below
- if( len <= limit - end ) { //表示还有足够的空间可以写数据
- // makeSpace will grow the buffer to the limit,
- // so we have space
- System.arraycopy( src, off, buff, end, len );
- end+=len;
- return;
- }
- // need more space than we can afford, need to flush
- // buffer
- // the buffer is already at ( or bigger than ) limit
- // We chunk the data into slices fitting in the buffer limit, although
- // if the data is written directly if it doesn't fit
- //代码执行到这里,说明空间不够了
- int avail=limit-end; //还剩下多大的空间可以写数据
- System.arraycopy(src, off, buff, end, avail);
- end += avail;
- flushBuffer();
- int remain = len - avail;
- while (remain > (limit - end)) { //不断的尝试写数据到下面去
- out.realWriteBytes( src, (off + len) - remain, limit - end );
- remain = remain - (limit - end);
- }
- System.arraycopy(src, (off + len) - remain, buff, end, remain);
- end += remain;
- }
这里可以看到realWriteBytes方法,它其实就是外面的outputbuffer定义的方法,来看看吧:
- //这个才是真正的调用底层发送数据,其实又是调用tomcat的response来写数据
- public void realWriteBytes(byte buf[], int off, int cnt)
- throws IOException {
- if (closed) {
- return;
- }
- if (coyoteResponse == null) {
- return;
- }
- // If we really have something to write
- if (cnt > 0) {
- // real write to the adapter
- outputChunk.setBytes(buf, off, cnt); //设置outputchunk
- try {
- coyoteResponse.doWrite(outputChunk); //通过tomcat的response来写数据,其实是写到httpprocessor的buffer里面去了
- } catch (IOException e) {
- // An IOException on a write is almost always due to
- // the remote client aborting the request. Wrap this
- // so that it can be handled better by the error dispatcher.
- throw new ClientAbortException(e);
- }
- }
- }
嗯,这里就与上面的层次对应上了吧,其实就是写到tomcat内置的response。。好了,接下来继续。。
- //在servlet的outputStream可能会调用这个方法来写数据
- public void doWrite(ByteChunk chunk/*byte buffer[], int pos, int count*/)
- throws IOException
- {
- outputBuffer.doWrite(chunk, this); //调用在httpprocessor里面创建的outputbuffer来写数据,这里会将数据写到niochannel的buffer,然后最终发送出去
- contentWritten+=chunk.getLength(); //标记已经发送的数据量的大小
- }
嗯,这里其实是写到internalNiobuffer里去。。。。继续看吧:
- public int doWrite(ByteChunk chunk, Response res) throws IOException {
- int len = chunk.getLength();
- int start = chunk.getStart();
- byte[] b = chunk.getBuffer();
- addToBB(b, start, len);
- byteCount += chunk.getLength();
- return chunk.getLength();
- }
嗯,没啥意思,继续:
- private synchronized void addToBB(byte[] buf, int offset, int length)
- throws IOException {
- if (length == 0) return;
- // Try to flush any data in the socket's write buffer first
- //首先尝试先将数据发送出去
- boolean dataLeft = flushBuffer(isBlocking());
- // Keep writing until all the data is written or a non-blocking write
- // leaves data in the buffer
- //这里只有在缓冲区里面已经没有数据了才继续发送
- while (!dataLeft && length > 0) {
- //首先将要发送的数据copy到niochanel的发送buffer里面去
- int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
- length = length - thisTime; //计算还剩下多少字节没有写到niochannel的buffer里面,其实这里也就当做将数据转移到了niochannel的buffer就算是写出去了
- offset = offset + thisTime; //这里用于调整偏移量
- //这里调用writeToSocket方法将niochannel的buffer的里面的数据通过socket写出去
- int written = writeToSocket(socket.getBufHandler().getWriteBuffer(),
- isBlocking(), true); //如果在tomcat的response里面有writelistener的话,可以异步的写
- if (written == 0) { //都没有写出去字节
- dataLeft = true;
- } else {
- dataLeft = flushBuffer(isBlocking()); //flush一下,看一下是否还会有数据剩余
- }
- }
- NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- if (ka != null) ka.access();//prevent timeouts for just doing client writes
- if (!isBlocking() && length > 0) { //在非阻塞的发送中,如果实在发送不出去,需要保存在额外的buffer里面
- // Remaining data must be buffered
- addToBuffers(buf, offset, length);
- }
- }
这里其实主要是调用flushBuffer方法,将数据传给下层的niochannel,而且可以看到对于过多的数据这里还会 做一层缓存。。。。
- //这里其实是flush niochannel的buffer
- protected boolean flushBuffer(boolean block) throws IOException {
- //prevent timeout for async,
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- if (key != null) {
- NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
- attach.access();
- }
- boolean dataLeft = hasMoreDataToFlush(); //其实这里是判断niochannel的buffer里面是否还有数据要写
- //write to the socket, if there is anything to write
- if (dataLeft) { //如果niochannel的buffer里面还有数据发送,那么继续写
- writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
- }
- dataLeft = hasMoreDataToFlush();
- //这里如果niochannel的buffer里面的数据已经发送完了,那么将将以前缓冲的数据再发送出去
- if (!dataLeft && bufferedWrites.size() > 0) {
- Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); //遍历待发送的数据
- while (!hasMoreDataToFlush() && bufIter.hasNext()) {
- ByteBufferHolder buffer = bufIter.next();
- buffer.flip();
- while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
- transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
- if (buffer.getBuf().remaining() == 0) { //如果当前buffer里面的所有数据都已经转移到了niochannel的buffer里面,那么可以将这个buffer移除了
- bufIter.remove();
- }
- writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
- //here we must break if we didn't finish the write
- }
- }
- }
- return hasMoreDataToFlush();
- }
嗯,这里其实主要是调用writeToSocket方法。。。来看看吧:
- //这里其实调用socket来写数据
- private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
- if ( flip ) {
- bytebuffer.flip();
- flipped = true;
- }
- int written = 0;
- NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- if ( att == null ) throw new IOException("Key must be cancelled");
- long writeTimeout = att.getWriteTimeout();
- Selector selector = null;
- try {
- selector = pool.get();
- } catch ( IOException x ) {
- //ignore
- }
- try {
- written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
- //make sure we are flushed
- do {
- //对于niochanel,这个flush方法其实是没用的
- if (socket.flush(true,selector,writeTimeout)) break;
- }while ( true );
- }finally {
- if ( selector != null ) {
- pool.put(selector);
- }
- }
- if ( block || bytebuffer.remaining()==0) {
- //blocking writes must empty the buffer
- //and if remaining==0 then we did empty it
- bytebuffer.clear();
- flipped = false;
- }
- // If there is data left in the buffer the socket will be registered for
- // write further up the stack. This is to ensure the socket is only
- // registered for write once as both container and user code can trigger
- // write registration.
- return written;
- }
这里因为涉及到了一些阻塞的或者非阻塞的发送数据。。所以可能会用到selector。。。
- public int write(ByteBuffer buf, NioChannel socket, Selector selector,
- long writeTimeout, boolean block) throws IOException {
- if ( SHARED && block ) { //对于写数据,一般都是这里
- return blockingSelector.write(buf,socket,writeTimeout);
- }
- //但是如果有outputstream的listener的话,可以采用非阻塞的方式来发送大量的数据
- SelectionKey key = null;
- int written = 0;
- boolean timedout = false;
- int keycount = 1; //assume we can write //假装刚开始是可以写的
- long time = System.currentTimeMillis(); //start the timeout timer
- try {
- while ( (!timedout) && buf.hasRemaining() ) {
- int cnt = 0;
- if ( keycount > 0 ) { //only write if we were registered for a write
- cnt = socket.write(buf); //write the data
- if (cnt == -1) throw new EOFException(); //出错了
- written += cnt;
- if (cnt > 0) {
- time = System.currentTimeMillis(); //reset our timeout timer
- continue; //we successfully wrote, try again without a selector
- }
- if (cnt==0 && (!block)) { //这里对于非阻塞的写,就直接返回了
- break; //don't block
- }
- }
- if ( selector != null ) {
- //register OP_WRITE to the selector
- if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
- else key.interestOps(SelectionKey.OP_WRITE);
- if (writeTimeout==0) {
- timedout = buf.hasRemaining();
- } else if (writeTimeout<0) {
- keycount = selector.select();
- } else {
- keycount = selector.select(writeTimeout);
- }
- }
- if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
- }//while
- if ( timedout ) throw new SocketTimeoutException();
- } finally {
- if (key != null) {
- key.cancel();
- if (selector != null) selector.selectNow();//removes the key from this selector
- }
- }
- return written;
- }
这里如果就直接调用niochannel来发送数据了。。不过其实这里还会涉及到将数据转移到niochannel的buffer。。然后才发送数据。。。
到这里整个数据的流动层次对照着上面的图形应该就算是 比较明白了吧。。。。