【Pigeon源码阅读】RPC底层通信实现原理(八)


pigeon底层通过Netty-3.9.2.Final实现服务端和客户端的连接通信,对应实现类为NettyServer和NettyClient。在内部,处理RPC通信的核心逻辑又分别定义在NettyServerPipelineFactory和NettyClientPipelineFactory,这两个类都实现了ChannelPipelineFactory,重写了里面的getPipeline方法,用于处理发送请求和处理请求的相关流程逻辑。

pigeon TCP协议格式

pigeon目前区分两种TCP协议方式,一种是非统一(默认)协议,为普通序列化方式如Hessian,json等方式调用,另一种是统一协议,如Thrift调用和泛化调用,其中泛化调用可以在不直到api设计的基础上,直接通过指定方法名字符串来调用相应的服务方法。基于不同协议,tcp消息包格式也是不同的,下面分开解析

粘包半包问题

在TCP传输中,一个完整的消息包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,因而数据接收方无法区分消息包的头尾,在接收方来看,可能一次接收的数据内容只是一次完整请求的部分数据,或包含多次请求的数据等情况。基于此,常见有三种解决方式:

  1. 消息包定长,固定每个消息包长度,不够的话用空格补,缺点是长度不能灵活控制,字节不够的情况下补空格会造成浪费,字节过多还要处理分包传输的逻辑
  2. 使用定长消息头和变长消息体,其中消息体的长度必须在消息头指出,在接收方每次接收数据时,先确保获取到定长的消息头,再从消息头中解析出消息体的长度,以此来界定获取以此完整请求的消息包数据。
  3. 在消息包尾部指定分隔符,缺点是需要遍历数据,判断是否为分隔符,影响效率。

在pigeon中,是基于第二种,使用定长消息头和变长消息体的方式实现的。

定长消息头格式

在消息头部分,统一协议和默认协议的区别较大,这里分开讲述:

默认协议消息格式

默认协议消息格式具体包括2部分:消息头、消息体,其中消息体包含变长请求体和定长请求尾两部分。
默认协议消息头固定为7个字节:

  1. 第1-2个字节固定为十六进制的0x39、0x3A,即十进制的57、58,可以用来区分是默认协议还是统一协议。
  2. 第3个字节为序列化方式,如Hessian是2,java是3,json是7等。
  3. 第4-7个字节:消息体长度,int,占4个字节,值为请求体长度(请求或响应对象序列化后的字节长度)+请求尾长度11。

统一协议消息格式

类似默认协议消息,统一协议消息格式也包括2部分:消息头、消息体,和默认协议不同的是,统一协议中,消息体部分为完整请求体尾部不再包含请求尾,但会在请求体头部包含一个两字节长度的请求头。这里需要区分消息头和请求头的区别。
统一协议的消息头固定为8个字节:

  1. 第1-2个字节固定为十六进制的0xAB、0xBA,即十进制的171、186,或8位有符号整型的-85、-70,可以用来区分是默认协议还是统一协议。
  2. 第3个字节为协议版本,也可以称作command字节,会用来定义编解码的相关自定义行为,如压缩方式、数据校验方式等,具体command第8位表示是否需要进行校验数据完整性,第6、7位定义了是否进行压缩及具体的压缩方式。
  3. 第4个字节为序列化方式,一般为1。
  4. 第5~8个字节为消息体长度。

消息体

消息体部分,不区分是统一协议还是默认协议,最终解析出请求和响应对象类型分别为com.dianping.dpsf.protocol.DefaultRequest或com.dianping.dpsf.protocol.DefaultResponse,而除此之外,两种协议有细微区别:

  1. 统一协议没有请求尾,在消息体头部会有两个定长字节,这两个字节在序列化内部赋值,是Thrift内部计算的头部长度。
  2. 默认协议没有请求头,在消息体尾部会有11位定长字节,前8个字节为消息sequence,long型,值请从0开始递增,每个消息的sequence都不同;后3个字节固定为:29,30,31

DefaultRequest或com的具体定义如下:

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultRequest.class)
public class DefaultRequest implements InvocationRequest {

    /**
     * 不能随意修改!
     */
    private static final long serialVersionUID = 652592942114047764L;

    // 必填,序列化类型,默认hessian为2
    private byte serialize;

    // 必填,消息sequence,long型,值请从0开始递增,每个消息的sequence都不同
    @JsonProperty("seq")
    private long seq;

    //必填,如果调用需要返回结果,固定为1,不需要回复为2,手动回复为3
    private int callType = Constants.CALLTYPE_REPLY;

    // 必填,超时时间,单位毫秒
    private int timeout = 0;

    // 请求创建时间, 不参与序列化传输
    @JsonIgnore
    private transient long createMillisTime;

    //必填,服务名称url,服务唯一的标识
    @JsonProperty("url")
    private String serviceName;

    //必填,服务方法名称
    private String methodName;

    //必填,服务方法的参数值
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    private Object[] parameters;

    //必填,消息类型,服务调用固定为2,心跳为1,服务框架异常为3,服务调用业务异常为4
    private int messageType = Constants.MESSAGE_TYPE_SERVICE;

    // 旧版上下文信息传递对象
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    private Object context;

    // 服务版本
    private String version;

    // 必填,调用者所属应用名称,在META-INF/app.properties里的app.name值
    private String app = ConfigManagerLoader.getConfigManager().getAppName();
    
    // 请求体大小
    @JsonIgnore
    private transient int size;
}

DefaultResponse的具体定义如下:

@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultResponse.class)
public class DefaultResponse implements InvocationResponse {

    /**
     * 不能随意修改!
     */
    private static final long serialVersionUID = 4200559704846455821L;

    private transient byte serialize;

    // 返回的消息sequence,对应发送的消息sequence,long型
    @JsonProperty("seq")
    private long seq;

    //消息类型,服务调用为2,服务调用业务异常为4,服务框架异常为3,心跳为1
    private int messageType;

    // 请求返回异常的相关堆栈信息
    @JsonProperty("exception")
    private String cause;

