Netty源码之编码

之前我们已经介绍了Netty中各种解码器的解码过程,本节我们就来看一下它是如何编码的,下面是我们添加的两个handler:

ch.pipeline().addLast(new Encoder());
 ch.pipeline().addLast(new BizHandler());

第一个handler我们来看一下里面做了什么:

public class Encoder extends MessageToByteEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {

        byte[] bytes = user.getName().getBytes();
        out.writeInt(4 + bytes.length);
        out.writeInt(user.getAge());
        out.writeBytes(bytes);
    }
}

其实就是覆盖了一个encode方法,再看一下另外一个handler:

public class BizHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //...

        User user = new User(19, "zhangsan");

        ctx.channel().writeAndFlush(user);
    }
}

就是向我们的channel中写了一个对象,那么我们就从这个地方开始分析:

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return pipeline.writeAndFlush(msg, promise);
}

然后套调用了pipeline的writeAndFlush方法,这个时候msg是我们传进去的user对象:

public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }

我们发现当调用writeAndFlush方法的时候是从处理链的尾部开始执行的:

 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
		
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }

        write(msg, true, promise);

        return promise;
    }

在这个方法中首先对我们传进的参数进行调用,最终嗲用了write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) {
		//找到下一个出结点
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        //获得下一个出结点绑定的处理器
        EventExecutor executor = next.executor();
        //判断处理器是否和当前线程绑定的一致
        if (executor.inEventLoop()) {
        	//根据是否flush调用不同的方法
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
        	//如果当前处理器和当前eventLoop绑定的线程不一致,就将它封装成一个task放到任务队列里面
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

这个方法中首先找到下一个出结点,然后判断是否处于当前线程中决定是否立即执行,然后调用下一个出结点的invokeWriteAndFlush方法:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

分别调用write方法和flush方法,接下来我们先看一下write的流程:

private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
        	//调用handler的write方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

在这里我们下一个处理器就是Encoder,它继承了MessageToByteEncoder<User>,看名字就能知道他是将我们传入的对象变成字节流,我们进去看一下它的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
        	//判断当前对象是否可以执行
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                //强制转换成对应的类型
                I cast = (I) msg;
                //分配空间
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                	//执行编码
                    encode(ctx, cast, buf);
                } finally {
                //释放原始对象
                    ReferenceCountUtil.release(cast);
                }
				//如果字节流可读,那么继续向下传播
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                //释放对象
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

在这个方法中主要做了这么几件事:
1、验证当前要编码的对象是否能处理,也就是根据我们传入的那个泛型
2、分配空间,这个空间用来装编码后的字节流
3、执行编码
4、释放对象
5、向下传播编码后的字节流
6、释放内存
接下来我们就看一下编码这个过程:

protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {

        byte[] bytes = user.getName().getBytes();
        out.writeInt(4 + bytes.length);
        out.writeInt(user.getAge());
        out.writeBytes(bytes);
    }

这个方法是用户自定义实现的流程,如果看过上一篇博客,就能看出来这是基于长度域的编码。

public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();
			//得到出站的缓冲区
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
            	//direct化ByteBuf
                msg = filterOutboundMessage(msg);
                //得到要写的字节的大小
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
			//添加到缓冲区中
            outboundBuffer.addMessage(msg, size, promise);
        }

在这个方法中首先将ByteBuf转化成直接内存,然后将他添加到缓冲区中,接下来我们看一下添加到缓冲区的过程:

public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(size, false);
    }

首先我们传入的字节流信息会被封装成一个entry,这个缓冲区是由三个指针维护的:
Netty源码之编码
flushedEntry到unflushedEntry之间的结点是已经flush的,unflushedEntry到TailEntry之间的结点是等待被flush的,加入缓冲区就是调整这三个指针。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

计算现在已经写的总字节数目,如果已经大于了Netty中默认的最大字节(64字节)就将当前缓冲区设置为不可写状态,只能等到flush一部分字节之后,已经写的总字节数降下来之后才将状态修改为写状态。
一般情况下我们不会对flush进行重写,这个事件会一直被传播到头节点,所以我们直接看那里面的flush方法:

public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
			//更新缓冲区
            outboundBuffer.addFlush();
            //开始flush
            flush0();
        }

首先更新维护缓冲区的三个指针,然后进行flush:

public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

在这里面无非就是链表的操作,将所有准备flush的结点修改为flushed,并且每修改完一个状态就更新已经写的字节总数,修改当前缓冲区的写状态。

protected void flush0() {
			//如果当前正处于flush状态,就直接返回
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

           ...
                doWrite(outboundBuffer);
           ...
        }

这里面最重要的一个方法就是doWrite:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        boolean setOpWrite = false;
        //利用自旋锁,进行写
        for (;;) {
        	//得到当前需要flush的字节流对象
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
			//对对象的类型进行验证
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                //获得可以读的字节的长度
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean done = false;
                long flushedAmount = 0;
                //拿到自旋锁的自旋次数
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                //通过自旋去写数据
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    // Break the loop and so incompleteWrite(...) is called.
                    break;
                }
            } 
            ...
        incompleteWrite(setOpWrite);
    }

首先在从缓冲区中拿到一下一个需要写的对象,然后过滤bytebuf,最后通过自旋锁进行写:

protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

最终调用jdk底层的api进行写,返回写的字节数。