dubbo源码分析-consumer订阅创建代理

订阅创建代理的第一部分dubbo源码分析-consumer订阅创建代理

红框之前的部分已经分析过,本文分析红框内的部分。

 

 

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//保存并解析了订阅信息

directory.setRegistry(registry);

directory.setProtocol(protocol);

// all attributes of REFER_KEY

Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

if (!Constants.ANY_VALUE.equals(url.getServiceInterface())

&& url.getParameter(Constants.REGISTER_KEY, true)) {

//注册consumer订阅信息

registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,

Constants.CHECK_KEY, String.valueOf(false)));

}

//订阅并获取 服务端列表,配置,路由信息。然后连接服务端,并创建执行代理的invoker

directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,

 

Constants.PROVIDERS_CATEGORY

+ "," + Constants.CONFIGURATORS_CATEGORY

+ "," + Constants.ROUTERS_CATEGORY));

//创建容错层

Invoker invoker = cluster.join(directory);

ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);

return invoker;

}

注意方法中黑体的部分,本文将围绕 这几个部分逐个分析

  1. 保存并解析了订阅信息。
  2. 注册consumer订阅信息
  3. 订阅并获取 服务端列表,配置,路由信息。然后连接服务端,并创建执行代理的invoker
  4. 创建容错层

 

RegistryDirectory创建分析

执行 RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL) 时创建,代码如下

RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

directory.setRegistry(registry);

directory.setProtocol(protocol);

 

public RegistryDirectory(Class<T> serviceType, URL url) {

super(url);

if (serviceType == null)

throw new IllegalArgumentException("service type is null.");

if (url.getServiceKey() == null || url.getServiceKey().length() == 0)

throw new IllegalArgumentException("registry serviceKey is null.")

this.serviceType = serviceType;

this.serviceKey = url.getServiceKey();

//解析引用参数 application=young-app&default.timeout=30000000&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=consistenthash&methods=getUser&pid=25060&qos.enable=false&register.ip=169.254.23.140&side=consumer&timeout=30000000&timestamp=1573355812993

this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));

//url.setPath(url.getServiceInterface()).clearParameters()结果: zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService

//addParameters 将引用参数拼接在zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService后面

this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);

String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");

this.multiGroup = group != null && ("*".equals(group) || group.contains(","));//获取是否 配置多组

String methods = queryMap.get(Constants.METHODS_KEY);

this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);//解析方法名数组

}

创建 RegistryDirectory的过程就是保存并解析了订阅信息。

 

RegistryDirectory<T>.mergeUrl(URL) 分析

private URL mergeUrl(URL providerUrl) {

providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters 合并 consumer端的参数

 

List<Configurator> localConfigurators = this.configurators; // local reference

if (localConfigurators != null && !localConfigurators.isEmpty()) {// 通过配置 更改providerUrl,需要详细分析。

for (Configurator configurator : localConfigurators) {

providerUrl = configurator.configure(providerUrl);

}

}

//不检查连接是否成功,总是创建invoker

providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false));

 

// The combination of directoryUrl and override is at the end of notify, which can't be handled here

this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); //合并provider 的参数到overrideDirectoryUrl

 

if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)// version 1.0 可能出现providerUrl path 为空的情况

&& "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0

//fix by tony.chenl DUBBO-44

String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);

if (path != null) {

int i = path.indexOf('/');

if (i >= 0) {

path = path.substring(i + 1);

}

i = path.lastIndexOf(':');

if (i >= 0) {

path = path.substring(0, i);

}

providerUrl = providerUrl.setPath(path);

}

}

return providerUrl;

}

方法逻辑:合并consumer参数到 provoider 代表的providerUrl

ClusterUtils

将localMap的参数合并到remoteUrl中,将remoteUrl中的 filters and listeners合并到localMap中