    // 返回服务调用结果
    @JsonProperty("response")
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    private Object returnVal;

    // 旧版上下文传递
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
    private Object context;

    // 请求体大小
    @JsonIgnore
    private transient int size;

    // 请求创建时间, 不参与序列化传输
    @JsonIgnore
    private transient long createMillisTime;

    // 响应自定义上下文信息
    private Map<String, Serializable> responseValues = null;

}


Netty3 Handler相关实现

上下游传递原理

在开始分析pigeonRPC通信实现之前,先简略总结下netty3利用ChannelHandler完成通信处理的相关逻辑。
对于netty接收到一个请求后,会调用一系列的拦截器handler来处理我们的请求,可以将请求方看成在下游,服务方的核心处理逻辑在上游,服务方接收到请求后,会将请求不断往上游传递,交由一个个ChannelUpstreamHandler#handleUpstream方法处理,在到达最上游并由核心逻辑处理完后,又交由一个个ChannelDownStreamHandler#handleDownstream处理,到最下游通过网络通信将结果回传给客户端。
基于此,我们可以通过实现ChannelUpstreamHandler和ChannelDownStreamHandler,在请求的上下游传递中,拓展我们的逻辑。

相关实现

ChannelHandlerContext

请求在上下游处理过程中,处理的上下文数据是通过ChannelHandlerContext实现的。比如我们在特定的handler中通过ChannelHandlerContext的sendUpstream和sendDownstream方法将请求传递到下一个handler中处理。对于传递处理的上下文数据,可以通过getAttachment和setAttachment进行读写。

OneToOneDecoder & OneToOneEncoder

请求数据传输在进入服务端之后和返回客户端之前,分别需要进行解码和编码操作,这由OneToOneDecoder和OneToOneEncoder两个抽象类分别实现,两者分别实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,一般通过继承这两个抽象类,并实现内部的encode或decode方向模版方法来实现具体的编解码操作。

SimpleChannelHandler

SimpleChannelHandler同时实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,是一个双向handler,内部简单地实现了handleUpstream和handleDownstream,会根据传入ChannelEvent地类型,进行必要地向下转型,得到更加有意义的子类型,而后调用相关的处理方法。默认实现本Handler一般是作为最上层地handler,如果在本Handler之后还需要向更上游传递,需要确保在handleUpstream地最后,手动调用了super.handleUpstream方法。

ChannelEvent

netty有多种事件可以在Channel中传递,交由用户定义的handler处理,这些事件都以ChennelEvent的形式定义,常用有以下事件:

  1. MessageEvent:正常消息请求事件
  2. ChannelStateEvent:channel状态变更事件,包括以下几种:
    1. OPEN: channel开启
    2. BOUND: channel绑定到特定的本地地址
    3. CONNECTED: channel连接到特定的远程地址
    4. INTEREST_OPS: channel对特定感兴趣的操作会进行暂停

pigeon RPC通信的核心实现原理

下面分成服务端和客户端两部分,从这两个类展开分析pigeon

服务端实现

下面先看NettyServerPipelineFactory的实现:

public class NettyServerPipelineFactory implements ChannelPipelineFactory {
    // 服务端连接实例引用
    private NettyServer server;

    // 通过编解码工厂,获取单例编解码配置
    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    public NettyServerPipelineFactory(NettyServer server) {
        this.server = server;
    }
    
    // 初始化pipeline
    public ChannelPipeline getPipeline() {
        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("framePrepender", new FramePrepender());
        pipeline.addLast("frameDecoder", new FrameDecoder());
        pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
        pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
        pipeline.addLast("providerDecoder", new ProviderDecoder());
        pipeline.addLast("providerEncoder", new ProviderEncoder());
        pipeline.addLast("serverHandler", new NettyServerHandler(server));
        return pipeline;
    }

}

对于上面所有的Handler,可以分为3类:

  1. UpStreamHandler
    1. FrameDecoder
    2. ProviderDecoder
    3. NettyServerHandler
  2. DownStreamHandler
    1. FramePrepender
    2. ProviderEncoder
  3. 双向Handler
    1. Crc32Handler
    2. compressHandler

结合代码分析,最终的调用时序如下所示:
【Pigeon源码阅读】RPC底层通信实现原理(八)
下面根据每个handler的处理顺序,依次分析每个handler的处理逻辑

FrameDecoder

顾名思义,FrameDecoder用来解析出通信管道中一次请求的数据,解决tcp通信中粘包和半包的问题。
pigeon的FrameDecoder继承自netty的org.jboss.netty.handler.codec.frame.FrameDecoder,而org.jboss.netty.handler.codec.frame.FrameDecoder又继承自SimpleChannelUpstreamHandler,在netty实现的FrameDecoder,核心实现方法是messageReceived,大致实现原理是不断读取接收到的字节流,并累加到cumulation变量,通过调用callDecode来尝试对当前累加的字节Buffer cumulation进行解码,直到解析出一个完整请求的feame对象,最后会调用Channels#fireMessageReceived触发Handler的pipeline调用来完成一次完整请求。
在解码过程callDecode中调用了一个抽象模版方法decode来完成具体的解码逻辑,decode方法尝试解析cumulation变量,如果不能按照自定义解析规则解析出一个完整请求的数据包,就返回null,否则返回一个完整的数据包,这里读取成功的同时,需要更新cumulation的字节起始点到当前完整数据包字节的尾部。
分析完Netty的解码流程,具体看看pigeon如何基于自己设计的协议格式来进行数据包解析。

在pigeon中,decode方法在自定义的FrameDecoder中实现,代码尝试先解析出请求头,再通过头部的请求体长度,解析出一次完整请求的包体,在代码中请求尾的长度包含在请求体的长度内部,具体实现如下所示:

