Netty源码之编码
之前我们已经介绍了Netty中各种解码器的解码过程,本节我们就来看一下它是如何编码的,下面是我们添加的两个handler:
ch.pipeline().addLast(new Encoder());
ch.pipeline().addLast(new BizHandler());
第一个handler我们来看一下里面做了什么:
public class Encoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
}
其实就是覆盖了一个encode方法,再看一下另外一个handler:
public class BizHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//...
User user = new User(19, "zhangsan");
ctx.channel().writeAndFlush(user);
}
}
就是向我们的channel中写了一个对象,那么我们就从这个地方开始分析:
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return pipeline.writeAndFlush(msg, promise);
}
然后套调用了pipeline的writeAndFlush方法,这个时候msg是我们传进去的user对象:
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
我们发现当调用writeAndFlush方法的时候是从处理链的尾部开始执行的:
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
在这个方法中首先对我们传进的参数进行调用,最终嗲用了write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
//找到下一个出结点
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
//获得下一个出结点绑定的处理器
EventExecutor executor = next.executor();
//判断处理器是否和当前线程绑定的一致
if (executor.inEventLoop()) {
//根据是否flush调用不同的方法
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果当前处理器和当前eventLoop绑定的线程不一致,就将它封装成一个task放到任务队列里面
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
这个方法中首先找到下一个出结点,然后判断是否处于当前线程中决定是否立即执行,然后调用下一个出结点的invokeWriteAndFlush方法:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
分别调用write方法和flush方法,接下来我们先看一下write的流程:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//调用handler的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
在这里我们下一个处理器就是Encoder,它继承了MessageToByteEncoder<User>,看名字就能知道他是将我们传入的对象变成字节流,我们进去看一下它的write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
//判断当前对象是否可以执行
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
//强制转换成对应的类型
I cast = (I) msg;
//分配空间
buf = allocateBuffer(ctx, cast, preferDirect);
try {
//执行编码
encode(ctx, cast, buf);
} finally {
//释放原始对象
ReferenceCountUtil.release(cast);
}
//如果字节流可读,那么继续向下传播
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
//释放对象
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
在这个方法中主要做了这么几件事:
1、验证当前要编码的对象是否能处理,也就是根据我们传入的那个泛型
2、分配空间,这个空间用来装编码后的字节流
3、执行编码
4、释放对象
5、向下传播编码后的字节流
6、释放内存
接下来我们就看一下编码这个过程:
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
这个方法是用户自定义实现的流程,如果看过上一篇博客,就能看出来这是基于长度域的编码。
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//得到出站的缓冲区
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
//direct化ByteBuf
msg = filterOutboundMessage(msg);
//得到要写的字节的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//添加到缓冲区中
outboundBuffer.addMessage(msg, size, promise);
}
在这个方法中首先将ByteBuf转化成直接内存,然后将他添加到缓冲区中,接下来我们看一下添加到缓冲区的过程:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false);
}
首先我们传入的字节流信息会被封装成一个entry,这个缓冲区是由三个指针维护的:
flushedEntry到unflushedEntry之间的结点是已经flush的,unflushedEntry到TailEntry之间的结点是等待被flush的,加入缓冲区就是调整这三个指针。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
计算现在已经写的总字节数目,如果已经大于了Netty中默认的最大字节(64字节)就将当前缓冲区设置为不可写状态,只能等到flush一部分字节之后,已经写的总字节数降下来之后才将状态修改为写状态。
一般情况下我们不会对flush进行重写,这个事件会一直被传播到头节点,所以我们直接看那里面的flush方法:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//更新缓冲区
outboundBuffer.addFlush();
//开始flush
flush0();
}
首先更新维护缓冲区的三个指针,然后进行flush:
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
在这里面无非就是链表的操作,将所有准备flush的结点修改为flushed,并且每修改完一个状态就更新已经写的字节总数,修改当前缓冲区的写状态。
protected void flush0() {
//如果当前正处于flush状态,就直接返回
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
...
doWrite(outboundBuffer);
...
}
这里面最重要的一个方法就是doWrite:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
//利用自旋锁,进行写
for (;;) {
//得到当前需要flush的字节流对象
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
//对对象的类型进行验证
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
//获得可以读的字节的长度
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
//拿到自旋锁的自旋次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
//通过自旋去写数据
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
}
...
incompleteWrite(setOpWrite);
}
首先在从缓冲区中拿到一下一个需要写的对象,然后过滤bytebuf,最后通过自旋锁进行写:
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
最终调用jdk底层的api进行写,返回写的字节数。