dubbo源码分析-consumer订阅创建代理
订阅创建代理的第一部分
红框之前的部分已经分析过,本文分析红框内的部分。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//保存并解析了订阅信息。
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//注册consumer订阅信息
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//订阅并获取 服务端列表,配置,路由信息。然后连接服务端,并创建执行代理的invoker
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//创建容错层
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
注意方法中黑体的部分,本文将围绕 这几个部分逐个分析
- 保存并解析了订阅信息。
- 注册consumer订阅信息
- 订阅并获取 服务端列表,配置,路由信息。然后连接服务端,并创建执行代理的invoker
- 创建容错层
RegistryDirectory创建分析
执行 RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL) 时创建,代码如下
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
public RegistryDirectory(Class<T> serviceType, URL url) {
super(url);
if (serviceType == null)
throw new IllegalArgumentException("service type is null.");
if (url.getServiceKey() == null || url.getServiceKey().length() == 0)
throw new IllegalArgumentException("registry serviceKey is null.")
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
//解析引用参数 application=young-app&default.timeout=30000000&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=consistenthash&methods=getUser&pid=25060&qos.enable=false®ister.ip=169.254.23.140&side=consumer&timeout=30000000×tamp=1573355812993
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
//url.setPath(url.getServiceInterface()).clearParameters()结果: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
//addParameters 将引用参数拼接在zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService后面
this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
this.multiGroup = group != null && ("*".equals(group) || group.contains(","));//获取是否 配置多组
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);//解析方法名数组
}
创建 RegistryDirectory的过程就是保存并解析了订阅信息。
RegistryDirectory<T>.mergeUrl(URL) 分析
private URL mergeUrl(URL providerUrl) {
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters 合并 consumer端的参数
List<Configurator> localConfigurators = this.configurators; // local reference
if (localConfigurators != null && !localConfigurators.isEmpty()) {// 通过配置 更改providerUrl,需要详细分析。
for (Configurator configurator : localConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}
//不检查连接是否成功,总是创建invoker
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false));
// The combination of directoryUrl and override is at the end of notify, which can't be handled here
this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); //合并provider 的参数到overrideDirectoryUrl
if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)// version 1.0 可能出现providerUrl path 为空的情况
&& "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0
//fix by tony.chenl DUBBO-44
String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);
if (path != null) {
int i = path.indexOf('/');
if (i >= 0) {
path = path.substring(i + 1);
}
i = path.lastIndexOf(':');
if (i >= 0) {
path = path.substring(0, i);
}
providerUrl = providerUrl.setPath(path);
}
}
return providerUrl;
}
方法逻辑:合并consumer参数到 provoider 代表的providerUrl
ClusterUtils
将localMap的参数合并到remoteUrl中,将remoteUrl中的 filters and listeners合并到localMap中
public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
Map<String, String> map = new HashMap<String, String>();
Map<String, String> remoteMap = remoteUrl.getParameters();
if (remoteMap != null && remoteMap.size() > 0) {
map.putAll(remoteMap);
//一些参数 必须以localMap为准,移除被provider影响的参数
// Remove configurations from provider, some items should be affected by provider.
map.remove(Constants.THREAD_NAME_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);
map.remove(Constants.THREADPOOL_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADPOOL_KEY);
map.remove(Constants.CORE_THREADS_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.CORE_THREADS_KEY);
map.remove(Constants.THREADS_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADS_KEY);
map.remove(Constants.QUEUES_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.QUEUES_KEY);
map.remove(Constants.ALIVE_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.ALIVE_KEY);
map.remove(Constants.TRANSPORTER_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);
}
if (localMap != null && localMap.size() > 0) {
map.putAll(localMap);//放入所有本地属性
}
if (remoteMap != null && remoteMap.size() > 0) { 一些属性要以 remoteUrl为准,要重点获取并赋值。
// Use version passed from provider side
String dubbo = remoteMap.get(Constants.DUBBO_VERSION_KEY);
if (dubbo != null && dubbo.length() > 0) {
map.put(Constants.DUBBO_VERSION_KEY, dubbo);
}
String version = remoteMap.get(Constants.VERSION_KEY);
if (version != null && version.length() > 0) {
map.put(Constants.VERSION_KEY, version);
}
String group = remoteMap.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
map.put(Constants.GROUP_KEY, group);
}
String methods = remoteMap.get(Constants.METHODS_KEY);
if (methods != null && methods.length() > 0) {
map.put(Constants.METHODS_KEY, methods);
}
// Reserve timestamp of provider url.
String remoteTimestamp = remoteMap.get(Constants.TIMESTAMP_KEY);
if (remoteTimestamp != null && remoteTimestamp.length() > 0) {
map.put(Constants.REMOTE_TIMESTAMP_KEY, remoteMap.get(Constants.TIMESTAMP_KEY));
}
// Combine filters and listeners on Provider and Consumer 合并 在Provider and Consumer中的 filters and listeners 到本地缓存
String remoteFilter = remoteMap.get(Constants.REFERENCE_FILTER_KEY);
String localFilter = localMap.get(Constants.REFERENCE_FILTER_KEY);
if (remoteFilter != null && remoteFilter.length() > 0
&& localFilter != null && localFilter.length() > 0) {
localMap.put(Constants.REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);
}
String remoteListener = remoteMap.get(Constants.INVOKER_LISTENER_KEY);
String localListener = localMap.get(Constants.INVOKER_LISTENER_KEY);
if (remoteListener != null && remoteListener.length() > 0
&& localListener != null && localListener.length() > 0) {
localMap.put(Constants.INVOKER_LISTENER_KEY, remoteListener + "," + localListener);
}
}
//将合并后的 参数 加入到 provoider 即remoteUrl中
return remoteUrl.clearParameters().addParameters(map);
}
Cluster.join()分析。
Cluster.join()比较简单先分析一下。
Cluster 就是容错层,每一个Cluster 依赖对应的invoker。
如FailoverCluster 依赖FailoverClusterInvoker。(我认为存在过度设计的情况)
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
Cluster.join()的过程就是创建对应的ClusterInvoker
AbstractClusterInvoker 的这个方法 执行路由,按照方法名获取invoker列表,依赖directory实现
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
ClusterInvoker体系分析
AbstractClusterInvoker主要功能
执行路由,按照方法名获取invoker列表,依赖directory实现。
获取订阅的基本信息如 获取订阅接口类,获取完整订阅信息url
实现了负载均衡的调用骨架。
AbstractClusterInvoker子类
实现了容错机制,如FailoverClusterInvoker 实现了失败重试机制
consumer注册流程分析
consumer注册流程在zookeeper上创建了consumers节点和其子节点。
流程如下
RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)
--ZookeeperRegistry(FailbackRegistry).register(URL) // 提供重试功能,抛出异常,加入重试列表
--ZookeeperRegistry.doRegister(URL)// 注册, 写入订阅信息。
下面逐个分析。
RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//...
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
//...
}
ZookeeperRegistry(FailbackRegistry).register(URL)
public void register(URL url) {
super.register(url);// 缓存已经注册的 url
failedRegistered.remove(url);//开始注册前,将此次url从注册失败列表中移除
failedUnregistered.remove(url);//开始注册前,将此次url从取消注册失败列表中移除
try {
doRegister(url);//执行注册
} catch (Exception e) {
//。。。
//加入失败列表 ,定时任务重试
failedRegistered.add(url);
}
}
这里中断一**册流程分析,先分析一**册中心的重试任务
FailbackRegistry创建时启动了一个定时任务
public FailbackRegistry(URL url) {
super(url);
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);//retryPeriod默认5000,也就是5s
}
//retry这个定时任务用于 重试 注册失败,取消注册失败,订阅失败,取消订阅失败,通知失败。
继续分析注册流程
protected void doRegister(URL url) {
//url=consumer://169.254.23.140/tuling.dubbo.server.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=roundrobin&methods=findUser,getUser&pid=188256&qos.enable=false&side=consumer&timeout=2147483647×tamp=1576140510811
try {//目录有就创建,没有也不抛出异常。
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
//path=/dubbo/tuling.dubbo.server.UserService/consumers/注册信息
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
注册的过程 就是创建了consumers目录和注册信息节点。没有在这个节点上订阅,推测这个注册只是用于展示consumer订阅信息用。
写入了consumer节点 如下图
consumer订阅流程分析
consumer订阅获取服务端地址列表如下图
过程比较复杂,简单的概况就是 生成invokers列表 存到RegistryDirectory实例中了
订阅关键方法分析
分析订阅中2个步骤,一看怎么有2个订阅,实际上步骤1只是缓存了NotifyListener,步骤2才是真正的订阅逻辑。
调用链如下
ZookeeperRegistry(FailbackRegistry).subscribe(URL, NotifyListener)
--ZookeeperRegistry(AbstractRegistry).subscribe(URL, NotifyListener)
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);//步骤1
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);///步骤2
} catch (Exception e) {
//...
}
}
步骤1 ZookeeperRegistry(AbstractRegistry).subscribe(URL, NotifyListener) 分析
public void subscribe(URL url, NotifyListener listener) {
//。。。
Set<NotifyListener> listeners = subscribed.get(url);//subscribed: key是已经订阅的URL ,value是多个listenter。
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
listeners.add(listener);// 缓存已经订阅的listener
}
步骤2,就是前面图中 红框中的doSubscribe是关键方法。
ZookeeperRegistry.doSubscribe(URL, NotifyListener) 为 多个目录添加监听 获取urls
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//通常不会执行这里
} else {
List<URL> urls = new ArrayList<URL>();
//url:consumer://169.254.23.140/tuling.dubbo.server.UserService?application=young-app&category=providers,configurators,routers&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=roundrobin&methods=getUser&pid=26820&qos.enable=false&side=consumer&timeout=2147483647×tamp=1573304974074
//这里会获取 /dubbo/接口完整类型名 (如:dubbo/tuling.dubbo.server.UserService)下的providers,configurators,routers子节点name,并在这3个目录添加监听器。
for (String path : toCategoriesPath(url)) {
// 解析后获取的数组 [/dubbo/tuling.dubbo.server.UserService/providers, /dubbo/tuling.dubbo.server.UserService/configurators, /dubbo/tuling.dubbo.server.UserService/routers]
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {{//listener type is RegistryDirectory, RegistryDirectory实现了 NotifyListener接口的notify方法
public void childChanged(String parentPath, List<String> currentChilds) {//当path目录下的子节点发生变化(增加,删除,修改)执行。
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//这里的create 没有节点则创建,有不报异常。
//服务端没有参数配置的情况下,configurators,routers 节点在此创建
zkClient.create(path, false);
//××关键点××addChildListener后会获取服务端链接url
//这里的url包括服务端连接地址,调用配置,路由配置。
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));//过滤掉children 中与url表示要的订阅的不匹配的数据。
}
}
//调用listener,使用获取的urls
notify(url, listener, urls);//子节点为空目录,协议改为empty 添加属性 category=目录名(如routers)
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
zookeeper 节点情况
只有 一个server的情况
启动了一个consumer后
notify 流程分析
ZookeeperRegistry.doSubscribe(URL, NotifyListener) 为 多个目录添加监听 获取urls
--ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List<URL>)转发,记录通知失败的url
---ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List<URL>) 转发
----ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>) 分类整理,并缓存,然后分类调用监听器。
ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>)
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//。。。
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) { //将订阅获取的 urls分类整理,注意 这里最重要的分类是providers 分类如:providers,configurators,routers
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {//缓存已经执行 notify的订阅url
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
//缓存 consumer订阅url ,分类整理list数据
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
匹配规则分析
public static boolean isMatch(URL consumerUrl, URL providerUrl) {
String consumerInterface = consumerUrl.getServiceInterface();
String providerInterface = providerUrl.getServiceInterface();
if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface))) //完整接口名相同,或consumerInterface是通配符
return false;
if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {// providerUrl的分类是 consumer 订阅的 如 provider.category=providers consumer.category=providers,configurators,routers
return false;
}
if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
&& !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
//providerUrl enabled 等于true或consumerUrl enabled 是通配符
return false;
}
String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
//分组名相同,版本号相同,类标识相同
}
匹配成功的条件:完整接口名相同,或consumerInterface是通配符,并且providerUrl的分类是 consumer 订阅的,并且providerUrl enabled 等于true或consumerUrl enabled 是通配符,并且分组名相同,版本号相同,类标识相同(默认*,不知用途)
RegistryDirectory<T>.refreshInvoker(List<URL>) 附加以前订阅的provider列表,解析出 key为服务调用端订阅字符串,value为invoker, key为方法名,value为invoker的两个map,然后缓存。
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//只有一个 empty协议头的 invokerUrl,清理缓存,断开连接
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers 关闭所有invoker,清理缓存。
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {// 添加之前缓存的url
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();// 缓存 provider urls
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//根据provider信息列表创建invoker并缓存
// newUrlInvokerMap key是合并consumer参数到 provoider url 字符串,value 类型是InvokerDelegate,即代理的invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
//根据方法名分类invoker, key是方法简单名 如 “getUser”,“findUser”
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
//分组处理。
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
RegistryDirectory<T>.toInvokers(List<URL>) :为新增加的provider创建invoker并缓存,返回provider列表中全部新旧的invoker
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);//queryMap 是解析 refer参数时创建
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) { //如果 reference 配置了执行的协议,只有匹配的协议被选中
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {//协议没有实现类,抛出异常
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//合并consumer参数到 provoider 代表的providerUrl
URL url = mergeUrl(providerUrl);
//获取url 字符串类型
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 没有缓存 ,再次引用
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {//获取是否可用配置,默认可用
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {//重点,这段代码执行 引用,即创建连接 生成invoker.
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // 缓存新的invoker
newUrlInvokerMap.put(key, invoker);
}
} else {//添加缓存的invoker
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
InvokerDelegate 保存了 执行协议引用后创建的invoker,合并后的url,未合并的providerUrl(url 和 providerUrl并未看出有何用途)。
从这个方法看出 InvokerDelegate 提供了执行 引用 invoker的入口。
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
跟踪后续引用流程
ProtocolFilterWrapper.refer(Class<T>, URL)
--ProtocolListenerWrapper.refer(Class<T>, URL)
---QosProtocolWrapper.refer(Class<T>, URL)
----DubboProtocol.refer(Class<T>, URL)
执行到ProtocolFilterWrapper.refer(Class<T>, URL) 协议头是dubbo.
是不是很眼熟?对,就是 ProtocolListenerWrapper consumer代理创建和连接分析2中的流程。
后面的流程与直连流程相同了。
providers子节点增加或删除
providers 增加子节点
ZookeeperRegistry$3.childChanged(String, List<String>) line: 179
只要添加一个provider, 订阅监听收到providers目录下所有的子节点数据
看一下这个这个代码位置,就是订阅的时候添加的监听器。
protected void doSubscribe(final URL url, final NotifyListener listener) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));//执行的就是这里。
}
});
}
RegistryDirectory<T>.toInvokers(List<URL>)
将新添加的provider执行引用后 加入到当前 invoker缓存。
providers 子节点删除,就是provider下线。
前面与添加逻辑相同
RegistryDirectory<T>.refreshInvoker(List<URL>) 处理 稍有不同
{
// 这个方法, 比较新的和旧的 invoker缓存,关闭已经下线的provider 的 invoker
//如果是添加了invoker,这里不会起到作用
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
}
router 启动或禁用
cousumer启动时轮询订阅了 providers,routers,configurators 三个目录
router 启动或禁用与providers子节点增加或删除 流程相同
当启用或禁用指定router时,执行如下。
ZookeeperRegistry$3.childChanged(String, List<String>)
--ZookeeperRegistry.access$400(ZookeeperRegistry, URL, NotifyListener, List)
----ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List<URL>)
------ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List<URL>)
--------ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>)
起到作用的是下面的代码
RegistryDirectory<T>.notify(List<URL>)
{
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {//触发了配置更新
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);//根据路由配置 ,解析处理路由实例
if (routers != null) { // null - do nothing
setRouters(routers);//加入路由列表
}
}
// providers
refreshInvoker(invokerUrls);//触发的是配置或路由,invokerUrls是空的,后面校验或终止执行
}
在执行代理时 调用了router. 多个router循环调用
proxy0.getUser(Integer)
--InvokerInvocationHandler.invoke(Object, Method, Object[])
-----MockClusterInvoker<T>.invoke(Invocation)
-------FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).invoke(Invocation)
---------FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).list(Invocation)
------------RegistryDirectory<T>(AbstractDirectory<T>).list(Invocation)