public class FrameDecoder extends org.jboss.netty.handler.codec.frame.FrameDecoder {


    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
            throws Exception {

        Object message = null;

        // 如果当前累积的小于两个字节,直接返回null
        if (buffer.readableBytes() <= 2) {
            return message;
        }

        byte[] headMsgs = new byte[2];

        // 复制buff两个字节到headMsgs
        buffer.getBytes(buffer.readerIndex(), headMsgs);

        if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
            // 0x39=57,0x3A=58
            //old protocol
            message = doDecode(buffer);
        } else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) {
            //0xAB=171,0xBA=186
            //new protocol
            message = _doDecode(buffer);

        } else {
            throw new IllegalArgumentException("Decode invalid message head:" +
                    headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer);
        }

        return message;

    }

    protected Object doDecode(ChannelBuffer buffer) throws Exception {

        CodecEvent codecEvent = null;
        // FRONT_LENGTH = 7,即如果buffer小于消息头7位长度,直接返回null
        if (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) {
            return codecEvent;
        }
        // 从消息的第3位开始,读取4位字节位一个无符号整数,实际位请求体大小
        int totalLength = (int) buffer.getUnsignedInt(
                buffer.readerIndex() +
                        CodecConstants.HEAD_LENGTH);

        // 最后包体大小是请求体大小+请求头大小
        int frameLength = totalLength + CodecConstants.FRONT_LENGTH;

        // 当前累积的buffer是否已经包含一个完整的数据包
        if (buffer.readableBytes() >= frameLength) {

            // 获取具体数据包字节内容
            ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
            // 更新累积缓存的读起点,方便读取处理下一个数据包
            buffer.readerIndex(buffer.readerIndex() + frameLength);

            // 用CodecEvent包装frame,并标记位非统一协议
            codecEvent = new CodecEvent(frame, false);
            // 设置接收时间
            codecEvent.setReceiveTime(System.currentTimeMillis());
        }

        return codecEvent;
    }

    protected Object _doDecode(ChannelBuffer buffer)
            throws Exception {
        CodecEvent codecEvent = null;

        // _FRONT_LENGTH = 10,即如果buffer小于消息头10位长度,直接返回null
        if (buffer.readableBytes() <= CodecConstants._FRONT_LENGTH) {
            return codecEvent;
        }

        // 从消息的第4位开始,读取4位字节位一个无符号整数,实际位请求体大小
        int totalLength = (int) (buffer.getUnsignedInt(
                buffer.readerIndex() +
                        CodecConstants._HEAD_LENGTH));

        // 最后包体大小是请求体大小+请求头大小
        int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;

        // 当前累积的buffer是否已经包含一个完整的数据包
        if (buffer.readableBytes() >= frameLength) {

            // 获取具体数据包字节内容
            ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
            // 更新累积缓存的读起点,方便读取处理下一个数据包
            buffer.readerIndex(buffer.readerIndex() + frameLength);

            // 用CodecEvent包装frame,并标记位统一协议
            codecEvent = new CodecEvent(frame, true);
            // 设置接收时间
            codecEvent.setReceiveTime(System.currentTimeMillis());
        }

        return codecEvent;
    }

}

Crc32Handler

Crc32Handler主要用于校验统一协议请求的数据完整性,在解析出完整消息包长度数据之后,在解码为DefaultRequest之前,会先获取实际的数据包数据,计算crc32校验和,再和消息尾部传入的校验和进行比对,如果一致,说明校验通过,否则校验失败。而在请求结束发送相应时,又会对数据计算校验和,放在消息包尾中,以便客户端获取校验。
看看代码的具体实现:

public class Crc32Handler extends SimpleChannelHandler {

    private static final Logger logger = LoggerLoader.getLogger(Crc32Handler.class);

    private static ThreadLocal<Adler32> adler32s = new ThreadLocal<Adler32>();

    private CodecConfig codecConfig;

    public Crc32Handler(CodecConfig codecConfig) {
        this.codecConfig = codecConfig;
    }

    // 上游接收数据处理
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {
            return;
        }

        CodecEvent codecEvent = (CodecEvent) e.getMessage();

        if (codecEvent.isValid()) {
            if (codecEvent.isUnified()) {
                // 如果是统一协议,需要进行校验
                if (!doUnChecksum(e.getChannel(), codecEvent)) {
                    // 校验失败
                    codecEvent.setIsValid(false);
                }
            }
        }
        // 向上游发送消息接收事件
        Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());
    }


    // 下游发送数据前处理
    @Override
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (!(e instanceof MessageEvent)) {
            // 只对MessageEvent添加校验
            ctx.sendDownstream(e);
            return;
        }

        MessageEvent evt = (MessageEvent) e;
        if (!(evt.getMessage() instanceof CodecEvent)) {
            // 只对CodecEvent添加校验
            ctx.sendDownstream(evt);
            return;
        }

        CodecEvent codecEvent = (CodecEvent) evt.getMessage();
        if (codecEvent.isUnified()) {
            // 只对统一协议请求添加校验
            ChannelBuffer buffer = doChecksum(e.getChannel(), codecEvent);
            codecEvent.setBuffer(buffer);

            write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());
        } else {
            ctx.sendDownstream(e);
        }
    }

    private boolean doUnChecksum(Channel channel, CodecEvent codecEvent) {

        ChannelBuffer frame = codecEvent.getBuffer();

        // 获取第3位字节
        byte command = frame.getByte(frame.readerIndex() +
                CodecConstants._FRONT_COMMAND_LENGTH);

        // 第8位是否位1
        if ((command & 0x80) == 0x80) {
            int frameLength = frame.readableBytes();
            // 数据包长度 = 消息包总长度 - 消息尾长度
            int dataLength = frameLength - CodecConstants._TAIL_LENGTH;
            // 初始化空buffer为数据包长度
            ChannelBuffer buffer = frame.factory().getBuffer(dataLength);
            // 将消息包中的数据包内容复制到buffer中
            buffer.writeBytes(frame, frame.readerIndex(), dataLength);

            // 设置需要进行校验
            codecEvent.setIsChecksum(true);

            // 计算数据包的校验和
            int checksum = (int) doChecksum0(buffer, dataLength);
            // 获取dataLength位(请求尾首位)的校验和
            int _checksum = frame.getInt(dataLength);

            if (checksum == _checksum) {
                // 校验有效
                int totalLength = buffer.getInt(CodecConstants._HEAD_LENGTH);
                // 更新消息包中数据包长度的4位字节实际值为数据包体的长度
                buffer.setInt(CodecConstants._HEAD_LENGTH, totalLength - CodecConstants._TAIL_LENGTH);
                // 更新codecEvent的buffer
                codecEvent.setBuffer(buffer);
            } else {
                // 校验失败
                String host = ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress();
                logger.error("Checksum failed. data from host:" + host);
                return false;
            }
        }
        return true;
    }

    private ChannelBuffer doChecksum(Channel channel, CodecEvent codecEvent) {
        ChannelBuffer frame = codecEvent.getBuffer();
        // 是否需要校验数据完整性
        boolean isChecksum = codecConfig.isChecksum();

        int command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
        int frameLength = frame.readableBytes();

        if (isChecksum) {
            // 更新指定位数据
            command = command | 0x80;
            // command
            frame.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
            frame.writeByte(command);
            // totalLength=消息包长度-头部+尾部
            frame.writeInt(frameLength -
                    CodecConstants._FRONT_LENGTH_ +
                    CodecConstants._TAIL_LENGTH);

            frame.writerIndex(frameLength);

            if (!(frame instanceof DynamicChannelBuffer)) {
                // 更新buffer
                ChannelBuffer buffer = frame.factory().getBuffer(frameLength +
                        CodecConstants._TAIL_LENGTH);
                buffer.writeBytes(frame, frame.readerIndex(), frameLength);
                frame = buffer;
            }

            // 计算校验和
            long checksum = doChecksum0(frame, frameLength);

            frame.writeInt((int) checksum);

        } else {
            //command=127
            command = command & 0x7f;
            frame.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
            frame.writeByte(command);
            frame.writerIndex(frameLength);
        }

        return frame;
    }

    // crc32校验和计算
    private long doChecksum0(ChannelBuffer frame, int frameLength) {
        //checksum
        Adler32 adler32 = adler32s.get();
        if (adler32 == null) {
            adler32 = new Adler32();
            adler32s.set(adler32);
        }
        adler32.reset();

        adler32.update(frame.array(), 0, frameLength);
        return adler32.getValue();
    }

}

