一个学习式的mogoLink框架介绍
https://github.com/penkee/mogoLink
mogoLink是我于2016年开始设计的一个rpc框架,当时只是接触了Netty技术,觉得它非常适合做rpc框架的底层通讯。对于编解码产品,选用的是谷歌的protoBuf,不过苦于它的schema和每个类都得静态编译问题无法解决,然后耽搁下来。而那时候各大公司尚未服务化改造,故开发此框架经验不足,考虑不周。
今年公司开展服务化工作,此时又接触了dubbo,spring cloud相关书籍,系统介绍了设计rpc的理论基础。故而有重新维护此项目。
那么设计一个rpc框架,需要注意哪些地方呢?
- socket通讯框架:一般这个框架目前也只有netty开源的,上手快,性能也杠杠的
- 编解码技术:java序列化、谷歌的protobuf、facebook thift、jboss marshaling、kryo、json、hession、aryo
- 粘包处理:消息定长、消息尾加分隔符、消息头加表示长度的字段、其他复杂的应用层协议
- 服务注册中心:分布式系统用的,管理可用的服务地址
- 负载均衡算法:客户端用的,分散请求不同的服务器
- 限流熔断处理:服务端用,防止请求过多,造成服务中断的
- 监控系统:监控服务调用次数,耗时,客户端连接数等信息
- 日志跟踪系统:由于会级联调用多层rpc,所以要生成唯一标识来跟踪调用链的日志收集系统
上图是本系统的设计流程图,虽然糙了点,但能看明白就行。
- FutureObject类,用于获取未来的对象的。因为netty接收消息是异步的,所以只能用此对象作为沟通工具,当服务端接收到消息时放入此对象里,则监听端就立刻获取到对象,否则超时处理
/**
* @brief 获取未来对象
* @details (必填)
* @author 彭堃
* @date 2016年8月26日下午5:56:29
*/
public class FutureObject<T> {
private T value;
public T get(long outTime) {
if (value == null) {
synchronized (this) {
try {
this.wait(outTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return value;
}
public void set(T value) {
this.value = value;
synchronized (this) {
this.notify();
}
}
}
-
客户端的代理方法:这样所有加@Autowird的接口,spring自动注入成工厂类产生的代理对象,当调用服务方法时,则代码执行代理的请求远程的服务
<bean id="userInfoRemoteService" class="com.eastorm.mogolink.client.proxy.ProxyFactory">
<constructor-arg name="className" value="com.eastorm.mogolink.demo.client.service.api.IUserInfoService" />
</bean>
/**
* 创建动态代理对象
* 动态代理不需要实现接口,但是需要指定接口类型
* @author 慕容恪
*/
public class ProxyFactory implements FactoryBean {
private static final Logger logger = LoggerFactory.getLogger(ProxyFactory.class);
private String className;
public ProxyFactory(String className){
this.className=className;
}
public Object getProxyInstance() throws ClassNotFoundException {
Class target=Class.forName(className);
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//无需代理的父类方法
if("toString".equals(method.getName())){
return method.invoke(proxy,args);
}
BaseMessage req=new BaseMessage();
UUID uuid = UUID.randomUUID();
req.setRequestId(uuid.toString());
List<MethodParam> paramTypes=new ArrayList<>();
if(args!=null&&args.length>0){
for (Object arg : args) {
paramTypes.add(new MethodParam(arg.getClass().getName(),arg));
}
}
req.setParamTypes(paramTypes);
req.setServiceName(className);
req.setMethod(method.getName());
long s=System.currentTimeMillis();
ClientMsgHandler handler= ClientStarter.getHandler();
if(handler==null){
logger.info("请求失败req={},耗时:{}ms",req.getRequestId(),System.currentTimeMillis()-s);
return null;
}
// Request and get the response.
BaseMessage resMsg = handler.getData(req);
if(resMsg!=null&&resMsg.getCode().equals(ServiceCodeEnum.SUCCCESS.getId())){
logger.info("req={},耗时:{}ms",resMsg.getRequestId(),System.currentTimeMillis()-s);
return resMsg.getReturnData();
}else{
logger.info("失败req={},耗时:{}ms",req.getRequestId(),System.currentTimeMillis()-s);
}
return null;
}
});
}
@Override
public Object getObject() throws Exception {
return getProxyInstance();
}
@Override
public Class<?> getObjectType() {
Class target= null;
try {
target = Class.forName(className);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return target;
}
@Override
public boolean isSingleton() {
return true;
}
}
- channel连接池
一个客户端如果只连接一个channel,那岂不是暴殄天物。一个channel是同步阻塞的,所有这里每个客户端要有一个channel连接池。本系统采用是apache的common-pool工具类,来维护channel连接。
- 粘包问题
本框架采用消息头加消息长度字段的方式实现,具体的类是netty自带的LengthFieldBasedFrameDecoder。我们在kryo编码器里加了一行头长度字段。
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, BaseMessage baseMessage, ByteBuf byteBuf) throws Exception {
byte[] data= messageCodec.serialize(baseMessage);
byteBuf.writeShort(data.length);
byteBuf.writeBytes(data);
}
如果能确定编码后不包含分割符,也是可以用分隔符处理的,更节约。
- 消息体的定义:方法支持重载,故需要方法参数的class名
ublic class BaseMessage {
private String requestId;
private String code;
private String msg;
/**
* 返回的信息
*/
private Object returnData;
/**
* 服务名
*/
private String serviceName;
/**
* 方法
*/
private String method;
/**
* 方法的参数类型
* 用来转型
*/
private List<MethodParam> paramTypes;
}