dubbo源码分析12-server代理创建和执行bind

之前 分析 consumer代理创建和发送请求的时候,进行了简单的分层,实际上并不准确, 本文将根据 dubbo开发文档中的分层进行分析,下面列出

整体设计

dubbo源码分析12-server代理创建和执行bind

图例说明:

图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明
config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
protocol 远程调用层:封将 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

代理创建

下面是 serve代理创建-订阅方式的前半部分

dubbo源码分析12-server代理创建和执行bind

ServiceConfig<T>.export()//转发
ServiceConfig<T>.doExport()    //读取配置并检查
ServiceConfig<T>.doExportUrls() //获取一个或多个注册中心的配置,如果有多个协议(通常只有一个dubbo),每个协议执行一次 export
ServiceConfig<T>.doExportUrlsFor1Protocol(ProtocolConfig, List<URL>)     创建目标接口 动态代理invoker, 缓存服务对象exporter
详细步骤如下

  1. 获取 application,module,privoder,protocol 配置
  2. 获取method配置
  3. get /set 各种参数
  4. 没有配置远程注册地址,执行本地export, 否则执行注册。

config 配置层:读取配置,解析spring配置,初始化配置,
细节:多种协议执行暴露服务,创建目标接口 动态代理invoker, 缓存服务对象exporter
proxy 服务代理层:创建目标接口 动态代理invoker,
cluster 路由层;server端没有用到

ProtocolListenerWrapper.export(Invoker<T>)//转发执行 export
ProtocolFilterWrapper.export(Invoker<T>)//转发执行 export
QosProtocolWrapper.export(Invoker<T>)//启动 qos服务 转发执行 export
QosServer服务于telnet访问和 http访问。

这部分只是执行了转发,推测是由于 协议嵌套的原因导致的

RegistryProtocol.export(Invoker<T>) //暴露服务,注册,缓存服务对象exporter
RegistryProtocol.doLocalExport(Invoker<T>) //封装了 ListenerExporterWrapper 和接口代理执行代理invoker ,作用未知
registry注册中心层:conusmer端 获取 providers,routes,configs配置 执行notify流程,server端 创建并缓存接口代理,启动服务,等待接收请求,注册provider url到zookeeper。
ProtocolListenerWrapper.export(Invoker<T>)// 封装监听器
ProtocolFilterWrapper.export(Invoker<T>)//构建调用链
QosProtocolWrapper.export(Invoker<T>)//转发执行 export

下面是之前总结过的
 过滤链层:ProtocolFilterWrapper 构建发送请求时要执行的过滤链
工具层:QosProtocolWrapper  支持 telnet,http 服务
监听器层:ProtocolListenerWrapper  获取invoker,执行自定义的监听器
可以 概括为 工具链层: 构建发送请求时要执行的过滤链,支持 telnet,http 服务, 获取invoker,执行自定义的监听器。

下图是server创建代理的后半部分

dubbo源码分析12-server代理创建和执行bind


DubboProtocol.export(Invoker<T> 缓存exporter,启动server  key 是接口名+端口号 如tuling.dubbo.server.UserService:20880  DubboExporter 创建没啥逻辑, 保存了 invoker 类型是RegistryProtocol$InvokerDelegete
DubboProtocol.openServer(URL) //创建 server,缓存 server到 serverMap中
DubboProtocol.createServer(URL) //添加 心跳 时间间隔 默认60000ms
重点代码分析
 DubboProtocol.export(Invoker<T>) 
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);例:
//key 规则   分组名/接口名:端口号 如 langfang/tuling.dubbo.server.UserService:20880
        // key: tuling.dubbo.server.UserService:20880 没有分组名
// 创建DubboExporter,DubboExporter 用来包装 接口执行引用。
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);//缓存代理DubboExporter,用于 server接受数据时查找和执行。

        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
//回调有关 不知何用
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
//上面的流程是创建了方法执行代理
        openServer(url);//这里 开始启动服务端,组织多层handler的嵌套。

        return exporter;
    }
逻辑总结:
创建了方法执行代理invoker,使用接口名+端口号缓存,开始启动服务端,组织多层handler的嵌套。
DubboProtocol.createServer(URL)     
private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
//...
        try {
            server = Exchangers.bind(url, requestHandler);//执行绑定 创建server
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
//...
        }
        return server;
    }