CompressHandler

CompressHandler构造函数传入了一个CodecConfig,用来定义压缩的相关属性,在接收请求和返回响应经过CompressHandler时,会根据配置尝试对消息包的正文部分进行压缩,压缩之后需要更新消息包正文内容的长度,先看CodeConfig实现:

public class CodecConfig {

    private static final Logger logger = LoggerLoader.getLogger(CodecConfig.class);

    private ConfigManager configManager;

    // 是否允许压缩
    private volatile boolean compressed;

    // 压缩类型
    private volatile CompressType compressType;

    // 压缩最小阈值,只有数据包大于指定阈值才进行压缩
    private volatile int compressThreshold;

    // 是否校验数据完整性
    private volatile boolean checksum;

    // 从配置中心初始化相关配置,并注册监听器,监听配置修改
    public CodecConfig(ConfigManager configManager) {
        // 指定配置中心
        this.configManager = configManager;

        // 获取配置属性并解析
        this.compressed = this.configManager.getBooleanValue(Constants.KEY_CODEC_COMPRESS_ENABLE,
                Constants.DEFAULT_CODEC_COMPRESS_ENABLE);

        this.compressType = getCompressType((byte) this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_TYPE,
                Constants.DEFAULT_CODEC_COMPRESS_TYPE));

        this.compressThreshold = this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_THRESHOLD,
                Constants.DEFAULT_CODEC_COMPRESS_THRESHOLD);

        this.checksum = this.configManager.getBooleanValue(Constants.KEY_CODEC_CHECKSUM_ENABLE,
                Constants.DEFAULT_CODEC_CHECKSUM_ENABLE);

        // 注册配置变更监听器
        configManager.registerConfigChangeListener(new InnerConfigChangeListener());
    }

    public boolean isCompress(int frameSize) {
        if (compressed) {
            // 只有数据包大于指定阈值才进行压缩
            if (frameSize > compressThreshold) {
                return true;
            }
        }
        return false;
    }


    public CompressType getCompressType() {
        return compressType;
    }

    // 根据配置code,获取具体的压缩类型枚举成员
    private final CompressType getCompressType(byte code) {

        CompressType compressType = CompressType.None;

        try {
            compressType = CompressType.getCompressType(code);
        } catch (Exception e) {
            logger.error("Invalid compressType. code:" + code, e);
        }

        return compressType;

    }

    public boolean isChecksum() {
        return checksum;
    }

    // 监听更新相关压缩配置
    private class InnerConfigChangeListener implements ConfigChangeListener {

        @Override
        public void onKeyUpdated(String key, String value) {
            if (key.endsWith(Constants.KEY_CODEC_COMPRESS_ENABLE)) {
                try {
                    compressed = Boolean.valueOf(value);
                } catch (RuntimeException e) {
                }
            } else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_TYPE)) {
                try {
                    compressType = getCompressType(Byte.valueOf(value));
                } catch (RuntimeException e) {
                }
            } else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_THRESHOLD)) {
                try {
                    compressThreshold = Integer.valueOf(value);
                } catch (RuntimeException e) {
                }
            } else if (key.endsWith(Constants.KEY_CODEC_CHECKSUM_ENABLE)) {
                try {
                    checksum = Boolean.valueOf(value);
                } catch (RuntimeException e) {
                }
            }
        }

        @Override
        public void onKeyAdded(String key, String value) {
            // TODO Auto-generated method stub

        }

        @Override
        public void onKeyRemoved(String key) {
            // TODO Auto-generated method stub

        }

    }
}

再看看CompressHandler的具体实现:

public class CompressHandler extends SimpleChannelHandler {

    private static Compress gZipCompress = CompressFactory.getGZipCompress();

    private static Compress snappyCompress = CompressFactory.getSnappyCompress();

    private CodecConfig codecConfig;