public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {

Map<String, String> map = new HashMap<String, String>();

Map<String, String> remoteMap = remoteUrl.getParameters();

 

if (remoteMap != null && remoteMap.size() > 0) {

map.putAll(remoteMap);

//一些参数 必须以localMap为准,移除被provider影响的参数

// Remove configurations from provider, some items should be affected by provider.

map.remove(Constants.THREAD_NAME_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);

 

map.remove(Constants.THREADPOOL_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADPOOL_KEY);

 

map.remove(Constants.CORE_THREADS_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.CORE_THREADS_KEY);

 

map.remove(Constants.THREADS_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADS_KEY);

 

map.remove(Constants.QUEUES_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.QUEUES_KEY);

 

map.remove(Constants.ALIVE_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.ALIVE_KEY);

 

map.remove(Constants.TRANSPORTER_KEY);

map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);

}

 

if (localMap != null && localMap.size() > 0) {

map.putAll(localMap);//放入所有本地属性

}

if (remoteMap != null && remoteMap.size() > 0) { 一些属性要以 remoteUrl为准,要重点获取并赋值。

// Use version passed from provider side

String dubbo = remoteMap.get(Constants.DUBBO_VERSION_KEY);

if (dubbo != null && dubbo.length() > 0) {

map.put(Constants.DUBBO_VERSION_KEY, dubbo);

}

String version = remoteMap.get(Constants.VERSION_KEY);

if (version != null && version.length() > 0) {

map.put(Constants.VERSION_KEY, version);

}

String group = remoteMap.get(Constants.GROUP_KEY);

if (group != null && group.length() > 0) {

map.put(Constants.GROUP_KEY, group);

}

String methods = remoteMap.get(Constants.METHODS_KEY);

if (methods != null && methods.length() > 0) {

map.put(Constants.METHODS_KEY, methods);

}

// Reserve timestamp of provider url.

String remoteTimestamp = remoteMap.get(Constants.TIMESTAMP_KEY);

if (remoteTimestamp != null && remoteTimestamp.length() > 0) {

map.put(Constants.REMOTE_TIMESTAMP_KEY, remoteMap.get(Constants.TIMESTAMP_KEY));

}

// Combine filters and listeners on Provider and Consumer 合并 在Provider and Consumer中的 filters and listeners 到本地缓存

String remoteFilter = remoteMap.get(Constants.REFERENCE_FILTER_KEY);

String localFilter = localMap.get(Constants.REFERENCE_FILTER_KEY);

if (remoteFilter != null && remoteFilter.length() > 0

&& localFilter != null && localFilter.length() > 0) {

localMap.put(Constants.REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);

}

String remoteListener = remoteMap.get(Constants.INVOKER_LISTENER_KEY);

String localListener = localMap.get(Constants.INVOKER_LISTENER_KEY);

if (remoteListener != null && remoteListener.length() > 0

&& localListener != null && localListener.length() > 0) {

localMap.put(Constants.INVOKER_LISTENER_KEY, remoteListener + "," + localListener);

}

}

//将合并后的 参数 加入到 provoider 即remoteUrl中

return remoteUrl.clearParameters().addParameters(map);

}

 

Cluster.join()分析。

Cluster.join()比较简单先分析一下。

Cluster 就是容错层,每一个Cluster 依赖对应的invoker。

如FailoverCluster 依赖FailoverClusterInvoker。(我认为存在过度设计的情况)

public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

@Override

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {

return new FailoverClusterInvoker<T>(directory);

}

}

Cluster.join()的过程就是创建对应的ClusterInvoker

AbstractClusterInvoker 的这个方法 执行路由,按照方法名获取invoker列表,依赖directory实现

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {

List<Invoker<T>> invokers = directory.list(invocation);

return invokers;

}

ClusterInvoker体系分析

AbstractClusterInvoker主要功能

  执行路由,按照方法名获取invoker列表,依赖directory实现。

  获取订阅的基本信息如 获取订阅接口类,获取完整订阅信息url

   实现了负载均衡的调用骨架。

