【Pigeon源码阅读】RPC底层通信实现原理(八)
pigeon底层通过Netty-3.9.2.Final实现服务端和客户端的连接通信,对应实现类为NettyServer和NettyClient。在内部,处理RPC通信的核心逻辑又分别定义在NettyServerPipelineFactory和NettyClientPipelineFactory,这两个类都实现了ChannelPipelineFactory,重写了里面的getPipeline方法,用于处理发送请求和处理请求的相关流程逻辑。
pigeon TCP协议格式
pigeon目前区分两种TCP协议方式,一种是非统一(默认)协议,为普通序列化方式如Hessian,json等方式调用,另一种是统一协议,如Thrift调用和泛化调用,其中泛化调用可以在不直到api设计的基础上,直接通过指定方法名字符串来调用相应的服务方法。基于不同协议,tcp消息包格式也是不同的,下面分开解析
粘包半包问题
在TCP传输中,一个完整的消息包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,因而数据接收方无法区分消息包的头尾,在接收方来看,可能一次接收的数据内容只是一次完整请求的部分数据,或包含多次请求的数据等情况。基于此,常见有三种解决方式:
- 消息包定长,固定每个消息包长度,不够的话用空格补,缺点是长度不能灵活控制,字节不够的情况下补空格会造成浪费,字节过多还要处理分包传输的逻辑
- 使用定长消息头和变长消息体,其中消息体的长度必须在消息头指出,在接收方每次接收数据时,先确保获取到定长的消息头,再从消息头中解析出消息体的长度,以此来界定获取以此完整请求的消息包数据。
- 在消息包尾部指定分隔符,缺点是需要遍历数据,判断是否为分隔符,影响效率。
在pigeon中,是基于第二种,使用定长消息头和变长消息体的方式实现的。
定长消息头格式
在消息头部分,统一协议和默认协议的区别较大,这里分开讲述:
默认协议消息格式
默认协议消息格式具体包括2部分:消息头、消息体,其中消息体包含变长请求体和定长请求尾两部分。
默认协议消息头固定为7个字节:
- 第1-2个字节固定为十六进制的0x39、0x3A,即十进制的57、58,可以用来区分是默认协议还是统一协议。
- 第3个字节为序列化方式,如Hessian是2,java是3,json是7等。
- 第4-7个字节:消息体长度,int,占4个字节,值为请求体长度(请求或响应对象序列化后的字节长度)+请求尾长度11。
统一协议消息格式
类似默认协议消息,统一协议消息格式也包括2部分:消息头、消息体,和默认协议不同的是,统一协议中,消息体部分为完整请求体尾部不再包含请求尾,但会在请求体头部包含一个两字节长度的请求头。这里需要区分消息头和请求头的区别。
统一协议的消息头固定为8个字节:
- 第1-2个字节固定为十六进制的0xAB、0xBA,即十进制的171、186,或8位有符号整型的-85、-70,可以用来区分是默认协议还是统一协议。
- 第3个字节为协议版本,也可以称作command字节,会用来定义编解码的相关自定义行为,如压缩方式、数据校验方式等,具体command第8位表示是否需要进行校验数据完整性,第6、7位定义了是否进行压缩及具体的压缩方式。
- 第4个字节为序列化方式,一般为1。
- 第5~8个字节为消息体长度。
消息体
消息体部分,不区分是统一协议还是默认协议,最终解析出请求和响应对象类型分别为com.dianping.dpsf.protocol.DefaultRequest或com.dianping.dpsf.protocol.DefaultResponse,而除此之外,两种协议有细微区别:
- 统一协议没有请求尾,在消息体头部会有两个定长字节,这两个字节在序列化内部赋值,是Thrift内部计算的头部长度。
- 默认协议没有请求头,在消息体尾部会有11位定长字节,前8个字节为消息sequence,long型,值请从0开始递增,每个消息的sequence都不同;后3个字节固定为:29,30,31
DefaultRequest或com的具体定义如下:
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultRequest.class)
public class DefaultRequest implements InvocationRequest {
/**
* 不能随意修改!
*/
private static final long serialVersionUID = 652592942114047764L;
// 必填,序列化类型,默认hessian为2
private byte serialize;
// 必填,消息sequence,long型,值请从0开始递增,每个消息的sequence都不同
@JsonProperty("seq")
private long seq;
//必填,如果调用需要返回结果,固定为1,不需要回复为2,手动回复为3
private int callType = Constants.CALLTYPE_REPLY;
// 必填,超时时间,单位毫秒
private int timeout = 0;
// 请求创建时间, 不参与序列化传输
@JsonIgnore
private transient long createMillisTime;
//必填,服务名称url,服务唯一的标识
@JsonProperty("url")
private String serviceName;
//必填,服务方法名称
private String methodName;
//必填,服务方法的参数值
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
private Object[] parameters;
//必填,消息类型,服务调用固定为2,心跳为1,服务框架异常为3,服务调用业务异常为4
private int messageType = Constants.MESSAGE_TYPE_SERVICE;
// 旧版上下文信息传递对象
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
private Object context;
// 服务版本
private String version;
// 必填,调用者所属应用名称,在META-INF/app.properties里的app.name值
private String app = ConfigManagerLoader.getConfigManager().getAppName();
// 请求体大小
@JsonIgnore
private transient int size;
}
DefaultResponse的具体定义如下:
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "seq", scope = DefaultResponse.class)
public class DefaultResponse implements InvocationResponse {
/**
* 不能随意修改!
*/
private static final long serialVersionUID = 4200559704846455821L;
private transient byte serialize;
// 返回的消息sequence,对应发送的消息sequence,long型
@JsonProperty("seq")
private long seq;
//消息类型,服务调用为2,服务调用业务异常为4,服务框架异常为3,心跳为1
private int messageType;
// 请求返回异常的相关堆栈信息
@JsonProperty("exception")
private String cause;
// 返回服务调用结果
@JsonProperty("response")
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
private Object returnVal;
// 旧版上下文传递
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
private Object context;
// 请求体大小
@JsonIgnore
private transient int size;
// 请求创建时间, 不参与序列化传输
@JsonIgnore
private transient long createMillisTime;
// 响应自定义上下文信息
private Map<String, Serializable> responseValues = null;
}
Netty3 Handler相关实现
上下游传递原理
在开始分析pigeonRPC通信实现之前,先简略总结下netty3利用ChannelHandler完成通信处理的相关逻辑。
对于netty接收到一个请求后,会调用一系列的拦截器handler来处理我们的请求,可以将请求方看成在下游,服务方的核心处理逻辑在上游,服务方接收到请求后,会将请求不断往上游传递,交由一个个ChannelUpstreamHandler#handleUpstream方法处理,在到达最上游并由核心逻辑处理完后,又交由一个个ChannelDownStreamHandler#handleDownstream处理,到最下游通过网络通信将结果回传给客户端。
基于此,我们可以通过实现ChannelUpstreamHandler和ChannelDownStreamHandler,在请求的上下游传递中,拓展我们的逻辑。
相关实现
ChannelHandlerContext
请求在上下游处理过程中,处理的上下文数据是通过ChannelHandlerContext实现的。比如我们在特定的handler中通过ChannelHandlerContext的sendUpstream和sendDownstream方法将请求传递到下一个handler中处理。对于传递处理的上下文数据,可以通过getAttachment和setAttachment进行读写。
OneToOneDecoder & OneToOneEncoder
请求数据传输在进入服务端之后和返回客户端之前,分别需要进行解码和编码操作,这由OneToOneDecoder和OneToOneEncoder两个抽象类分别实现,两者分别实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,一般通过继承这两个抽象类,并实现内部的encode或decode方向模版方法来实现具体的编解码操作。
SimpleChannelHandler
SimpleChannelHandler同时实现了ChannelUpstreamHandler和ChannelDownstreamHandler接口,是一个双向handler,内部简单地实现了handleUpstream和handleDownstream,会根据传入ChannelEvent地类型,进行必要地向下转型,得到更加有意义的子类型,而后调用相关的处理方法。默认实现本Handler一般是作为最上层地handler,如果在本Handler之后还需要向更上游传递,需要确保在handleUpstream地最后,手动调用了super.handleUpstream方法。
ChannelEvent
netty有多种事件可以在Channel中传递,交由用户定义的handler处理,这些事件都以ChennelEvent的形式定义,常用有以下事件:
- MessageEvent:正常消息请求事件
- ChannelStateEvent:channel状态变更事件,包括以下几种:
- OPEN: channel开启
- BOUND: channel绑定到特定的本地地址
- CONNECTED: channel连接到特定的远程地址
- INTEREST_OPS: channel对特定感兴趣的操作会进行暂停
pigeon RPC通信的核心实现原理
下面分成服务端和客户端两部分,从这两个类展开分析pigeon
服务端实现
下面先看NettyServerPipelineFactory的实现:
public class NettyServerPipelineFactory implements ChannelPipelineFactory {
// 服务端连接实例引用
private NettyServer server;
// 通过编解码工厂,获取单例编解码配置
private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();
public NettyServerPipelineFactory(NettyServer server) {
this.server = server;
}
// 初始化pipeline
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("framePrepender", new FramePrepender());
pipeline.addLast("frameDecoder", new FrameDecoder());
pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
pipeline.addLast("providerDecoder", new ProviderDecoder());
pipeline.addLast("providerEncoder", new ProviderEncoder());
pipeline.addLast("serverHandler", new NettyServerHandler(server));
return pipeline;
}
}
对于上面所有的Handler,可以分为3类:
- UpStreamHandler
- FrameDecoder
- ProviderDecoder
- NettyServerHandler
- DownStreamHandler
- FramePrepender
- ProviderEncoder
- 双向Handler
- Crc32Handler
- compressHandler
结合代码分析,最终的调用时序如下所示:
下面根据每个handler的处理顺序,依次分析每个handler的处理逻辑
FrameDecoder
顾名思义,FrameDecoder用来解析出通信管道中一次请求的数据,解决tcp通信中粘包和半包的问题。
pigeon的FrameDecoder继承自netty的org.jboss.netty.handler.codec.frame.FrameDecoder,而org.jboss.netty.handler.codec.frame.FrameDecoder又继承自SimpleChannelUpstreamHandler,在netty实现的FrameDecoder,核心实现方法是messageReceived
,大致实现原理是不断读取接收到的字节流,并累加到cumulation变量,通过调用callDecode
来尝试对当前累加的字节Buffer cumulation进行解码,直到解析出一个完整请求的feame对象,最后会调用Channels#fireMessageReceived触发Handler的pipeline调用来完成一次完整请求。
在解码过程callDecode
中调用了一个抽象模版方法decode
来完成具体的解码逻辑,decode
方法尝试解析cumulation变量,如果不能按照自定义解析规则解析出一个完整请求的数据包,就返回null,否则返回一个完整的数据包,这里读取成功的同时,需要更新cumulation的字节起始点到当前完整数据包字节的尾部。
分析完Netty的解码流程,具体看看pigeon如何基于自己设计的协议格式来进行数据包解析。
在pigeon中,decode
方法在自定义的FrameDecoder中实现,代码尝试先解析出请求头,再通过头部的请求体长度,解析出一次完整请求的包体,在代码中请求尾的长度包含在请求体的长度内部,具体实现如下所示:
public class FrameDecoder extends org.jboss.netty.handler.codec.frame.FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
throws Exception {
Object message = null;
// 如果当前累积的小于两个字节,直接返回null
if (buffer.readableBytes() <= 2) {
return message;
}
byte[] headMsgs = new byte[2];
// 复制buff两个字节到headMsgs
buffer.getBytes(buffer.readerIndex(), headMsgs);
if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
// 0x39=57,0x3A=58
//old protocol
message = doDecode(buffer);
} else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) {
//0xAB=171,0xBA=186
//new protocol
message = _doDecode(buffer);
} else {
throw new IllegalArgumentException("Decode invalid message head:" +
headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer);
}
return message;
}
protected Object doDecode(ChannelBuffer buffer) throws Exception {
CodecEvent codecEvent = null;
// FRONT_LENGTH = 7,即如果buffer小于消息头7位长度,直接返回null
if (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) {
return codecEvent;
}
// 从消息的第3位开始,读取4位字节位一个无符号整数,实际位请求体大小
int totalLength = (int) buffer.getUnsignedInt(
buffer.readerIndex() +
CodecConstants.HEAD_LENGTH);
// 最后包体大小是请求体大小+请求头大小
int frameLength = totalLength + CodecConstants.FRONT_LENGTH;
// 当前累积的buffer是否已经包含一个完整的数据包
if (buffer.readableBytes() >= frameLength) {
// 获取具体数据包字节内容
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
// 更新累积缓存的读起点,方便读取处理下一个数据包
buffer.readerIndex(buffer.readerIndex() + frameLength);
// 用CodecEvent包装frame,并标记位非统一协议
codecEvent = new CodecEvent(frame, false);
// 设置接收时间
codecEvent.setReceiveTime(System.currentTimeMillis());
}
return codecEvent;
}
protected Object _doDecode(ChannelBuffer buffer)
throws Exception {
CodecEvent codecEvent = null;
// _FRONT_LENGTH = 10,即如果buffer小于消息头10位长度,直接返回null
if (buffer.readableBytes() <= CodecConstants._FRONT_LENGTH) {
return codecEvent;
}
// 从消息的第4位开始,读取4位字节位一个无符号整数,实际位请求体大小
int totalLength = (int) (buffer.getUnsignedInt(
buffer.readerIndex() +
CodecConstants._HEAD_LENGTH));
// 最后包体大小是请求体大小+请求头大小
int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;
// 当前累积的buffer是否已经包含一个完整的数据包
if (buffer.readableBytes() >= frameLength) {
// 获取具体数据包字节内容
ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
// 更新累积缓存的读起点,方便读取处理下一个数据包
buffer.readerIndex(buffer.readerIndex() + frameLength);
// 用CodecEvent包装frame,并标记位统一协议
codecEvent = new CodecEvent(frame, true);
// 设置接收时间
codecEvent.setReceiveTime(System.currentTimeMillis());
}
return codecEvent;
}
}
Crc32Handler
Crc32Handler主要用于校验统一协议请求的数据完整性,在解析出完整消息包长度数据之后,在解码为DefaultRequest之前,会先获取实际的数据包数据,计算crc32校验和,再和消息尾部传入的校验和进行比对,如果一致,说明校验通过,否则校验失败。而在请求结束发送相应时,又会对数据计算校验和,放在消息包尾中,以便客户端获取校验。
看看代码的具体实现:
public class Crc32Handler extends SimpleChannelHandler {
private static final Logger logger = LoggerLoader.getLogger(Crc32Handler.class);
private static ThreadLocal<Adler32> adler32s = new ThreadLocal<Adler32>();
private CodecConfig codecConfig;
public Crc32Handler(CodecConfig codecConfig) {
this.codecConfig = codecConfig;
}
// 上游接收数据处理
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {
return;
}
CodecEvent codecEvent = (CodecEvent) e.getMessage();
if (codecEvent.isValid()) {
if (codecEvent.isUnified()) {
// 如果是统一协议,需要进行校验
if (!doUnChecksum(e.getChannel(), codecEvent)) {
// 校验失败
codecEvent.setIsValid(false);
}
}
}
// 向上游发送消息接收事件
Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());
}
// 下游发送数据前处理
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (!(e instanceof MessageEvent)) {
// 只对MessageEvent添加校验
ctx.sendDownstream(e);
return;
}
MessageEvent evt = (MessageEvent) e;
if (!(evt.getMessage() instanceof CodecEvent)) {
// 只对CodecEvent添加校验
ctx.sendDownstream(evt);
return;
}
CodecEvent codecEvent = (CodecEvent) evt.getMessage();
if (codecEvent.isUnified()) {
// 只对统一协议请求添加校验
ChannelBuffer buffer = doChecksum(e.getChannel(), codecEvent);
codecEvent.setBuffer(buffer);
write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());
} else {
ctx.sendDownstream(e);
}
}
private boolean doUnChecksum(Channel channel, CodecEvent codecEvent) {
ChannelBuffer frame = codecEvent.getBuffer();
// 获取第3位字节
byte command = frame.getByte(frame.readerIndex() +
CodecConstants._FRONT_COMMAND_LENGTH);
// 第8位是否位1
if ((command & 0x80) == 0x80) {
int frameLength = frame.readableBytes();
// 数据包长度 = 消息包总长度 - 消息尾长度
int dataLength = frameLength - CodecConstants._TAIL_LENGTH;
// 初始化空buffer为数据包长度
ChannelBuffer buffer = frame.factory().getBuffer(dataLength);
// 将消息包中的数据包内容复制到buffer中
buffer.writeBytes(frame, frame.readerIndex(), dataLength);
// 设置需要进行校验
codecEvent.setIsChecksum(true);
// 计算数据包的校验和
int checksum = (int) doChecksum0(buffer, dataLength);
// 获取dataLength位(请求尾首位)的校验和
int _checksum = frame.getInt(dataLength);
if (checksum == _checksum) {
// 校验有效
int totalLength = buffer.getInt(CodecConstants._HEAD_LENGTH);
// 更新消息包中数据包长度的4位字节实际值为数据包体的长度
buffer.setInt(CodecConstants._HEAD_LENGTH, totalLength - CodecConstants._TAIL_LENGTH);
// 更新codecEvent的buffer
codecEvent.setBuffer(buffer);
} else {
// 校验失败
String host = ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress();
logger.error("Checksum failed. data from host:" + host);
return false;
}
}
return true;
}
private ChannelBuffer doChecksum(Channel channel, CodecEvent codecEvent) {
ChannelBuffer frame = codecEvent.getBuffer();
// 是否需要校验数据完整性
boolean isChecksum = codecConfig.isChecksum();
int command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
int frameLength = frame.readableBytes();
if (isChecksum) {
// 更新指定位数据
command = command | 0x80;
// command
frame.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
frame.writeByte(command);
// totalLength=消息包长度-头部+尾部
frame.writeInt(frameLength -
CodecConstants._FRONT_LENGTH_ +
CodecConstants._TAIL_LENGTH);
frame.writerIndex(frameLength);
if (!(frame instanceof DynamicChannelBuffer)) {
// 更新buffer
ChannelBuffer buffer = frame.factory().getBuffer(frameLength +
CodecConstants._TAIL_LENGTH);
buffer.writeBytes(frame, frame.readerIndex(), frameLength);
frame = buffer;
}
// 计算校验和
long checksum = doChecksum0(frame, frameLength);
frame.writeInt((int) checksum);
} else {
//command=127
command = command & 0x7f;
frame.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
frame.writeByte(command);
frame.writerIndex(frameLength);
}
return frame;
}
// crc32校验和计算
private long doChecksum0(ChannelBuffer frame, int frameLength) {
//checksum
Adler32 adler32 = adler32s.get();
if (adler32 == null) {
adler32 = new Adler32();
adler32s.set(adler32);
}
adler32.reset();
adler32.update(frame.array(), 0, frameLength);
return adler32.getValue();
}
}
CompressHandler
CompressHandler构造函数传入了一个CodecConfig,用来定义压缩的相关属性,在接收请求和返回响应经过CompressHandler时,会根据配置尝试对消息包的正文部分进行压缩,压缩之后需要更新消息包正文内容的长度,先看CodeConfig实现:
public class CodecConfig {
private static final Logger logger = LoggerLoader.getLogger(CodecConfig.class);
private ConfigManager configManager;
// 是否允许压缩
private volatile boolean compressed;
// 压缩类型
private volatile CompressType compressType;
// 压缩最小阈值,只有数据包大于指定阈值才进行压缩
private volatile int compressThreshold;
// 是否校验数据完整性
private volatile boolean checksum;
// 从配置中心初始化相关配置,并注册监听器,监听配置修改
public CodecConfig(ConfigManager configManager) {
// 指定配置中心
this.configManager = configManager;
// 获取配置属性并解析
this.compressed = this.configManager.getBooleanValue(Constants.KEY_CODEC_COMPRESS_ENABLE,
Constants.DEFAULT_CODEC_COMPRESS_ENABLE);
this.compressType = getCompressType((byte) this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_TYPE,
Constants.DEFAULT_CODEC_COMPRESS_TYPE));
this.compressThreshold = this.configManager.getIntValue(Constants.KEY_CODEC_COMPRESS_THRESHOLD,
Constants.DEFAULT_CODEC_COMPRESS_THRESHOLD);
this.checksum = this.configManager.getBooleanValue(Constants.KEY_CODEC_CHECKSUM_ENABLE,
Constants.DEFAULT_CODEC_CHECKSUM_ENABLE);
// 注册配置变更监听器
configManager.registerConfigChangeListener(new InnerConfigChangeListener());
}
public boolean isCompress(int frameSize) {
if (compressed) {
// 只有数据包大于指定阈值才进行压缩
if (frameSize > compressThreshold) {
return true;
}
}
return false;
}
public CompressType getCompressType() {
return compressType;
}
// 根据配置code,获取具体的压缩类型枚举成员
private final CompressType getCompressType(byte code) {
CompressType compressType = CompressType.None;
try {
compressType = CompressType.getCompressType(code);
} catch (Exception e) {
logger.error("Invalid compressType. code:" + code, e);
}
return compressType;
}
public boolean isChecksum() {
return checksum;
}
// 监听更新相关压缩配置
private class InnerConfigChangeListener implements ConfigChangeListener {
@Override
public void onKeyUpdated(String key, String value) {
if (key.endsWith(Constants.KEY_CODEC_COMPRESS_ENABLE)) {
try {
compressed = Boolean.valueOf(value);
} catch (RuntimeException e) {
}
} else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_TYPE)) {
try {
compressType = getCompressType(Byte.valueOf(value));
} catch (RuntimeException e) {
}
} else if (key.endsWith(Constants.KEY_CODEC_COMPRESS_THRESHOLD)) {
try {
compressThreshold = Integer.valueOf(value);
} catch (RuntimeException e) {
}
} else if (key.endsWith(Constants.KEY_CODEC_CHECKSUM_ENABLE)) {
try {
checksum = Boolean.valueOf(value);
} catch (RuntimeException e) {
}
}
}
@Override
public void onKeyAdded(String key, String value) {
// TODO Auto-generated method stub
}
@Override
public void onKeyRemoved(String key) {
// TODO Auto-generated method stub
}
}
}
再看看CompressHandler的具体实现:
public class CompressHandler extends SimpleChannelHandler {
private static Compress gZipCompress = CompressFactory.getGZipCompress();
private static Compress snappyCompress = CompressFactory.getSnappyCompress();
private CodecConfig codecConfig;
public CompressHandler(CodecConfig codecConfig) {
this.codecConfig = codecConfig;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() == null || !(e.getMessage() instanceof CodecEvent)) {
return;
}
CodecEvent codecEvent = (CodecEvent) e.getMessage();
if (codecEvent.isValid()) {
if (codecEvent.isUnified()) {
// 统一协议请求
ChannelBuffer buffer = doUnCompress(e.getChannel(), codecEvent);
codecEvent.setBuffer(buffer);
}
}
Channels.fireMessageReceived(ctx, codecEvent, e.getRemoteAddress());
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (!(e instanceof MessageEvent)) {
ctx.sendDownstream(e);
return;
}
MessageEvent evt = (MessageEvent) e;
if (!(evt.getMessage() instanceof CodecEvent)) {
ctx.sendDownstream(evt);
return;
}
CodecEvent codecEvent = (CodecEvent) evt.getMessage();
if (codecEvent.isUnified()) {
ChannelBuffer buffer = doCompress(e.getChannel(), codecEvent);
codecEvent.setBuffer(buffer);
write(ctx, evt.getFuture(), codecEvent, evt.getRemoteAddress());
} else {
ctx.sendDownstream(e);
}
}
private ChannelBuffer doUnCompress(Channel channel, CodecEvent codecEvent)
throws IOException {
ChannelBuffer frame = codecEvent.getBuffer();
// 获取command值
byte command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
//获取command的第6、7位
short compress = (short) (command & 0x60);
if (compress == 0x00) {
return frame;
}
// 获取数据包体长度
int totalLength = frame.getInt(frame.readerIndex() + CodecConstants._HEAD_LENGTH);
// 减去头部值
int compressLength = totalLength - CodecConstants._HEAD_FIELD_LENGTH;
byte[] in;
byte[] out = null;
ChannelBuffer result;
switch (compress) {
case 0x00: // 没有压缩处理,直接返回
return frame;
case 0x20: // 解压缩处理
in = new byte[compressLength];
// 减去消息头长度
frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);
// 使用snappy压缩
out = snappyCompress.unCompress(in);
codecEvent.setIsCompress(true);
break;
case 0x40: // 和0X20类似处理,只是压缩方式不同
in = new byte[compressLength];
frame.getBytes(frame.readerIndex() + CodecConstants._FRONT_LENGTH, in);
// 使用gZip压缩
out = gZipCompress.unCompress(in);
codecEvent.setIsCompress(true);
break;
case 0x60:
throw new IllegalArgumentException("Invalid compress type.");
}
// 实际长度是解压缩后长度+头部长度
int _totalLength = CodecConstants._HEAD_FIELD_LENGTH + out.length;
result = channel.getConfig().getBufferFactory().getBuffer(
_totalLength + CodecConstants._FRONT_LENGTH_);
// 写长度
result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);
result.writeInt(_totalLength);
// 写解压缩的数据包
result.writeBytes(frame, frame.readerIndex() + CodecConstants._FRONT_LENGTH_,
CodecConstants._HEAD_FIELD_LENGTH);
result.writeBytes(out);
return result;
}
private ChannelBuffer doCompress(Channel channel, CodecEvent codecEvent)
throws IOException {
ChannelBuffer frame = codecEvent.getBuffer();
// 取第三位字节command
int command = frame.getByte(CodecConstants._FRONT_COMMAND_LENGTH);
//compress
ChannelBuffer result = frame;
int frameLength = frame.readableBytes();
if (codecConfig.isCompress(frameLength)) {
CompressType compressType = codecConfig.getCompressType();
// 设置command第6、7位来定义压缩类型,并进行相应的压缩操作
switch (compressType) {
case None:
command = command | 0x00;
break;
case Snappy:
command = command | 0x20;
result = doCompress0(channel, frame, frameLength, snappyCompress);
break;
case Gzip:
command = command | 0x40;
result = doCompress0(channel, frame, frameLength, gZipCompress);
break;
}
} else {
// 默认不压缩
command = command | 0x00;
}
// 更新command位值
int oldWriteIndex = result.writerIndex();
result.writerIndex(CodecConstants._FRONT_COMMAND_LENGTH);
result.writeByte(command);
result.writerIndex(oldWriteIndex);
return result;
}
private ChannelBuffer doCompress0(Channel channel, ChannelBuffer frame,
int frameLength, Compress compress)
throws IOException {
ChannelBuffer result;
// 数据体长度位消息包长度减去请求头
int bodyLength = frameLength - CodecConstants._FRONT_LENGTH;
byte[] in = new byte[bodyLength];
// 复制数据体到in
frame.getBytes(CodecConstants._FRONT_LENGTH, in, 0, bodyLength);
// 调用传入的压缩器类型完成压缩
byte[] out = compress.compress(in);
byte[] lengthBuf = new byte[CodecConstants._HEAD_FIELD_LENGTH];
// 获取frame第8~10位到lengBuf
frame.getBytes(CodecConstants._FRONT_LENGTH_, lengthBuf, 0, lengthBuf.length);
// 压缩后的长度+2
int totalLength = out.length + lengthBuf.length;
// 压缩后的长度+2+8
int _frameLength = totalLength + CodecConstants._FRONT_LENGTH_;
result = dynamicBuffer(_frameLength, channel.getConfig().getBufferFactory());
// 写入前4位
result.writeBytes(frame, frame.readerIndex(), CodecConstants._HEAD_LENGTH);
// 写入4~8位
result.writeInt(totalLength);
// 写入8~10位
result.writeBytes(lengthBuf);
// 写入压缩内容
result.writeBytes(out);
return result;
}
}
ProviderDecoder
在完成消息包体解析、消息解压缩等处理后,可以开始真正对数据包解码,以解析出DefaultRequest请求对象。ProviderDecoder继承了pigeon实现的AbstractDecoder,而AbstractDecoder又继承netty实现的OneToOneDecoder,具体继承类型如下所示:
在Netty的OneToOneDecoder中,实现了ChannelUpstreamHandler接口的handleUpstream方法,在内部调用了抽象方法decode来尝试对消息进行解码,如果解码前后,消息未发生变化,会继续委托给上游处理,如果解码结果为null,则丢弃本消息,否则会调用Channels#fireMessageReceived方法触发上游方法调用。而抽象模版方法decode在AbstractDecoder中有了实现,可以先看AbstractDecoder的实现:
public abstract class AbstractDecoder extends OneToOneDecoder {
private static final Logger logger = LoggerLoader.getLogger(AbstractDecoder.class);
@Override
public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
throws Exception {
if (msg == null || !(msg instanceof CodecEvent)) {
return null;
}
CodecEvent codecEvent = (CodecEvent) msg;
if (codecEvent.isValid()) {
Object message = null;
if (codecEvent.isUnified()) {
// 解码统一协议请求
message = _doDecode(ctx, channel, codecEvent);
codecEvent.setInvocation((InvocationSerializable) message);
} else {
// 解码非统一协议请求
message = doDecode(ctx, channel, codecEvent);
codecEvent.setInvocation((InvocationSerializable) message);
}
}
return codecEvent;
}
protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent)
throws IOException {
Object msg = null;
ChannelBuffer buffer = codecEvent.getBuffer();
// 忽略头部两位(用于确认是否为统一协议的)
buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);
// 第3位表示序列化方式
byte serialize = buffer.readByte();
Long sequence = null;
try {
// 4-8位表示数据体长
int totalLength = buffer.readInt();
// 总消息包长=消息体+头部
int frameLength = totalLength + CodecConstants.FRONT_LENGTH;
// 数据体长度 = 消息体 - 尾部11位长度
int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);
// 获取数据体长度,不包括尾部
ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);
buffer.readerIndex(buffer.readerIndex() + bodyLength);
// 获取8位***
sequence = buffer.readLong();
// 跳过3位拓展位
buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH);
//deserialize
ChannelBufferInputStream is = new ChannelBufferInputStream(frame);
// 根据传入的序列化类型调用抽象方法完成序列化
msg = deserialize(serialize, is);
//after
doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
} catch (Throwable e) {
SerializationException se = new SerializationException(e);
try {
if (sequence != null) {
// 调用子类实现的抽象模版方法处理失败响应
doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(),
serialize, se));
}
logger.error("Deserialize failed. host:"
+ ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
+ "\n" + e.getMessage(), se);
} catch (Throwable t) {
logger.error("[doDecode] doFailResponse failed.", t);
}
}
return msg;
}
protected Object _doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent) throws IOException {
Object msg = null;
ChannelBuffer buffer = codecEvent.getBuffer();
try {
//magic
buffer.skipBytes(CodecConstants._MEGIC_FIELD_LENGTH);
//version
buffer.readByte();
//serialize
byte serialize = (byte) (buffer.readByte() & 0x1f);
serialize = SerializerFactory.convertToSerialize(serialize);
int totalLength = buffer.readInt();
int frameLength = totalLength + CodecConstants._FRONT_LENGTH_;
ChannelBuffer frameBody = extractFrame(buffer, buffer.readerIndex(), totalLength);
buffer.readerIndex(buffer.readerIndex() + totalLength);
ChannelBufferInputStream is = new ChannelBufferInputStream(frameBody);
//deserialize
msg = deserialize(serialize, is);
//doAfter
doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
} catch (Throwable e) {
logger.error("Deserialize failed. host:"
+ ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
+ "\n" + e.getMessage(), e);
}
return msg;
}
// 获取buffer的一个子序列,从index开始,长度位length
protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {
ChannelBuffer frame = buffer.slice(index, length);
return frame;
}
private Object doAfter(Channel channel,
Object msg,
byte serialize,
int frameLength,
long receiveTime)
throws IOException {
if (msg instanceof InvocationSerializable) {
InvocationSerializable msg_ = (InvocationSerializable) msg;
int msgType = msg_.getMessageType();
// 注入size属性
if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {
msg_.setSize(frameLength);
}
// 注入序列化类型
msg_.setSerialize(serialize);
// 调用子类实现的抽象消息初始化方法
doInitMsg(msg, channel, receiveTime);
}
return msg;
}
protected abstract Object deserialize(byte serializerType, InputStream is);
protected abstract Object doInitMsg(Object message, Channel channel, long receiveTime);
protected abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);
}
在子类ProviderDecoder中,实现了AbstractDecoder的三个抽象方法:
public class ProviderDecoder extends AbstractDecoder {
@Override
public Object doInitMsg(Object message, Channel channel, long receiveTime) {
if (message == null) {
return null;
}
// 更新请求创建时间
InvocationRequest request = (InvocationRequest) message;
request.setCreateMillisTime(receiveTime);
return request;
}
@Override
public void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {
// 获取Channel并回写客户端
NettyServerChannel nettyChannel = new NettyServerChannel(channel);
nettyChannel.write(null, response);
}
@Override
public Object deserialize(byte serializerType, InputStream is) {
// 根据序列化类型获取指定的序列化工具类来完成序列化工作
Object decoded = SerializerFactory.getSerializer(serializerType).deserializeRequest(is);
return decoded;
}
}
NettyServerHandler
NettyServerHandler继承自SimpleChannelUpstreamHandler,是pigeon服务提供方pipeline中最上游的一个Handler,调用了服务提供方处理逻辑的拦截器链,核心实现如下:
public class NettyServerHandler extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// 在SimpleChannelUpstreamHandler基础上,添加一个debug处理
if (log.isDebugEnabled()) {
if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
log.debug(e.toString());
}
}
// 父类根据事件类型,进行向下转型,并交由响应的方法处理,如MessageEvent则调用messageReceived方法
super.handleUpstream(ctx, e);
}
/**
* 服务器端接受到消息
*
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.MessageEvent)
*/
@SuppressWarnings("unchecked")
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
// 解密获取真正的InvocationRequest
CodecEvent codecEvent = (CodecEvent) (message.getMessage());
if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
return;
}
InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();
// 用DefaultProviderContext包装InvocationRequest请求
ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
try {
// 调用注册的拦截器链处理请求
this.server.processRequest(request, invocationContext);
} catch (Throwable e) {
String msg = "process request failed:" + request;
// 心跳消息只返回正常的, 异常不返回
if (request.getCallType() == Constants.CALLTYPE_REPLY
&& request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
ctx.getChannel().write(ProviderUtils.createFailResponse(request, e));
}
log.error(msg, e);
}
}
}
ProviderEncoder
ProviderEncoder继承自AbstractEncoder,用于对消息响应进行加密处理,类似于ProviderDecoder的实现,下面展示其继承类图:
从pigeon实现从下往上看,OneToOneEncoder类似前面的OneToOneDecoder实现,核心编码实现通过调用了内部的抽象方法encode
实现,encode
方法在AbstractEncoder中实现:
public abstract class AbstractEncoder extends OneToOneEncoder {
private static final Logger logger = LoggerLoader.getLogger(AbstractEncoder.class);
public abstract void serialize(byte serializer, OutputStream os, Object obj, Channel channel) throws IOException;
@Override
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (msg instanceof InvocationSerializable) {
InvocationSerializable _msg = (InvocationSerializable) msg;
try {
ChannelBuffer frame;
CodecEvent codecEvent;
if (msg instanceof UnifiedInvocation) {
// 统一协议请求编码
frame = _doEncode(channel, (UnifiedInvocation) _msg);
// 用CodecEvent包装消息包
codecEvent = new CodecEvent(frame, true);
} else {
// 非统一协议请求编码
frame = doEncode(channel, _msg);
// 用CodecEvent包装消息包
codecEvent = new CodecEvent(frame, false);
}
return codecEvent;
} catch (Exception e) {
SerializationException se = new SerializationException(e);
try {
// 调用子类实现的抽象模版方法处理编码响应
doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(_msg,
_msg.getSerialize(), se));
} catch (Throwable t) {
}
logger.error(e.getMessage(), se);
throw se;
}
} else {
throw new SerializationException("Invalid message format");
}
}
protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg)
throws IOException {
ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
channel.getConfig().getBufferFactory()));
// 魔数, 0x39(57),0x3A(58)
os.write(CodecConstants.MAGIC);
// 序列化方式
os.writeByte(msg.getSerialize());
// 数据包体大小,包括消息尾部,先写最大值占位
os.writeInt(Integer.MAX_VALUE);
// 调用抽象模版方式序列化
serialize(msg.getSerialize(), os, msg, channel);
ChannelBuffer frame = os.buffer();
// 写入递增***
frame.writeLong(msg.getSequence());
// 写入拓展位
frame.writeBytes(CodecConstants.EXPAND);
// 根据序列化后的数据包体长度更新消息体长度,这里包含消息尾
frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() -
CodecConstants.FRONT_LENGTH);
doAfter(msg, frame.readableBytes());
return frame;
}
protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg)
throws IOException {
ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
channel.getConfig().getBufferFactory()));
// 魔数, 0xAB(171),0xBA(186)
os.write(CodecConstants._MAGIC);
// 写入协议版本
os.writeByte(msg.getProtocolVersion());
// 写入序列化方式
byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize());
os.writeByte(serialize);
// 数据包体大小,包括消息尾部,先写最大值占位
os.writeInt(Integer.MAX_VALUE);
// 调用抽象模版方式序列化
serialize(msg.getSerialize(), os, msg, channel);
ChannelBuffer frame = os.buffer();
// 根据序列化后的数据包体长度更新消息体长度,非统一协议不包含消息尾
frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() -
CodecConstants._FRONT_LENGTH_);
doAfter(msg, frame.readableBytes());
return frame;
}
private void doAfter(Object msg,
int frameLength) throws IOException {
if (msg instanceof InvocationSerializable) {
InvocationSerializable msg_ = (InvocationSerializable) msg;
int msgType = msg_.getMessageType();
// 注入size属性
if (msgType == Constants.MESSAGE_TYPE_SERVICE && frameLength > 0) {
msg_.setSize(frameLength);
}
}
}
public abstract void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response);
}
再看ProviderEncoder实现了AbstractEncoder的三个方法:
public class ProviderEncoder extends AbstractEncoder {
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
// 无实际意义的重写实现
Object encoded = super.encode(ctx, channel, msg);
return encoded;
}
@Override
public void doFailResponse(ChannelHandlerContext ctx, Channel channel, InvocationResponse response) {
// 直接写回客户端
Channels.write(channel, response);
}
@Override
public void serialize(byte serializerType, OutputStream os, Object obj, Channel channel) throws IOException {
// 根据序列化方式获取指定的序列化工具类完成序列化操作,将序列化结果写入os中
SerializerFactory.getSerializer(serializerType).serializeResponse(os, obj);
}
}
FramePrepender
FramePrepender也是继承自OneToOneEncoder,重写了内部的encode方法,实现及其简单,判断如果消息类型是CodecEvent,则提取出消息的具体消息字节码内容,具体实现:
public class FramePrepender extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (msg == null || !(msg instanceof CodecEvent)) {
return null;
} else {
// 提取CodecEvent的buffer数据,即实际的消息字节码
return ((CodecEvent) msg).getBuffer();
}
}
}
客户端实现
下面先看NettyClientPipelineFactory的实现:
public class NettyClientPipelineFactory implements ChannelPipelineFactory {
// 客户端连接实例引用
private NettyClient client;
// 通过编解码工厂,获取单例编解码配置
private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();
public NettyClientPipelineFactory(NettyClient client) {
this.client = client;
}
// 初始化pipeline
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("framePrepender", new FramePrepender());
pipeline.addLast("frameDecoder", new FrameDecoder());
pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
pipeline.addLast("invokerDecoder", new InvokerDecoder());
pipeline.addLast("invokerEncoder", new InvokerEncoder());
pipeline.addLast("clientHandler", new NettyClientHandler(this.client));
return pipeline;
}
}
从上面看到,大部分Handler与服务方类型,区别只有3个Handler:FramePrepender,FrameDecoder,NettyClientHandler
对于上面所有的Handler,类似服务方,同样可以分为3类:
- UpStreamHandler
- FrameDecoder
- invokerDecoder
- NettyClientHandler
- DownStreamHandler
- FramePrepender
- invokerEncoder
- 双向Handler
- Crc32Handler
- compressHandler
结合代码分析,最终的调用时序如下所示:
sequenceDiagram
客户端->>FrameDecoder: 开始处理请求
FrameDecoder->>Crc32Handler:decode
Crc32Handler->>compressHandler: messageReceived
compressHandler->>invokerDecoder: messageReceived
invokerDecoder->>NettyClientHandler: AbstractDecoder.decode
NettyClientHandler->>invokerEncoder: handleUpstream
invokerEncoder->>compressHandler: AbstractEncoder.encode
compressHandler->>Crc32Handler: handleDownstream
Crc32Handler->>FramePrepender: handleDownstream
FramePrepender->>客户端: encode
上面的所有Handler中,除了NettyClientHandler,其他Handler的处理逻辑基本和服务端实现是一致的,下面具体看看NettyClientHandler的实现原理:
NettyClientHandler
NettyClientHandler是客户端请求处理链中最顶层的Handler,用于完成处理服务端调用响应的具体逻辑,里面的核心方法实现是messageReceived:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
CodecEvent codecEvent = (CodecEvent) e.getMessage();
if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
// 请求有效且响应消息不为null
client.processResponse((InvocationResponse) codecEvent.getInvocation());
}
}
调用了NettyClient#processResponse方法:
public void processResponse(InvocationResponse response) {
this.responseProcessor.processResponse(response, this);
}
this.responseProcessor是ResponseThreadPoolProcessor实例,继承了AbstractResponseProcessor抽象类,上面调用的是AbstractResponseProcessor#processResponse方法:
public void processResponse(InvocationResponse response, Client client) {
try {
doProcessResponse(response, client);
} catch (Throwable e) {
String error = String.format("process response failed:%s, processor stats:%s", response,
getProcessorStatistics());
logger.error(error, e);
if (monitor != null) {
monitor.logError(error, e);
}
}
}
doProcessResponse是一个抽象方法,在子类ResponseThreadPoolProcessor中实现:
public class ResponseThreadPoolProcessor extends AbstractResponseProcessor {
private static ThreadPool responseProcessThreadPool;
public ResponseThreadPoolProcessor() {
ConfigManager configManager = ConfigManagerLoader.getConfigManager();
int corePoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_COREPOOLSIZE,
Constants.DEFAULT_RESPONSE_COREPOOLSIZE);
int maxPoolSize = configManager.getIntValue(Constants.KEY_RESPONSE_MAXPOOLSIZE,
Constants.DEFAULT_RESPONSE_MAXPOOLSIZE);
int queueSize = configManager.getIntValue(Constants.KEY_RESPONSE_WORKQUEUESIZE,
Constants.DEFAULT_RESPONSE_WORKQUEUESIZE);
responseProcessThreadPool = new DynamicThreadPool("Pigeon-Client-Response-Processor", corePoolSize,
maxPoolSize, queueSize, new CallerRunsPolicy(), false, false);
}
public void stop() {
ThreadPoolUtils.shutdown(responseProcessThreadPool.getExecutor());
}
// 重写父类的抽象模版方法完成具体的处理响应逻辑
public void doProcessResponse(final InvocationResponse response, final Client client) {
Runnable task = new Runnable() {
public void run() {
// 调用ServiceInvocationRepository单例的receiveResponse方法处理接收的请求
ServiceInvocationRepository.getInstance().receiveResponse(response);
}
};
try {
// 由根据配置初始化的线程池来执行相关信息
responseProcessThreadPool.execute(task);
} catch (RejectedExecutionException e) {
String error = String.format("process response failed:%s, processor stats:%s", response,
getProcessorStatistics());
throw new RejectedException(error, e);
}
}
@Override
public String getProcessorStatistics() {
// 获取当前响应处理线程池的实际处理情况
ThreadPoolExecutor e = responseProcessThreadPool.getExecutor();
String stats = String.format(
"response pool size:%d(active:%d,core:%d,max:%d,largest:%),task count:%d(completed:%d),queue size:%d",
e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.getQueue().size());
return stats;
}
@Override
public ThreadPool getResponseProcessThreadPool() {
return responseProcessThreadPool;
}
}
具体处理接收响应的逻辑如下:
ublic void receiveResponse(InvocationResponse response) {
RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
if (invocationBean != null) {
if (logger.isDebugEnabled()) {
logger.debug("received response:" + response);
}
InvocationRequest request = invocationBean.request;
try {
// 处理回调
Callback callback = invocationBean.callback;
if (callback != null) {
Client client = callback.getClient();
if (client != null) {
// 记录请求流出
ServiceStatisticsHolder.flowOut(request, client.getAddress());
}
// 向callback对象注入response
callback.callback(response);
// 通知等待线程接收响应
callback.run();
}
} finally {
invocations.remove(response.getSequence());
}
}
}
callback.run方法会唤醒之前发送请求的拦截器链等待获取响应的线程,并根据callback.callback注入的response对象,来完成对请求响应的处理,具体实现在pigeon的CallbackFuture中,看看两个核心方法实现:
public class CallbackFuture implements Callback, CallFuture {
@Override
public void callback(InvocationResponse response) {
this.response = response;
}
@Override
public void run() {
lock.lock();
try {
this.done = true;
if (condition != null) {
condition.signal();
}
} finally {
lock.unlock();
}
}
}