14 - 远程引用(Dubbo)
腾一部分笔记到****
1. 概述
相比本地引用,远程引用会多做如下几件事情:
-
向注册中心订阅,从而发现服务提供者列表。
-
启动通信客户端,通过它进行远程调用。
2. 远程引用
远程暴露服务的顺序图如下:
整体流程:
->url不等于空 // <dubbo:reference> 上配置的url,有配就优先用这个,不用注册中心的,当然url也可以配注册中心
->将url以分隔符分隔开来
->如果是注册中心
-> urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
->不是,即直连
-> urls.add(ClusterUtils.mergeUrl(url, map));
->url为空从注册中心加载
->List<URL> us = loadRegistries(false); // 加载注册中心
->URLmonitorUrl = loadMonitor(u); // 加载监控平台URL
->map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); // 添加注册中心到URL
->urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注册中心,带上服务引用的配置参数.与服务发布时相似
->urls个数为1
->invoker = refprotocol.refer(interfaceClass, urls.get(0));
-> extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
-> extension.refer(arg0, arg1);
-> ProtocolFilterWrapper.refer() // Aop类,注册协议时走不到
-> ProtocolListenerWrapper.refer()// Aop类,注册协议时走不到
RegistryProtocol.java
-> refer(Class<T> type, URL url)
-> url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 获得真实的注册中心的 URL
-> Registry registry = registryFactory.getRegistry(url); // 获取注册中心
-> Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 获取引用url转map
-> String group = qs.get(Constants.GROUP_KEY); // 获取group
group不为空,用合并的cluster
-> return doRefer(getMergeableCluster(), registry, type, url);
为空,用失败重试cluster(FailoverCluster)
-> return doRefer(cluster, registry, type, url);
-> RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); //创建注册中心目录,实现了NotifyListener接口,zk变更得到通知
-> directory.setRegistry(registry);
-> directory.setProtocol(protocol);
-> registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false))); //向注册中心注册dubbo/com.alibaba.dubbo.demo.DemoService/consumers
-> directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY)); //订阅 /dubbo/com.alibaba.dubbo.demo.DemoService/providers,
/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
/dubbo/com.alibaba.dubbo.demo.DemoService/routers
-> Invoker invoker = cluster.join(directory); cluster添加目录
MockClusterWrapper.java
<--> return new MockClusterInvoker<T>(directory,this.cluster.join(directory)); //MockClusterInvoker 完成mock 及服务降级等功能
FailoverCluster.java
-> Invoker<T> join(directory)
<----> return new FailoverClusterInvoker<T>(directory);
<-----------> return invoker
->urls个数为多个,循环urls
-> invokers.add(refprotocol.refer(interfaceClass, url));
-> 如果是注册中心 记录registryURL = url,等于说记录最后一个注册中心的url.
->registryURL不为空
->URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
->invoker = cluster.join(new StaticDirectory(u, invokers)); //StaticDirectory静态目录,里面可能包动态目录或者静态,有注册中心用AvailableCluster第一个可用
->registryURL为空
->invoker = cluster.join(new StaticDirectory(invokers));//StaticDirectory静态目录,里面可能包动态目录或者静态 用默认的 failover失败重试
|
在 ->urls个数为1 时我们按注册协议,我们在看下只有一个DubboProtocol情况,即直连情况。
->invoker = refprotocol.refer(interfaceClass, urls.get(0));
-> extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
-> extension.refer(arg0, arg1);
-> ProtocolFilterWrapper.refer() // Aop类,注册协议时走不到
-> ProtocolListenerWrapper.refer()// Aop类,注册协议时走不到
DubboProtocol.java
-> refer(Class<T> type, URL url)
-> DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
<----> return invoker;
DubboInvoker.java //看代码可以看出就是发请求,这里有个知识点同步转移步,后面分析
protected Result doInvoke(final Invocation invocation) {
RpcInvocation inv = (RpcInvocation) invocation;
// 获得方法名
final String methodName = RpcUtils.getMethodName(invocation);
// 获得 `path`( 服务名 ),`version`
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 获得 ExchangeClient 对象
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
// 远程调用
try {
// 获得是否异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 获得是否单向调用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获得超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 异步无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步又返回值
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// 异步转同步
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: "+ getUrl() + ", cause: "+ e.getMessage(), e);
} catch (RemotingExceptione) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: "+ getUrl() + ", cause: "+ e.getMessage(), e);
}
}
|
-> directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY)); //订阅 /dubbo/com.alibaba.dubbo.demo.DemoService/providers,
/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
/dubbo/com.alibaba.dubbo.demo.DemoService/routers
我们知道clusterInvoer里面存的Directory,而RegistryDirectory是动态的,监听上面几个CATEGORY,我们看看它监控了什么
RegistryDirectory.java
->subscribe(url);
-> setConsumerUrl(url); //设置消费者url
-> registry.subscribe(url, this); // 向注册中心,发起订阅
-> // 几种目录,就循环建几个监听器注册到zkTransport
// 通知时
notify(List<URL> urls)
->urls分类 //不太明白为什么要分类,不都是按类注册的监听器吗。
List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 URL 集合
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
->this.configurators = toConfigurators(configuratorUrls); // 处理配置规则 URL 集合
->List<Router> routers = toRouters(routerUrls);
->List<Configurator> localConfigurators = this.configurators; // // 合并配置规则,到`directoryUrl` 中,形成`overrideDirectoryUrl` 变量。overrideDirectoryUrl作为最终配置
this.overrideDirectoryUrl= directoryUrl;
if (localConfigurators != null&& !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl= configurator.configure(overrideDirectoryUrl);
}
}
// 重点刷新消费端中服务提供者列表的
// 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
// 2.如果传入的invoker列表不为空,则表示最新的invoker列表
// 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
->refreshInvoker(invokerUrls);
-> invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol() //zk上配置了empty表示清空服务提供者
-> destroyAllInvokers(); 销毁invoker
-> invokerUrls.isEmpty() && this.cachedInvokerUrls != null// 之前有服务提供者,这次改变的是 override规则或route规则,需要比对下
-> invokerUrls.addAll(this.cachedInvokerUrls); //用之前缓存的服务提供者url
-> 否则
this.cachedInvokerUrls= new HashSet<URL>(); //缓存清空
this.cachedInvokerUrls.addAll(invokerUrls); //用新增
<----> 如果invokerUrls.isEmpty() 说明,之前也没有服务提供者,这次通知也没服务提供者,直接返回
-> Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); //服务提供者转化成invoker
-> tring queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); // 获得引用服务的协议
-> 循环urls
-> 服务提供者protocal不在queryProtocols continue
-> 消费端加载不了这个协议 Protocol.class continue
-> URL url = mergeUrl(providerUrl); //合并参数
String key = url.toFullString();
-> Map<String, Invoker<T>> localUrlInvokerMap= this.urlInvokerMap; //获取缓存的Invoker
-> 以存在直接key,直接将之前的加到newUrlInvokerMap
-> 不存在 invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); //从对应协议取invoker,url是处理过的用refer、override合并的,还有个事providerUrl,所以这里包装一个Invoker来保存下真实providerUrl,这里也可看出配置规则>服务端>provider
-> 添加到newUrlInvokerMap
-> Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); //转成方法对应invoker,为什么更高效?
-> this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; //按分组再弄一下
-> this.urlInvokerMap = newUrlInvokerMap;
-> destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); //将old的中这次推送没有的销毁,那这里看是zk全量通知额?
|
QA.
在调用过程中参数(超时,重试此时之类)到底是谁控制的?
RegistryDirectory中有两个变量
queryMap:用于记录消费之参数
overrideDirectoryUrl:刚启动时等于queryMap,转成的URL
当有configrators配置时,会被notify,循环configrators,处理overrideDirectoryUrl。
当有服务提供者新增时,会被notify,providerUrl先会被queryMap替换,再回被overrideDirectoryUrl处理,然后在进行refernce
所以 控制 配置规则>服务端>provider
调用过程配置了相同的参数是如何处理RegistryDirectory中,有个overrideDirectoryUrl属性,他里面存的就是调用时候的配置