AbstractClusterInvoker子类

    实现了容错机制,如FailoverClusterInvoker 实现了失败重试机制

consumer注册流程分析

consumer注册流程在zookeeper上创建了consumers节点和其子节点。

流程如下

RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)

--ZookeeperRegistry(FailbackRegistry).register(URL) // 提供重试功能,抛出异常,加入重试列表

--ZookeeperRegistry.doRegister(URL)// 注册, 写入订阅信息。

下面逐个分析。

RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL) 

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

//...

registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,

Constants.CHECK_KEY, String.valueOf(false)));

//...

}

ZookeeperRegistry(FailbackRegistry).register(URL)

public void register(URL url) {

super.register(url);// 缓存已经注册的 url

failedRegistered.remove(url);//开始注册前,将此次url从注册失败列表中移除

failedUnregistered.remove(url);//开始注册前,将此次url从取消注册失败列表中移除

try {

doRegister(url);//执行注册

} catch (Exception e) {

//。。。

//加入失败列表 ,定时任务重试

failedRegistered.add(url);

}

}

这里中断一**册流程分析,先分析一**册中心的重试任务

FailbackRegistry创建时启动了一个定时任务

public FailbackRegistry(URL url) {

super(url);

int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() {

// Check and connect to the registry

try {

retry();

} catch (Throwable t) { // Defensive fault tolerance

logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);

}

}

}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);//retryPeriod默认5000,也就是5s

}

//retry这个定时任务用于 重试 注册失败,取消注册失败,订阅失败,取消订阅失败,通知失败。

 

继续分析注册流程

protected void doRegister(URL url) {

//url=consumer://169.254.23.140/tuling.dubbo.server.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=roundrobin&methods=findUser,getUser&pid=188256&qos.enable=false&side=consumer&timeout=2147483647&timestamp=1576140510811

try {//目录有就创建,没有也不抛出异常。

zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

//path=/dubbo/tuling.dubbo.server.UserService/consumers/注册信息

} catch (Throwable e) {

throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

}

}

注册的过程 就是创建了consumers目录和注册信息节点。没有在这个节点上订阅,推测这个注册只是用于展示consumer订阅信息用。

写入了consumer节点 如下图

dubbo源码分析-consumer订阅创建代理

consumer订阅流程分析

consumer订阅获取服务端地址列表如下图

dubbo源码分析-consumer订阅创建代理

过程比较复杂,简单的概况就是 生成invokers列表 存到RegistryDirectory实例中了

 

 

订阅关键方法分析

分析订阅中2个步骤,一看怎么有2个订阅,实际上步骤1只是缓存了NotifyListener,步骤2才是真正的订阅逻辑。

调用链如下

ZookeeperRegistry(FailbackRegistry).subscribe(URL, NotifyListener)

--ZookeeperRegistry(AbstractRegistry).subscribe(URL, NotifyListener)

public void subscribe(URL url, NotifyListener listener) {

super.subscribe(url, listener);//步骤1

removeFailedSubscribed(url, listener);

try {

// Sending a subscription request to the server side

doSubscribe(url, listener);///步骤2

} catch (Exception e) {

//...

}

}

步骤1 ZookeeperRegistry(AbstractRegistry).subscribe(URL, NotifyListener) 分析

public void subscribe(URL url, NotifyListener listener) {

//。。。

Set<NotifyListener> listeners = subscribed.get(url);//subscribed: key是已经订阅的URL ,value是多个listenter。

if (listeners == null) {

subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());

listeners = subscribed.get(url);

}

listeners.add(listener);// 缓存已经订阅的listener

}

步骤2,就是前面图中 红框中的doSubscribe是关键方法。

ZookeeperRegistry.doSubscribe(URL, NotifyListener) 为 多个目录添加监听 获取urls