这层 称为 protocol 远程调用层,还有其他的实现 如HessianProtocol,InjvmProtocol
server远程调用层总结:创建了相应的exporter 并缓存,执行服务启动流程,添加 心跳 时间间隔。
结合consumer引用创建流程分析
consumer引用创建:创建了相应的invoker并缓存,然后执行连接远端服务,由于Hessian连接比较简单 包含在创建invoker 的流程中
server端代理创建: 创建了相应的exporter 并缓存,然后执行服务启动流程,由于Hessian启动比较简单 包含在创建exporter 的流程中。
DubboProtocol 的数据传输依赖于netty, 有一个单独的启动服务流程
HessianProtocol的数据传输依赖于http,比较简单没有单独的流程
由此可知,不同的Protocol 实现  其创建exporter 的处理逻辑不同。
可以总结:protocol 远程调用层 提供不类型协议的实现, 屏蔽了 invoker,exporter的创建细节。


Exchangers.bind(URL, ExchangeHandler) //check  url,handler   添加配置codec=exchange,绑定最内层的handler.
HeaderExchanger.bind(URL, ExchangeHandler)// return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

HeaderExchangeServer嵌套如下图

dubbo源码分析12-server代理创建和执行bind

与consumer比较 没有 嵌套 HeaderExchangeChannel


重点代码分析
   public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//心跳周期间隔时间 默认60000,
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//心跳超时时间, 默认是heartbeat 的3倍
        if (heartbeatTimeout < heartbeat * 2) {//heartbeatTimeout 不能小于heartbeat 的2倍
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeartbeatTimer();//启动心跳检测线程
    }
HeaderExchangeServer的主要用途体现在构造方法,它获取了心跳相关参数,启动心跳线程,这个心跳任务 每隔一段时间向当前server的所有channel(就是所有客户端)发送心跳。
HeaderExchangeHandler
有2个主要的方法 
数据的发送 sent 分析过没什么用
数据的接受 received    request 接受流程 和response接受流程 重要

HeaderExchangeHandler.received(Channel, Object) 
处理  request 接受流程 和response接受流程

    public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
// server端收到的是request
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
//正常同步请求 执行此处,handleRequest是重要的方法
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {//处理响应  consmer端常用
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
//目前不支持
            } else {
                handler.received(exchangeChannel, message);//正常不会执行
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

总结一下交换层在consumer,和server端的作用
consumer:启动心跳,处理心跳请求,实现请求同步等待。
server:启动心跳,处理心跳请求,获取接口执行引用invoker,执行请求。 
交换层 顾名思义 应该是进行了某种交换  在consumer端 将 netty的异步请求 交换为同步请求, 在sever端 请求数据 交换给 接口引用执行。

Transporters.bind(URL, ChannelHandler...)//check  url,handlers..  封装 多个hanlder为ChannelHandlerDispatcher.    
NettyTransporter.bind(URL, ChannelHandler)//  return new NettyServer(url, listener);
NettyServer.<init>(URL, ChannelHandler)    // 生成新的多层嵌套handler
NettyServer(AbstractServer).<init>(URL, ChannelHandler)//创建 ExecutorService executor
NettyServer.doOpen()//执行bind   

关于这一层 dubbo开发手册上的描述如下
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
这个解释已经比较清晰了。
进一步的解释 就是 定义了网络传输层的统一接口, 提供了 netty和mina实现。

重点代码分析
NettyServer.<init>(URL, ChannelHandler)     
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
//wrap 生成新的多层嵌套handler,类型与consumer中handler类型一致
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

serialize 数据序列化层 将在后续的文章中分析。

server代理创建各层功能总结:
config 配置层:读取配置,解析spring配置,初始化配置,
proxy 服务代理层:创建目标接口 动态代理invoker,
cluster 路由层:server端没有用到
工具链层: 构建发送请求时要执行的过滤链,支持 telnet,http 服务, 获取invoker,执行自定义的监听器。
protocol 远程调用层: 提供不类型协议的实现, 屏蔽了 invoker,exporter的创建细节
exchange 交换层:启动心跳,处理心跳请求,获取接口执行引用invoker,执行请求。 
transport 网络传输层:定义了网络传输层的统一接口, 提供了 netty和mina实现.

最后总结:
本文分析了server代理创建过程中各层的功能,并结合consumer进行分析,这样分析, 就能比较深入的理解server代理创建过程中代码,并且能加深记忆。
分层设计是dubbo框架设计的核心,理解其功能,原理,对理解 多个重要流程的源码如 consumer创建代理,conusmer发送流程 ,server创建代理,server接受流程等 有很大的帮助

dubbo源码分析系列 写到这 已经分析了 dubbo大部分的内容 剩下的还有  server处理请求和发送响应,参照前面 consumer的相关部分,能够比较容易分析了。就不再写文章分析了。