    public CompressHandler(CodecConfig codecConfig) {
        this.codecConfig = codecConfig;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {
            return;
        }

        CodecEvent codecEvent = (CodecEvent) e.getMessage();

        if (codecEvent.isValid()) {

            if (codecEvent.isUnified()) {
                // 统一协议请求
                ChannelBuffer buffer = doUnCompress(e.getChannel(), codecEvent);
                codecEvent.setBuffer(buffer);
            }
        }

        Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());
    }

    @Override
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (!(e instanceof MessageEvent)) {
            ctx.sendDownstream(e);
            return;
        }

        MessageEvent evt = (MessageEvent) e;
        if (!(evt.getMessage() instanceof CodecEvent)) {
            ctx.sendDownstream(evt);
            return;
        }

        CodecEvent codecEvent = (CodecEvent) evt.getMessage();
        if (codecEvent.isUnified()) {
            ChannelBuffer buffer = doCompress(e.getChannel(), codecEvent);
            codecEvent.setBuffer(buffer);
            write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());
        } else {
            ctx.sendDownstream(e);
        }
    }

    private ChannelBuffer doUnCompress(Channel channel, CodecEvent codecEvent)
            throws IOException {
        ChannelBuffer frame = codecEvent.getBuffer();
        // 获取command值
        byte command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
        //获取command的第6、7位
        short compress = (short) (command & 0x60);

        if (compress == 0x00) {
            return frame;
        }

        // 获取数据包体长度
        int totalLength = frame.getInt(frame.readerIndex() + CodecConstants._HEAD_LENGTH);
        // 减去头部值
        int compressLength = totalLength - CodecConstants._HEAD_FIELD_LENGTH;

        byte[] in;
        byte[] out = null;
        ChannelBuffer result;

        switch (compress) {
            case 0x00: // 没有压缩处理,直接返回
                return frame;
            case 0x20: // 解压缩处理
                in = new byte[compressLength];
                // 减去消息头长度
                frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);
                // 使用snappy压缩
                out = snappyCompress.unCompress(in);
                codecEvent.setIsCompress(true);
                break;
            case 0x40: // 和0X20类似处理,只是压缩方式不同
                in = new byte[compressLength];
                frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);
                // 使用gZip压缩
                out = gZipCompress.unCompress(in);
                codecEvent.setIsCompress(true);
                break;
            case 0x60:
                throw new IllegalArgumentException("Invalid compress type.");
        }

        // 实际长度是解压缩后长度+头部长度
        int _totalLength = CodecConstants._HEAD_FIELD_LENGTH + out.length;

        result = channel.getConfig().getBufferFactory().getBuffer(
                _totalLength + CodecConstants._FRONT_LENGTH_);

        // 写长度
        result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);
        result.writeInt(_totalLength);
        // 写解压缩的数据包
        result.writeBytes(frame, frame.readerIndex() + CodecConstants._FRONT_LENGTH_,
                CodecConstants._HEAD_FIELD_LENGTH);
        result.writeBytes(out);

        return result;
    }

    private ChannelBuffer doCompress(Channel channel, CodecEvent codecEvent)
            throws IOException {
        ChannelBuffer frame = codecEvent.getBuffer();
        // 取第三位字节command
        int command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
        //compress
        ChannelBuffer result = frame;
        int frameLength = frame.readableBytes();

        if (codecConfig.isCompress(frameLength)) {
            CompressType compressType = codecConfig.getCompressType();
            // 设置command第6、7位来定义压缩类型,并进行相应的压缩操作
            switch (compressType) {
                case None:
                    command = command | 0x00;
                    break;
                case Snappy:
                    command = command | 0x20;
                    result = doCompress0(channel, frame, frameLength, snappyCompress);
                    break;
                case Gzip:
                    command = command | 0x40;
                    result = doCompress0(channel, frame, frameLength, gZipCompress);
                    break;
            }
        } else {
            // 默认不压缩
            command = command | 0x00;
        }
        // 更新command位值
        int oldWriteIndex = result.writerIndex();
        result.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
        result.writeByte(command);
        result.writerIndex(oldWriteIndex);
        return result;
    }

    private ChannelBuffer doCompress0(Channel channel, ChannelBuffer frame,
                                      int frameLength, Compress compress)
            throws IOException {
        ChannelBuffer result;
        // 数据体长度位消息包长度减去请求头
        int bodyLength = frameLength - CodecConstants._FRONT_LENGTH;
        byte[] in = new byte[bodyLength];
        // 复制数据体到in
        frame.getBytes(CodecConstants._FRONT_LENGTH, in, 0, bodyLength);

        // 调用传入的压缩器类型完成压缩
        byte[] out = compress.compress(in);
        byte[] lengthBuf = new byte[CodecConstants._HEAD_FIELD_LENGTH];

        // 获取frame第8~10位到lengBuf
        frame.getBytes(CodecConstants._FRONT_LENGTH_, lengthBuf, 0, lengthBuf.length);

        // 压缩后的长度+2
        int totalLength = out.length + lengthBuf.length;
        // 压缩后的长度+2+8
        int _frameLength = totalLength + CodecConstants._FRONT_LENGTH_;
        result = dynamicBuffer(_frameLength, channel.getConfig().getBufferFactory());
        // 写入前4位
        result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);
        // 写入4~8位
        result.writeInt(totalLength);
        // 写入8~10位
        result.writeBytes(lengthBuf);
        // 写入压缩内容
        result.writeBytes(out);
        return result;
    }
}

ProviderDecoder

在完成消息包体解析、消息解压缩等处理后,可以开始真正对数据包解码,以解析出DefaultRequest请求对象。ProviderDecoder继承了pigeon实现的AbstractDecoder,而AbstractDecoder又继承netty实现的OneToOneDecoder,具体继承类型如下所示:
【Pigeon源码阅读】RPC底层通信实现原理(八)
在Netty的OneToOneDecoder中,实现了ChannelUpstreamHandler接口的handleUpstream方法,在内部调用了抽象方法decode来尝试对消息进行解码,如果解码前后,消息未发生变化,会继续委托给上游处理,如果解码结果为null,则丢弃本消息,否则会调用Channels#fireMessageReceived方法触发上游方法调用。而抽象模版方法decode在AbstractDecoder中有了实现,可以先看AbstractDecoder的实现:

