dubbo源码分析12-server代理创建和执行bind
之前 分析 consumer代理创建和发送请求的时候,进行了简单的分层,实际上并不准确, 本文将根据 dubbo开发文档中的分层进行分析,下面列出
整体设计
图例说明:
图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。
各层说明
config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
protocol 远程调用层:封将 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
代理创建
下面是 serve代理创建-订阅方式的前半部分
ServiceConfig<T>.export()//转发
ServiceConfig<T>.doExport() //读取配置并检查
ServiceConfig<T>.doExportUrls() //获取一个或多个注册中心的配置,如果有多个协议(通常只有一个dubbo),每个协议执行一次 export
ServiceConfig<T>.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>) 创建目标接口 动态代理invoker, 缓存服务对象exporter
详细步骤如下
- 获取 application,module,privoder,protocol 配置
- 获取method配置
- get /set 各种参数
- 没有配置远程注册地址,执行本地export, 否则执行注册。
config 配置层:读取配置,解析spring配置,初始化配置,
细节:多种协议执行暴露服务,创建目标接口 动态代理invoker, 缓存服务对象exporter
proxy 服务代理层:创建目标接口 动态代理invoker,
cluster 路由层;server端没有用到
ProtocolListenerWrapper.export(Invoker<T>)//转发执行 export
ProtocolFilterWrapper.export(Invoker<T>)//转发执行 export
QosProtocolWrapper.export(Invoker<T>)//启动 qos服务 转发执行 export
QosServer服务于telnet访问和 http访问。
这部分只是执行了转发,推测是由于 协议嵌套的原因导致的
RegistryProtocol.export(Invoker<T>) //暴露服务,注册,缓存服务对象exporter
RegistryProtocol.doLocalExport(Invoker<T>) //封装了 ListenerExporterWrapper 和接口代理执行代理invoker ,作用未知
registry注册中心层:conusmer端 获取 providers,routes,configs配置 执行notify流程,server端 创建并缓存接口代理,启动服务,等待接收请求,注册provider url到zookeeper。
ProtocolListenerWrapper.export(Invoker<T>)// 封装监听器
ProtocolFilterWrapper.export(Invoker<T>)//构建调用链
QosProtocolWrapper.export(Invoker<T>)//转发执行 export
下面是之前总结过的
过滤链层:ProtocolFilterWrapper 构建发送请求时要执行的过滤链
工具层:QosProtocolWrapper 支持 telnet,http 服务
监听器层:ProtocolListenerWrapper 获取invoker,执行自定义的监听器
可以 概括为 工具链层: 构建发送请求时要执行的过滤链,支持 telnet,http 服务, 获取invoker,执行自定义的监听器。
下图是server创建代理的后半部分
DubboProtocol.export(Invoker<T> 缓存exporter,启动server key 是接口名+端口号 如tuling.dubbo.server.UserService:20880 DubboExporter 创建没啥逻辑, 保存了 invoker 类型是RegistryProtocol$InvokerDelegete
DubboProtocol.openServer(URL) //创建 server,缓存 server到 serverMap中
DubboProtocol.createServer(URL) //添加 心跳 时间间隔 默认60000ms
重点代码分析
DubboProtocol.export(Invoker<T>)
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);例:
//key 规则 分组名/接口名:端口号 如 langfang/tuling.dubbo.server.UserService:20880
// key: tuling.dubbo.server.UserService:20880 没有分组名
// 创建DubboExporter,DubboExporter 用来包装 接口执行引用。
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);//缓存代理DubboExporter,用于 server接受数据时查找和执行。
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
//回调有关 不知何用
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//上面的流程是创建了方法执行代理
openServer(url);//这里 开始启动服务端,组织多层handler的嵌套。
return exporter;
}
逻辑总结:
创建了方法执行代理invoker,使用接口名+端口号缓存,开始启动服务端,组织多层handler的嵌套。
DubboProtocol.createServer(URL)
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
//...
try {
server = Exchangers.bind(url, requestHandler);//执行绑定 创建server
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//...
}
return server;
}
这层 称为 protocol 远程调用层,还有其他的实现 如HessianProtocol,InjvmProtocol
server远程调用层总结:创建了相应的exporter 并缓存,执行服务启动流程,添加 心跳 时间间隔。
结合consumer引用创建流程分析
consumer引用创建:创建了相应的invoker并缓存,然后执行连接远端服务,由于Hessian连接比较简单 包含在创建invoker 的流程中
server端代理创建: 创建了相应的exporter 并缓存,然后执行服务启动流程,由于Hessian启动比较简单 包含在创建exporter 的流程中。
DubboProtocol 的数据传输依赖于netty, 有一个单独的启动服务流程
HessianProtocol的数据传输依赖于http,比较简单没有单独的流程
由此可知,不同的Protocol 实现 其创建exporter 的处理逻辑不同。
可以总结:protocol 远程调用层 提供不类型协议的实现, 屏蔽了 invoker,exporter的创建细节。
Exchangers.bind(URL, ExchangeHandler) //check url,handler 添加配置codec=exchange,绑定最内层的handler.
HeaderExchanger.bind(URL, ExchangeHandler)// return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
HeaderExchangeServer嵌套如下图
与consumer比较 没有 嵌套 HeaderExchangeChannel
重点代码分析
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//心跳周期间隔时间 默认60000,
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//心跳超时时间, 默认是heartbeat 的3倍
if (heartbeatTimeout < heartbeat * 2) {//heartbeatTimeout 不能小于heartbeat 的2倍
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeartbeatTimer();//启动心跳检测线程
}
HeaderExchangeServer的主要用途体现在构造方法,它获取了心跳相关参数,启动心跳线程,这个心跳任务 每隔一段时间向当前server的所有channel(就是所有客户端)发送心跳。
HeaderExchangeHandler
有2个主要的方法
数据的发送 sent 分析过没什么用
数据的接受 received request 接受流程 和response接受流程 重要
HeaderExchangeHandler.received(Channel, Object)
处理 request 接受流程 和response接受流程
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// server端收到的是request
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//正常同步请求 执行此处,handleRequest是重要的方法
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {//处理响应 consmer端常用
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
//目前不支持
} else {
handler.received(exchangeChannel, message);//正常不会执行
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
总结一下交换层在consumer,和server端的作用
consumer:启动心跳,处理心跳请求,实现请求同步等待。
server:启动心跳,处理心跳请求,获取接口执行引用invoker,执行请求。
交换层 顾名思义 应该是进行了某种交换 在consumer端 将 netty的异步请求 交换为同步请求, 在sever端 请求数据 交换给 接口引用执行。
Transporters.bind(URL, ChannelHandler...)//check url,handlers.. 封装 多个hanlder为ChannelHandlerDispatcher.
NettyTransporter.bind(URL, ChannelHandler)// return new NettyServer(url, listener);
NettyServer.<init>(URL, ChannelHandler) // 生成新的多层嵌套handler
NettyServer(AbstractServer).<init>(URL, ChannelHandler)//创建 ExecutorService executor
NettyServer.doOpen()//执行bind
关于这一层 dubbo开发手册上的描述如下
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
这个解释已经比较清晰了。
进一步的解释 就是 定义了网络传输层的统一接口, 提供了 netty和mina实现。
重点代码分析
NettyServer.<init>(URL, ChannelHandler)
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
//wrap 生成新的多层嵌套handler,类型与consumer中handler类型一致
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
serialize 数据序列化层 将在后续的文章中分析。
server代理创建各层功能总结:
config 配置层:读取配置,解析spring配置,初始化配置,
proxy 服务代理层:创建目标接口 动态代理invoker,
cluster 路由层:server端没有用到
工具链层: 构建发送请求时要执行的过滤链,支持 telnet,http 服务, 获取invoker,执行自定义的监听器。
protocol 远程调用层: 提供不类型协议的实现, 屏蔽了 invoker,exporter的创建细节
exchange 交换层:启动心跳,处理心跳请求,获取接口执行引用invoker,执行请求。
transport 网络传输层:定义了网络传输层的统一接口, 提供了 netty和mina实现.
最后总结:
本文分析了server代理创建过程中各层的功能,并结合consumer进行分析,这样分析, 就能比较深入的理解server代理创建过程中代码,并且能加深记忆。
分层设计是dubbo框架设计的核心,理解其功能,原理,对理解 多个重要流程的源码如 consumer创建代理,conusmer发送流程 ,server创建代理,server接受流程等 有很大的帮助
dubbo源码分析系列 写到这 已经分析了 dubbo大部分的内容 剩下的还有 server处理请求和发送响应,参照前面 consumer的相关部分,能够比较容易分析了。就不再写文章分析了。