protected void doSubscribe(final URL url, final NotifyListener listener) {

try {

if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {

//通常不会执行这里

} else {

List<URL> urls = new ArrayList<URL>();

//url:consumer://169.254.23.140/tuling.dubbo.server.UserService?application=young-app&category=providers,configurators,routers&dubbo=2.6.2&interface=tuling.dubbo.server.UserService&loadbalance=roundrobin&methods=getUser&pid=26820&qos.enable=false&side=consumer&timeout=2147483647&timestamp=1573304974074

//这里会获取 /dubbo/接口完整类型名 (如:dubbo/tuling.dubbo.server.UserService)下的providers,configurators,routers子节点name,并在这3个目录添加监听器。

 

for (String path : toCategoriesPath(url)) {

// 解析后获取的数组 [/dubbo/tuling.dubbo.server.UserService/providers, /dubbo/tuling.dubbo.server.UserService/configurators, /dubbo/tuling.dubbo.server.UserService/routers]

ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);

if (listeners == null) {

zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());

listeners = zkListeners.get(url);

}

ChildListener zkListener = listeners.get(listener);

if (zkListener == null) {

listeners.putIfAbsent(listener, new ChildListener() {{//listener type is RegistryDirectory, RegistryDirectory实现了 NotifyListener接口的notify方法

public void childChanged(String parentPath, List<String> currentChilds) {//当path目录下的子节点发生变化(增加,删除,修改)执行。

ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));

}

});

zkListener = listeners.get(listener);

}

//这里的create 没有节点则创建,有不报异常。

//服务端没有参数配置的情况下,configurators,routers 节点在此创建

zkClient.create(path, false);

//××关键点××addChildListener后会获取服务端链接url

//这里的url包括服务端连接地址,调用配置,路由配置。

List<String> children = zkClient.addChildListener(path, zkListener);

if (children != null) {

urls.addAll(toUrlsWithEmpty(url, path, children));//过滤掉children 中与url表示要的订阅的不匹配的数据。

}

}

//调用listener,使用获取的urls

notify(url, listener, urls);//子节点为空目录,协议改为empty 添加属性 category=目录名(如routers)

}

} catch (Throwable e) {

throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);

}

}

zookeeper 节点情况

只有 一个server的情况

dubbo源码分析-consumer订阅创建代理

启动了一个consumer后

dubbo源码分析-consumer订阅创建代理

notify 流程分析

ZookeeperRegistry.doSubscribe(URL, NotifyListener) 为 多个目录添加监听 获取urls

--ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List<URL>)转发,记录通知失败的url

---ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List<URL>) 转发

----ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>) 分类整理,并缓存,然后分类调用监听器。

 

 

ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>)

protected void notify(URL url, NotifyListener listener, List<URL> urls) {

//。。。

Map<String, List<URL>> result = new HashMap<String, List<URL>>();

for (URL u : urls) { //将订阅获取的 urls分类整理,注意 这里最重要的分类是providers 分类如:providers,configurators,routers

if (UrlUtils.isMatch(url, u)) {

String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

List<URL> categoryList = result.get(category);

if (categoryList == null) {

categoryList = new ArrayList<URL>();

result.put(category, categoryList);

}

categoryList.add(u);

}

}

if (result.size() == 0) {

return;

}

Map<String, List<URL>> categoryNotified = notified.get(url);

if (categoryNotified == null) {//缓存已经执行 notify的订阅url

notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());

categoryNotified = notified.get(url);

}

//缓存 consumer订阅url ,分类整理list数据

for (Map.Entry<String, List<URL>> entry : result.entrySet()) {

String category = entry.getKey();

List<URL> categoryList = entry.getValue();

categoryNotified.put(category, categoryList);

saveProperties(url);

listener.notify(categoryList);

}

}

 

匹配规则分析