public abstract class AbstractDecoder extends OneToOneDecoder {

    private static final Logger logger = LoggerLoader.getLogger(AbstractDecoder.class);

    @Override
    public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
            throws Exception {

        if (msg == null || !(msg instanceof CodecEvent)) {
            return null;
        }

        CodecEvent codecEvent = (CodecEvent) msg;

        if (codecEvent.isValid()) {

            Object message = null;

            if (codecEvent.isUnified()) {
                // 解码统一协议请求
                message = _doDecode(ctx, channel, codecEvent);
                codecEvent.setInvocation((InvocationSerializable) message);
            } else {
                // 解码非统一协议请求
                message = doDecode(ctx, channel, codecEvent);
                codecEvent.setInvocation((InvocationSerializable) message);
            }

        }

        return codecEvent;
    }

    protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent)
            throws IOException {
        Object msg = null;
        ChannelBuffer buffer = codecEvent.getBuffer();
        // 忽略头部两位(用于确认是否为统一协议的)
        buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);
        // 第3位表示序列化方式
        byte serialize = buffer.readByte();
        Long sequence = null;

        try {
            // 4-8位表示数据体长
            int totalLength = buffer.readInt();
            // 总消息包长=消息体+头部
            int frameLength = totalLength + CodecConstants.FRONT_LENGTH;
            // 数据体长度 = 消息体 - 尾部11位长度
            int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);
            // 获取数据体长度,不包括尾部
            ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);
            buffer.readerIndex(buffer.readerIndex() + bodyLength);
            // 获取8位***
            sequence = buffer.readLong();
            // 跳过3位拓展位
            buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH);
            //deserialize
            ChannelBufferInputStream is = new ChannelBufferInputStream(frame);

            // 根据传入的序列化类型调用抽象方法完成序列化
            msg = deserialize(serialize, is);
            //after
            doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
        } catch (Throwable e) {
            SerializationException se = new SerializationException(e);

            try {
                if (sequence != null) {
                    // 调用子类实现的抽象模版方法处理失败响应
                    doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(),
                            serialize, se));
                }

                logger.error("Deserialize failed. host:"
                        + ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
                        + "\n" + e.getMessage(), se);

            } catch (Throwable t) {
                logger.error("[doDecode] doFailResponse failed.", t);
            }
        }
        return msg;
    }

    protected Object _doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent) throws IOException {
        Object msg = null;
        ChannelBuffer buffer = codecEvent.getBuffer();

        try {
            //magic
            buffer.skipBytes(CodecConstants._MEGIC_FIELD_LENGTH);
            //version
            buffer.readByte();
            //serialize
            byte serialize = (byte) (buffer.readByte() & 0x1f);
            serialize = SerializerFactory.convertToSerialize(serialize);

            int totalLength = buffer.readInt();
            int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;

            ChannelBuffer frameBody = extractFrame(buffer, buffer.readerIndex(), totalLength);
            buffer.readerIndex(buffer.readerIndex() + totalLength);

            ChannelBufferInputStream is = new ChannelBufferInputStream(frameBody);
            //deserialize
            msg = deserialize(serialize, is);
            //doAfter
            doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
        } catch (Throwable e) {

            logger.error("Deserialize failed. host:"
                    + ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
                    + "\n" + e.getMessage(), e);
        }

        return msg;
    }

    // 获取buffer的一个子序列,从index开始,长度位length
    protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {
        ChannelBuffer frame = buffer.slice(index, length);
        return frame;
    }

    private Object doAfter(Channel channel,
                           Object msg,
                           byte serialize,
                           int frameLength,
                           long receiveTime)
            throws IOException {

        if (msg instanceof InvocationSerializable) {

            InvocationSerializable msg_ = (InvocationSerializable) msg;
            int msgType = msg_.getMessageType();

            // 注入size属性
            if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {
                msg_.setSize(frameLength);
            }

            // 注入序列化类型
            msg_.setSerialize(serialize);

            // 调用子类实现的抽象消息初始化方法
            doInitMsg(msg, channel, receiveTime);
        }


        return msg;
    }


    protected abstract Object deserialize(byte serializerType, InputStream is);

    protected abstract Object doInitMsg(Object message, Channel channel, long receiveTime);

    protected abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);

}

在子类ProviderDecoder中,实现了AbstractDecoder的三个抽象方法:

public class ProviderDecoder extends AbstractDecoder {

    @Override
    public Object doInitMsg(Object message, Channel channel, long receiveTime) {
        if (message == null) {
            return null;
        }
        // 更新请求创建时间
        InvocationRequest request = (InvocationRequest) message;
        request.setCreateMillisTime(receiveTime);
        return request;
    }

    @Override
    public void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {
        // 获取Channel并回写客户端
        NettyServerChannel nettyChannel = new NettyServerChannel(channel);
        nettyChannel.write(null, response);
    }

    @Override
    public Object deserialize(byte serializerType, InputStream is) {
        // 根据序列化类型获取指定的序列化工具类来完成序列化工作
        Object decoded = SerializerFactory.getSerializer(serializerType).deserializeRequest(is);
        return decoded;
    }
}

NettyServerHandler

NettyServerHandler继承自SimpleChannelUpstreamHandler,是pigeon服务提供方pipeline中最上游的一个Handler,调用了服务提供方处理逻辑的拦截器链,核心实现如下:

public class NettyServerHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        // 在SimpleChannelUpstreamHandler基础上,添加一个debug处理
        if (log.isDebugEnabled()) {
            if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
                log.debug(e.toString());
            }
        }
        // 父类根据事件类型,进行向下转型,并交由响应的方法处理,如MessageEvent则调用messageReceived方法
        super.handleUpstream(ctx, e);
    }

    /**
     * 服务器端接受到消息
     *
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
     * org.jboss.netty.channel.MessageEvent)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
        // 解密获取真正的InvocationRequest
        CodecEvent codecEvent = (CodecEvent) (message.getMessage());

        if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
            return;
        }

        InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();
        // 用DefaultProviderContext包装InvocationRequest请求
        ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
        try {
            // 调用注册的拦截器链处理请求
            this.server.processRequest(request, invocationContext);

        } catch (Throwable e) {
            String msg = "process request failed:" + request;
            // 心跳消息只返回正常的, 异常不返回
            if (request.getCallType() == Constants.CALLTYPE_REPLY
                    && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
                ctx.getChannel().write(ProviderUtils.createFailResponse(request, e));
            }
            log.error(msg, e);
        }
    }
}

ProviderEncoder

ProviderEncoder继承自AbstractEncoder,用于对消息响应进行加密处理,类似于ProviderDecoder的实现,下面展示其继承类图:
【Pigeon源码阅读】RPC底层通信实现原理(八)
从pigeon实现从下往上看,OneToOneEncoder类似前面的OneToOneDecoder实现,核心编码实现通过调用了内部的抽象方法encode实现,encode方法在AbstractEncoder中实现:

public abstract class AbstractEncoder extends OneToOneEncoder {

    private static final Logger logger = LoggerLoader.getLogger(AbstractEncoder.class);

    public abstract void serialize(byte serializer, OutputStream os, Object obj, Channel channel) throws IOException;

    @Override
    public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        if (msg instanceof InvocationSerializable) {

            InvocationSerializable _msg = (InvocationSerializable) msg;
            try {

                ChannelBuffer frame;
                CodecEvent codecEvent;

                if (msg instanceof UnifiedInvocation) {
                    // 统一协议请求编码
                    frame = _doEncode(channel, (UnifiedInvocation) _msg);
                    // 用CodecEvent包装消息包
                    codecEvent = new CodecEvent(frame, true);
                } else {
                    // 非统一协议请求编码
                    frame = doEncode(channel, _msg);
                    // 用CodecEvent包装消息包
                    codecEvent = new CodecEvent(frame, false);
                }

                return codecEvent;
            } catch (Exception e) {
                SerializationException se = new SerializationException(e);

                try {
                    // 调用子类实现的抽象模版方法处理编码响应
                    doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(_msg,
                            _msg.getSerialize(), se));
                } catch (Throwable t) {
                }

                logger.error(e.getMessage(), se);
                throw se;
            }

        } else {
            throw new SerializationException("Invalid message format");
        }
    }

    protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg)
            throws IOException {
        ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                channel.getConfig().getBufferFactory()));
        // 魔数, 0x39(57),0x3A(58)
        os.write(CodecConstants.MAGIC);
        // 序列化方式
        os.writeByte(msg.getSerialize());
        // 数据包体大小,包括消息尾部,先写最大值占位
        os.writeInt(Integer.MAX_VALUE);

        // 调用抽象模版方式序列化
        serialize(msg.getSerialize(), os, msg, channel);
        ChannelBuffer frame = os.buffer();
        // 写入递增***
        frame.writeLong(msg.getSequence());
        // 写入拓展位
        frame.writeBytes(CodecConstants.EXPAND);
        // 根据序列化后的数据包体长度更新消息体长度,这里包含消息尾
        frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() -
                CodecConstants.FRONT_LENGTH);
        doAfter(msg, frame.readableBytes());
        return frame;
    }

    protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg)
            throws IOException {

        ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                channel.getConfig().getBufferFactory()));

        // 魔数, 0xAB(171),0xBA(186)
        os.write(CodecConstants._MAGIC);
        // 写入协议版本
        os.writeByte(msg.getProtocolVersion());
        // 写入序列化方式
        byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize());
        os.writeByte(serialize);
        // 数据包体大小,包括消息尾部,先写最大值占位
        os.writeInt(Integer.MAX_VALUE);

        // 调用抽象模版方式序列化
        serialize(msg.getSerialize(), os, msg, channel);

        ChannelBuffer frame = os.buffer();
        // 根据序列化后的数据包体长度更新消息体长度,非统一协议不包含消息尾
        frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() -
                CodecConstants._FRONT_LENGTH_);
        doAfter(msg, frame.readableBytes());
        return frame;
    }

    private void doAfter(Object msg,
                         int frameLength) throws IOException {

        if (msg instanceof InvocationSerializable) {

            InvocationSerializable msg_ = (InvocationSerializable) msg;
            int msgType = msg_.getMessageType();
            // 注入size属性
            if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {
                msg_.setSize(frameLength);
            }

        }

    }

    public abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);

}

再看ProviderEncoder实现了AbstractEncoder的三个方法:

public class ProviderEncoder extends AbstractEncoder {

    public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        // 无实际意义的重写实现
        Object encoded = super.encode(ctx, channel, msg);
        return encoded;
    }

    @Override
    public void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {
        // 直接写回客户端
        Channels.write(channel, response);
    }

    @Override
    public void serialize(byte serializerType, OutputStream os, Object obj, Channel channel) throws IOException {
        // 根据序列化方式获取指定的序列化工具类完成序列化操作,将序列化结果写入os中
        SerializerFactory.getSerializer(serializerType).serializeResponse(os, obj);
    }

}

FramePrepender

FramePrepender也是继承自OneToOneEncoder,重写了内部的encode方法,实现及其简单,判断如果消息类型是CodecEvent,则提取出消息的具体消息字节码内容,具体实现:

public class FramePrepender extends OneToOneEncoder {
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        if (msg == null || !(msg instanceof CodecEvent)) {
            return null;
        } else {
            // 提取CodecEvent的buffer数据,即实际的消息字节码
            return ((CodecEvent) msg).getBuffer();
        }
    }
}

客户端实现

下面先看NettyClientPipelineFactory的实现:

public class NettyClientPipelineFactory implements ChannelPipelineFactory {
    // 客户端连接实例引用
    private NettyClient client;
    // 通过编解码工厂,获取单例编解码配置
    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    public NettyClientPipelineFactory(NettyClient client) {
        this.client = client;
    }

    // 初始化pipeline
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("framePrepender", new FramePrepender());
        pipeline.addLast("frameDecoder", new FrameDecoder());
        pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
        pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
        pipeline.addLast("invokerDecoder", new InvokerDecoder());
        pipeline.addLast("invokerEncoder", new InvokerEncoder());
        pipeline.addLast("clientHandler", new NettyClientHandler(this.client));
        return pipeline;
    }

}

从上面看到,大部分Handler与服务方类型,区别只有3个Handler:FramePrepender,FrameDecoder,NettyClientHandler
对于上面所有的Handler,类似服务方,同样可以分为3类:

  1. UpStreamHandler
    1. FrameDecoder
    2. invokerDecoder
    3. NettyClientHandler
  2. DownStreamHandler
    1. FramePrepender
    2. invokerEncoder
  3. 双向Handler
    1. Crc32Handler
    2. compressHandler

结合代码分析,最终的调用时序如下所示:

sequenceDiagram
客户端->>FrameDecoder: 开始处理请求
FrameDecoder->>Crc32Handler:decode
Crc32Handler->>compressHandler: messageReceived
compressHandler->>invokerDecoder: messageReceived
invokerDecoder->>NettyClientHandler: AbstractDecoder.decode
NettyClientHandler->>invokerEncoder: handleUpstream
invokerEncoder->>compressHandler: AbstractEncoder.encode
compressHandler->>Crc32Handler:  handleDownstream
Crc32Handler->>FramePrepender:  handleDownstream
FramePrepender->>客户端: encode

上面的所有Handler中,除了NettyClientHandler,其他Handler的处理逻辑基本和服务端实现是一致的,下面具体看看NettyClientHandler的实现原理:

NettyClientHandler

NettyClientHandler是客户端请求处理链中最顶层的Handler,用于完成处理服务端调用响应的具体逻辑,里面的核心方法实现是messageReceived:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    CodecEvent codecEvent = (CodecEvent) e.getMessage();
    if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
        // 请求有效且响应消息不为null
        client.processResponse((InvocationResponse) codecEvent.getInvocation());
    }
}

调用了NettyClient#processResponse方法:

public void processResponse(InvocationResponse response) {
    this.responseProcessor.processResponse(response, this);
}

this.responseProcessor是ResponseThreadPoolProcessor实例,继承了AbstractResponseProcessor抽象类,上面调用的是AbstractResponseProcessor#processResponse方法:

public void processResponse(InvocationResponse response, Client client) {
    try {
        doProcessResponse(response, client);
    } catch (Throwable e) {
        String error = String.format("process response failed:%s, processor stats:%s", response,
                getProcessorStatistics());
        logger.error(error, e);
        if (monitor != null) {
            monitor.logError(error, e);
        }
    }
}

doProcessResponse是一个抽象方法,在子类ResponseThreadPoolProcessor中实现:

public class ResponseThreadPoolProcessor extends AbstractResponseProcessor {

    private static ThreadPool responseProcessThreadPool;

    public ResponseThreadPoolProcessor() {
        ConfigManager configManager = ConfigManagerLoader.getConfigManager();
        int corePoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_COREPOOLSIZE,
                Constants.DEFAULT_RESPONSE_COREPOOLSIZE);
        int maxPoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_MAXPOOLSIZE,
                Constants.DEFAULT_RESPONSE_MAXPOOLSIZE);
        int queueSize = configManager.getIntValue(Constants.KEY_RESPONSE_WORKQUEUESIZE,
                Constants.DEFAULT_RESPONSE_WORKQUEUESIZE);
        responseProcessThreadPool = new DynamicThreadPool("Pigeon-Client-Response-Processor", corePoolSize,
                maxPoolSize, queueSize, new CallerRunsPolicy(), false, false);
    }

    public void stop() {
        ThreadPoolUtils.shutdown(responseProcessThreadPool.getExecutor());
    }

    // 重写父类的抽象模版方法完成具体的处理响应逻辑
    public void doProcessResponse(final InvocationResponse response, final Client client) {
        Runnable task = new Runnable() {
            public void run() {
                // 调用ServiceInvocationRepository单例的receiveResponse方法处理接收的请求
                ServiceInvocationRepository.getInstance().receiveResponse(response);
            }
        };
        try {
            // 由根据配置初始化的线程池来执行相关信息
            responseProcessThreadPool.execute(task);
        } catch (RejectedExecutionException e) {
            String error = String.format("process response failed:%s, processor stats:%s", response,
                    getProcessorStatistics());
            throw new RejectedException(error, e);
        }
    }

    @Override
    public String getProcessorStatistics() {
        // 获取当前响应处理线程池的实际处理情况
        ThreadPoolExecutor e = responseProcessThreadPool.getExecutor();
        String stats = String.format(
                "response pool size:%d(active:%d,core:%d,max:%d,largest:%),task count:%d(completed:%d),queue size:%d",
                e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
                e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.getQueue().size());
        return stats;
    }

    @Override
    public ThreadPool getResponseProcessThreadPool() {
        return responseProcessThreadPool;
    }
}

具体处理接收响应的逻辑如下:

ublic void receiveResponse(InvocationResponse response) {
    RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
    if (invocationBean != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("received response:" + response);
        }
        InvocationRequest request = invocationBean.request;
        try {
            // 处理回调
            Callback callback = invocationBean.callback;
            if (callback != null) {
                Client client = callback.getClient();
                if (client != null) {
                    // 记录请求流出
                    ServiceStatisticsHolder.flowOut(request, client.getAddress());
                }
                // 向callback对象注入response
                callback.callback(response);
                // 通知等待线程接收响应
                callback.run();
            }
        } finally {
            invocations.remove(response.getSequence());
        }
    }
}

callback.run方法会唤醒之前发送请求的拦截器链等待获取响应的线程,并根据callback.callback注入的response对象,来完成对请求响应的处理,具体实现在pigeon的CallbackFuture中,看看两个核心方法实现:

public class CallbackFuture implements Callback, CallFuture {
    @Override
    public void callback(InvocationResponse response) {
        this.response = response;
    }

    @Override
    public void run() {
        lock.lock();
        try {
            this.done = true;
            if (condition != null) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}