public static boolean isMatch(URL consumerUrl, URL providerUrl) {

String consumerInterface = consumerUrl.getServiceInterface();

String providerInterface = providerUrl.getServiceInterface();

if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface))) //完整接口名相同,或consumerInterface是通配符

return false;

 

if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),

consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {// providerUrl的分类是 consumer 订阅的 如 provider.category=providers consumer.category=providers,configurators,routers

return false;

}

if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)

&& !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {

//providerUrl enabled 等于true或consumerUrl enabled 是通配符

return false;

}

 

String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);

String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);

String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);

 

String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);

String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);

String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);

return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))

&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))

&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));

//分组名相同,版本号相同,类标识相同

}

匹配成功的条件:完整接口名相同,或consumerInterface是通配符,并且providerUrl的分类是 consumer 订阅的,并且providerUrl enabled 等于true或consumerUrl enabled 是通配符,并且分组名相同,版本号相同,类标识相同(默认*,不知用途)

 

RegistryDirectory<T>.refreshInvoker(List<URL>) 附加以前订阅的provider列表,解析出 key为服务调用端订阅字符串,value为invoker, key为方法名,value为invoker的两个map,然后缓存。

private void refreshInvoker(List<URL> invokerUrls) {

if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null

&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//只有一个 empty协议头的 invokerUrl,清理缓存,断开连接

this.forbidden = true; // Forbid to access

this.methodInvokerMap = null; // Set the method invoker map to null

destroyAllInvokers(); // Close all invokers 关闭所有invoker,清理缓存。

} else {

this.forbidden = false; // Allow to access

Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {// 添加之前缓存的url

invokerUrls.addAll(this.cachedInvokerUrls);

} else {

this.cachedInvokerUrls = new HashSet<URL>();// 缓存 provider urls

this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison

}

if (invokerUrls.isEmpty()) {

return;

}

//根据provider信息列表创建invoker并缓存

// newUrlInvokerMap key是合并consumer参数到 provoider url 字符串,value 类型是InvokerDelegate,即代理的invoker

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

//根据方法名分类invoker, key是方法简单名 如 “getUser”,“findUser”

Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

// state change

// If the calculation is wrong, it is not processed.

if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {

logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));

return;

}

//分组处理。

this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;

this.urlInvokerMap = newUrlInvokerMap;

try {

destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

} catch (Exception e) {

logger.warn("destroyUnusedInvokers error. ", e);

}

}

}

RegistryDirectory<T>.toInvokers(List<URL>) :为新增加的provider创建invoker并缓存,返回provider列表中全部新旧的invoker

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();

if (urls == null || urls.isEmpty()) {

return newUrlInvokerMap;

}

Set<String> keys = new HashSet<String>();

String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);//queryMap 是解析 refer参数时创建

for (URL providerUrl : urls) {

// If protocol is configured at the reference side, only the matching protocol is selected

if (queryProtocols != null && queryProtocols.length() > 0) { //如果 reference 配置了执行的协议,只有匹配的协议被选中

boolean accept = false;

String[] acceptProtocols = queryProtocols.split(",");

for (String acceptProtocol : acceptProtocols) {

if (providerUrl.getProtocol().equals(acceptProtocol)) {

accept = true;

break;

}

}

if (!accept) {

continue;

}

}

if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {

continue;

}

if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {//协议没有实现类,抛出异常

logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()

+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));

continue;

}

//合并consumer参数到 provoider 代表的providerUrl

URL url = mergeUrl(providerUrl);

//获取url 字符串类型

String key = url.toFullString(); // The parameter urls are sorted

if (keys.contains(key)) { // Repeated url

continue;

}

keys.add(key);

// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again

Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference

Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);

if (invoker == null) { // 没有缓存 ,再次引用

try {

boolean enabled = true;

if (url.hasParameter(Constants.DISABLED_KEY)) {//获取是否可用配置,默认可用

enabled = !url.getParameter(Constants.DISABLED_KEY, false);

} else {

enabled = url.getParameter(Constants.ENABLED_KEY, true);

}

if (enabled) {//重点,这段代码执行 引用,即创建连接 生成invoker.

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

}

} catch (Throwable t) {

logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);

}

if (invoker != null) { // 缓存新的invoker

newUrlInvokerMap.put(key, invoker);

}

} else {//添加缓存的invoker

newUrlInvokerMap.put(key, invoker);

}

}

keys.clear();

return newUrlInvokerMap;

}

InvokerDelegate 保存了 执行协议引用后创建的invoker,合并后的url,未合并的providerUrl(url 和 providerUrl并未看出有何用途)。

从这个方法看出 InvokerDelegate 提供了执行 引用 invoker的入口。

 public Result invoke(Invocation invocation) throws RpcException {

return invoker.invoke(invocation);

}

 

跟踪后续引用流程

ProtocolFilterWrapper.refer(Class<T>, URL)

--ProtocolListenerWrapper.refer(Class<T>, URL)

---QosProtocolWrapper.refer(Class<T>, URL)

----DubboProtocol.refer(Class<T>, URL)

执行到ProtocolFilterWrapper.refer(Class<T>, URL) 协议头是dubbo.

是不是很眼熟?对,就是 ProtocolListenerWrapper consumer代理创建和连接分析2中的流程。

后面的流程与直连流程相同了。

 

providers子节点增加或删除

providers 增加子节点

ZookeeperRegistry$3.childChanged(String, List<String>) line: 179

只要添加一个provider, 订阅监听收到providers目录下所有的子节点数据

 

看一下这个这个代码位置,就是订阅的时候添加的监听器。

protected void doSubscribe(final URL url, final NotifyListener listener) {

listeners.putIfAbsent(listener, new ChildListener() {

@Override

public void childChanged(String parentPath, List<String> currentChilds) {

ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));//执行的就是这里。

}

});

}

 

RegistryDirectory<T>.toInvokers(List<URL>)

将新添加的provider执行引用后 加入到当前 invoker缓存。

providers 子节点删除,就是provider下线。

前面与添加逻辑相同

RegistryDirectory<T>.refreshInvoker(List<URL>) 处理 稍有不同

{

// 这个方法, 比较新的和旧的 invoker缓存,关闭已经下线的provider 的 invoker

//如果是添加了invoker,这里不会起到作用

destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

}

router 启动或禁用

cousumer启动时轮询订阅了 providers,routers,configurators 三个目录

router 启动或禁用与providers子节点增加或删除 流程相同

当启用或禁用指定router时,执行如下。

ZookeeperRegistry$3.childChanged(String, List<String>)

--ZookeeperRegistry.access$400(ZookeeperRegistry, URL, NotifyListener, List)

----ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List<URL>)

------ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List<URL>)

--------ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>)

 

起到作用的是下面的代码

RegistryDirectory<T>.notify(List<URL>)

{

// configurators

if (configuratorUrls != null && !configuratorUrls.isEmpty()) {//触发了配置更新

this.configurators = toConfigurators(configuratorUrls);

}

// routers

if (routerUrls != null && !routerUrls.isEmpty()) {

List<Router> routers = toRouters(routerUrls);//根据路由配置 ,解析处理路由实例

if (routers != null) { // null - do nothing

setRouters(routers);//加入路由列表

}

}

// providers

refreshInvoker(invokerUrls);//触发的是配置或路由,invokerUrls是空的,后面校验或终止执行

}

 

在执行代理时 调用了router. 多个router循环调用

proxy0.getUser(Integer)

--InvokerInvocationHandler.invoke(Object, Method, Object[])

-----MockClusterInvoker<T>.invoke(Invocation)

-------FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).invoke(Invocation)

---------FailoverClusterInvoker<T>(AbstractClusterInvoker<T>).list(Invocation)

------------RegistryDirectory<T>(AbstractDirectory<T>).list(Invocation)