首页 > 编程语言 >dubbo 源码解析----- 服务引用

dubbo 源码解析----- 服务引用

时间:2022-10-18 18:04:51浏览次数:64  
标签:dubbo url URL 源码 ----- Invoker new null Constants

转自:https://blog.csdn.net/beichen8641/article/details/104815163

 

在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。因此,本文我将重点分析通过注册中心引用服务的过程。从注册中心中获取服务配置只是服务引用过程中的一环

服务引用原理

服务引用关键配置demo:

 <dubbo:reference id="userApi"  interface="com.czj.rpc.api.UserApi" ></dubbo:reference>

在服务发布源码中已经说了dubbo如何集成spring的原理, 我们现在主要看是哪个解析类负责解析reference标签即可

 1 public class DubboNamespaceHandler extends NamespaceHandlerSupport {
 2 
 3     static {
 4         Version.checkDuplicate(DubboNamespaceHandler.class);
 5     }
 6 
 7     @Override
 8     public void init() {
 9         /**
10         往spring注册不同的标签解析,DubboBeanDefinitionParser是dubbo通用的
11         标签解析类, 里面主要的工作原理为:
12         1、生成 RootBeanDefinition ,  把入参的class赋值到beanClass属性中
13         2、使用parserContext.getRegistry().registerBeanDefinition()
14            往IOC容器注册创建的BeanDefinition 
15         3、根据入参的Class类型为RootBeanDefinition赋值propertyValues
16         **/
17         //省略相关代码...
18         registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
19     }
20 
21 }

Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。下面我们按照 Dubbo 默认配置进行分析,整个分析过程从 ReferenceBean 的 getObject 方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。

我们先看ReferenceBean 的 afterPropertiesSet ,Spring在创建ReferenceBean 后, 会先执行该方法

  1 public void afterPropertiesSet() throws Exception {
  2         //检查 ConsumerConfig consumer 如果为空,从IOC容器中获取 ConsumerConfig 类型的bean对象,并赋值给当前对象的 consumer 属性
  3         if (getConsumer() == null) {
  4             Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
  5             if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
  6                 ConsumerConfig consumerConfig = null;
  7                 for (ConsumerConfig config : consumerConfigMap.values()) {
  8                     if (config.isDefault() == null || config.isDefault().booleanValue()) {
  9                         if (consumerConfig != null) {
 10                             throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
 11                         }
 12                         consumerConfig = config;
 13                     }
 14                 }
 15                 if (consumerConfig != null) {
 16                     setConsumer(consumerConfig);
 17                 }
 18             }
 19         }
 20         
 21         //检查 ApplicationConfig application 如果为空,从IOC容器中获取 ApplicationConfig 类型的bean对象,并赋值给当前对象的 application 属性
 22         if (getApplication() == null
 23                 && (getConsumer() == null || getConsumer().getApplication() == null)) {
 24             Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
 25             if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
 26                 ApplicationConfig applicationConfig = null;
 27                 for (ApplicationConfig config : applicationConfigMap.values()) {
 28                     if (config.isDefault() == null || config.isDefault().booleanValue()) {
 29                         if (applicationConfig != null) {
 30                             throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
 31                         }
 32                         applicationConfig = config;
 33                     }
 34                 }
 35                 if (applicationConfig != null) {
 36                     setApplication(applicationConfig);
 37                 }
 38             }
 39         }
 40         
 41         /**
 42              检测 ModuleConfig module 是否为空, 为空的话将从IOC容器从获取
 43              ModuleConfig 类型的bean对象,并赋值给当前对象的 module 属性
 44         **/
 45         if (getModule() == null
 46                 && (getConsumer() == null || getConsumer().getModule() == null)) {
 47             Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
 48             if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
 49                 ModuleConfig moduleConfig = null;
 50                 for (ModuleConfig config : moduleConfigMap.values()) {
 51                     if (config.isDefault() == null || config.isDefault().booleanValue()) {
 52                         if (moduleConfig != null) {
 53                             throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
 54                         }
 55                         moduleConfig = config;
 56                     }
 57                 }
 58                 if (moduleConfig != null) {
 59                     setModule(moduleConfig);
 60                 }
 61             }
 62         }
 63         
 64         
 65         /**
 66              重要:获取注册中心的配置bean
 67              检测 List<RegistryConfig> registries  是否为空, 为空的话将从IOC容器从获取
 68              RegistryConfig 类型的bean对象,并赋值给当前对象的 registries 属性
 69         **/
 70         if ((getRegistries() == null || getRegistries().isEmpty())
 71                 && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())
 72                 && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
 73             Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
 74             if (registryConfigMap != null && registryConfigMap.size() > 0) {
 75                 List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
 76                 for (RegistryConfig config : registryConfigMap.values()) {
 77                     if (config.isDefault() == null || config.isDefault().booleanValue()) {
 78                         registryConfigs.add(config);
 79                     }
 80                 }
 81                 if (registryConfigs != null && !registryConfigs.isEmpty()) {
 82                     super.setRegistries(registryConfigs);
 83                 }
 84             }
 85         }
 86         
 87         
 88         /**
 89              检测 MonitorConfig monitor 是否为空, 为空的话将从IOC容器从获取
 90              MonitorConfig 类型的bean对象,并赋值给当前对象的 monitor 属性
 91         **/
 92         if (getMonitor() == null
 93                 && (getConsumer() == null || getConsumer().getMonitor() == null)
 94                 && (getApplication() == null || getApplication().getMonitor() == null)) {
 95             Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
 96             if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
 97                 MonitorConfig monitorConfig = null;
 98                 for (MonitorConfig config : monitorConfigMap.values()) {
 99                     if (config.isDefault() == null || config.isDefault().booleanValue()) {
100                         if (monitorConfig != null) {
101                             throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
102                         }
103                         monitorConfig = config;
104                     }
105                 }
106                 if (monitorConfig != null) {
107                     setMonitor(monitorConfig);
108                 }
109             }
110         }
111         
112         //检查 <dubbo:reference init="true|false" > init属性是否为true,如果为ture证明是饿汉式,会在此处做服务引用
113         Boolean b = isInit();
114         if (b == null && getConsumer() != null) {
115             b = getConsumer().isInit();
116         }
117         if (b != null && b.booleanValue()) {
118             getObject();
119         }
120     }

Dubbo 选择在afterPropertiesSet 里做其他配置项的依赖注入和校验,并根据init属性判断是否需要提前服务引用

服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法。实现代码如下:

 1 public Object getObject() throws Exception {
 2     return get();
 3 }
 4 
 5 public synchronized T get() {
 6     if (destroyed) {
 7         throw new IllegalStateException("Already destroyed!");
 8     }
 9     // 检测 ref 是否为空,为空则通过 init 方法创建
10     if (ref == null) {
11         // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
12         init();
13     }
14     return ref;
15 }

检查配置

Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。配置解析逻辑封装在 ReferenceConfig 的 init 方法中,下面进行分析。

  1 private void init() {
  2     // 避免重复初始化
  3     if (initialized) {
  4         return;
  5     }
  6     initialized = true;
  7     // 检测接口名合法性
  8     if (interfaceName == null || interfaceName.length() == 0) {
  9         throw new IllegalStateException("interface not allow null!");
 10     }
 11 
 12     // 检测 consumer 变量是否为空,为空则创建
 13     checkDefault();
 14     appendProperties(this);
 15     if (getGeneric() == null && getConsumer() != null) {
 16         // 设置 generic
 17         setGeneric(getConsumer().getGeneric());
 18     }
 19 
 20     // 检测是否为泛化接口
 21     if (ProtocolUtils.isGeneric(getGeneric())) {
 22         interfaceClass = GenericService.class;
 23     } else {
 24         try {
 25             // 加载类
 26             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
 27                     .getContextClassLoader());
 28         } catch (ClassNotFoundException e) {
 29             throw new IllegalStateException(e.getMessage(), e);
 30         }
 31         checkInterfaceAndMethods(interfaceClass, methods);
 32     }
 33     
 34     // -------------------------------✨ 分割线1 ✨------------------------------
 35 
 36     // 从系统变量中获取与接口名对应的属性值
 37     String resolve = System.getProperty(interfaceName);
 38     String resolveFile = null;
 39     if (resolve == null || resolve.length() == 0) {
 40         // 从系统属性中获取解析文件路径
 41         resolveFile = System.getProperty("dubbo.resolve.file");
 42         if (resolveFile == null || resolveFile.length() == 0) {
 43             // 从指定位置加载配置文件
 44             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
 45             if (userResolveFile.exists()) {
 46                 // 获取文件绝对路径
 47                 resolveFile = userResolveFile.getAbsolutePath();
 48             }
 49         }
 50         if (resolveFile != null && resolveFile.length() > 0) {
 51             Properties properties = new Properties();
 52             FileInputStream fis = null;
 53             try {
 54                 fis = new FileInputStream(new File(resolveFile));
 55                 // 从文件中加载配置
 56                 properties.load(fis);
 57             } catch (IOException e) {
 58                 throw new IllegalStateException("Unload ..., cause:...");
 59             } finally {
 60                 try {
 61                     if (null != fis) fis.close();
 62                 } catch (IOException e) {
 63                     logger.warn(e.getMessage(), e);
 64                 }
 65             }
 66             // 获取与接口名对应的配置
 67             resolve = properties.getProperty(interfaceName);
 68         }
 69     }
 70     if (resolve != null && resolve.length() > 0) {
 71         // 将 resolve 赋值给 url
 72         url = resolve;
 73     }
 74     
 75     // -------------------------------✨ 分割线2 ✨------------------------------
 76     if (consumer != null) {
 77         if (application == null) {
 78             // 从 consumer 中获取 Application 实例,下同
 79             application = consumer.getApplication();
 80         }
 81         if (module == null) {
 82             module = consumer.getModule();
 83         }
 84         if (registries == null) {
 85             registries = consumer.getRegistries();
 86         }
 87         if (monitor == null) {
 88             monitor = consumer.getMonitor();
 89         }
 90     }
 91     if (module != null) {
 92         if (registries == null) {
 93             registries = module.getRegistries();
 94         }
 95         if (monitor == null) {
 96             monitor = module.getMonitor();
 97         }
 98     }
 99     if (application != null) {
100         if (registries == null) {
101             registries = application.getRegistries();
102         }
103         if (monitor == null) {
104             monitor = application.getMonitor();
105         }
106     }
107     
108     // 检测 Application 合法性
109     checkApplication();
110     // 检测本地存根配置合法性
111     checkStubAndMock(interfaceClass);
112     
113     // -------------------------------✨ 分割线3 ✨------------------------------
114     
115     Map<String, String> map = new HashMap<String, String>();
116     Map<Object, Object> attributes = new HashMap<Object, Object>();
117 
118     // 添加 side、协议版本信息、时间戳和进程号等信息到 map 中
119     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
120     map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
121     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
122     if (ConfigUtils.getPid() > 0) {
123         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
124     }
125 
126     // 非泛化服务
127     if (!isGeneric()) {
128         // 获取版本
129         String revision = Version.getVersion(interfaceClass, version);
130         if (revision != null && revision.length() > 0) {
131             map.put("revision", revision);
132         }
133 
134         // 获取接口方法列表,并添加到 map 中
135         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
136         if (methods.length == 0) {
137             map.put("methods", Constants.ANY_VALUE);
138         } else {
139             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
140         }
141     }
142     map.put(Constants.INTERFACE_KEY, interfaceName);
143     // 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中
144     appendParameters(map, application);
145     appendParameters(map, module);
146     appendParameters(map, consumer, Constants.DEFAULT_KEY);
147     appendParameters(map, this);
148     
149     // -------------------------------✨ 分割线4 ✨------------------------------
150     
151     String prefix = StringUtils.getServiceKey(map);
152     if (methods != null && !methods.isEmpty()) {
153         // 遍历 MethodConfig 列表
154         for (MethodConfig method : methods) {
155             appendParameters(map, method, method.getName());
156             String retryKey = method.getName() + ".retry";
157             // 检测 map 是否包含 methodName.retry
158             if (map.containsKey(retryKey)) {
159                 String retryValue = map.remove(retryKey);
160                 if ("false".equals(retryValue)) {
161                     // 添加重试次数配置 methodName.retries
162                     map.put(method.getName() + ".retries", "0");
163                 }
164             }
165  
166             // 添加 MethodConfig 中的“属性”字段到 attributes
167             // 比如 onreturn、onthrow、oninvoke 等
168             appendAttributes(attributes, method, prefix + "." + method.getName());
169             checkAndConvertImplicitConfig(method, map, attributes);
170         }
171     }
172     
173     // -------------------------------✨ 分割线5 ✨------------------------------
174 
175     // 获取服务消费者 ip 地址
176     String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
177     if (hostToRegistry == null || hostToRegistry.length() == 0) {
178         hostToRegistry = NetUtils.getLocalHost();
179     } else if (isInvalidLocalHost(hostToRegistry)) {
180         throw new IllegalArgumentException("Specified invalid registry ip from property..." );
181     }
182     map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
183 
184     // 存储 attributes 到系统上下文中
185     StaticContext.getSystemContext().putAll(attributes);
186 
187     // 创建代理类
188     ref = createProxy(map);
189 
190     // 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
191     // 并将 ConsumerModel 存入到 ApplicationModel 中
192     ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
193     ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
194 }

上面这段代码主要做了以下事情:

检查是否重复初始化 与 接口的合法性
对必须的配置项进行校验
构造一个Map 遍历 application module consumer methods 自身(ReferenceBean) 所有的get is方法,并且返回值是基本数据类型,并且不为空,则放入map中, 该Map是构造服务代理的顶级配置项
调用createProxy(map) 创建代理对象
接下来我们重点查看createProxy(map) 如何创建代理对象

创建代理对象

  1 private T createProxy(Map<String, String> map) {
  2     URL tmpUrl = new URL("temp", "localhost", 0, map);
  3     final boolean isJvmRefer;
  4     if (isInjvm() == null) {
  5         // url 配置被指定,则不做本地引用
  6         if (url != null && url.length() > 0) {
  7             isJvmRefer = false;
  8         // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
  9         // 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
 10         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
 11             isJvmRefer = true;
 12         } else {
 13             isJvmRefer = false;
 14         }
 15     } else {
 16         // 获取 injvm 配置值
 17         isJvmRefer = isInjvm().booleanValue();
 18     }
 19 
 20     // 本地引用
 21     if (isJvmRefer) {
 22         // 生成本地引用 URL,协议为 injvm
 23         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
 24         // 调用 refer 方法构建 InjvmInvoker 实例
 25         invoker = refprotocol.refer(interfaceClass, url);
 26         
 27     // 远程引用
 28     } else {
 29         // url 不为空,表明用户可能想进行点对点调用
 30         if (url != null && url.length() > 0) {
 31             // 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
 32             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
 33             if (us != null && us.length > 0) {
 34                 for (String u : us) {
 35                     URL url = URL.valueOf(u);
 36                     if (url.getPath() == null || url.getPath().length() == 0) {
 37                         // 设置接口全限定名为 url 路径
 38                         url = url.setPath(interfaceName);
 39                     }
 40                     
 41                     // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
 42                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
 43                         // 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
 44                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
 45                     } else {
 46                         // 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
 47                         // 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
 48                         // 最后将合并后的配置设置为 url 查询字符串中。
 49                         urls.add(ClusterUtils.mergeUrl(url, map));
 50                     }
 51                 }
 52             }
 53         } else {
 54             // 加载注册中心 url
 55             List<URL> us = loadRegistries(false);
 56             if (us != null && !us.isEmpty()) {
 57                 for (URL u : us) {
 58                     URL monitorUrl = loadMonitor(u);
 59                     if (monitorUrl != null) {
 60                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
 61                     }
 62                     // 添加 refer 参数到 url 中,并将 url 添加到 urls 中
 63                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
 64                 }
 65             }
 66 
 67             // 未配置注册中心,抛出异常
 68             if (urls.isEmpty()) {
 69                 throw new IllegalStateException("No such any registry to reference...");
 70             }
 71         }
 72 
 73         // 单个注册中心或服务提供者(服务直连,下同)
 74         if (urls.size() == 1) {
 75             // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
 76             invoker = refprotocol.refer(interfaceClass, urls.get(0));
 77             
 78         // 多个注册中心或多个服务提供者,或者两者混合
 79         } else {
 80             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
 81             URL registryURL = null;
 82 
 83             // 获取所有的 Invoker
 84             for (URL url : urls) {
 85                 // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
 86                 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
 87                 invokers.add(refprotocol.refer(interfaceClass, url));
 88                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
 89                     registryURL = url;
 90                 }
 91             }
 92             if (registryURL != null) {
 93                 // 如果注册中心链接不为空,则将使用 AvailableCluster
 94                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
 95                 // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
 96                 invoker = cluster.join(new StaticDirectory(u, invokers));
 97             } else {
 98                 invoker = cluster.join(new StaticDirectory(invokers));
 99             }
100         }
101     }
102 
103     Boolean c = check;
104     if (c == null && consumer != null) {
105         c = consumer.isCheck();
106     }
107     if (c == null) {
108         c = true;
109     }
110     
111     // invoker 可用性检查
112     if (c && !invoker.isAvailable()) {
113         throw new IllegalStateException("No provider available for the service...");
114     }
115 
116     // 生成代理类
117     return (T) proxyFactory.getProxy(invoker);
118 }

上面代码的逻辑比较清晰,主要判断是否本地服务引用或者远程服务引用:

一、判断本地服务引用逻辑

检查是否有配置inJvm=true或者配置的scope=local|true
构造一个URL 协议为 injvm:// , 并委派给 Protocol$Adaptive 自适应扩展类去寻找扩展点为 injvm 的 InjvmProtocol.refer() 实例创建Invoker
二、判断远程服务引用逻辑

检查是否配置url属性,不为空证明用户想绕过注册中心进行点对点的直连调用 或者 指定注册中心
将注册中心配置列表转为URL 列表, 并把入参map转为xx=xx&bb=bb的字符串并进行URL编码添加到URL参数中,参数名为refer,
若url列表等于1,则直接根据URL的协议 委派给 Protocol$Adaptive 自适应扩展类实例创建Invoker,
若 urls 列表数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 将多个 Invoker 合并成一个集群Invoker,
最后调用 ProxyFactory 生成代理类

创建Invoker

我们在这里只研究Url列表等于1其使用注册中心发现服务的情况,url内容如下:

registry://10.0.0.77:2181/com.alibaba.dubbo.registry.RegistryService?application=hello-world-consumer&dubbo=2.0.2&pid=17708&refer=application%3Dhello-world-consumer%26dubbo%3D2.0.2%26interface%3Dcom.czj.api.UserApi%26methods%3Dget%26pid%3D17708%26register.ip%3D192.168.233.1%26side%3Dconsumer%26timestamp%3D1580353582836&registry=zookeeper&timestamp=1580353582863

协议为 registry , 我们找到 扩展点为 registry 的 Protocol的实例 RegistryProtocol

 1 public class RegistryProtocol implements Protocol {
 2     @Override
 3     @SuppressWarnings("unchecked")
 4     public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
 5         //重新设置url的协议值为registry参数, 此处值应该为 zookeeper 
 6         url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
 7         //寻找扩展点为zookeeper的RegistryFactory实例, ZookeeperRegistryFactory
 8         Registry registry = registryFactory.getRegistry(url);
 9         
10         //若
11         if (RegistryService.class.equals(type)) {
12             return proxyFactory.getInvoker((T) registry, type, url);
13         }
14 
15         //  将 url 的refer参数值转为 Map
16         Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
17         
18         //获取 group 配置,处理相同服务名不同组,此处不研究
19         String group = qs.get(Constants.GROUP_KEY);
20         if (group != null && group.length() > 0) {
21             if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
22                     || "*".equals(group)) {
23                 return doRefer(getMergeableCluster(), registry, type, url);
24             }
25         }
26         
27         //创建Invoker
28         return doRefer(cluster, registry, type, url);
29     }
30 }

上面的代码主要重新构造Url的协议值,并动态获取RegistryFactory实例对象,真正创建Invoker的在doRefer代码块中

 1 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
 2         //构造注册中心服务目录对象
 3         RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
 4         directory.setRegistry(registry);
 5         directory.setProtocol(protocol);
 6         // 把refer查询字符串转为map
 7         Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
 8         /**
 9         重新构造一个协议为comsumer://,参数为refer的URL 示例内容:
10         consumer://192.168.233.1/com.czj.api.UserApi?application=hello-world-consumer&dubbo=2.0.2&interface=com.czj.api.UserApi&methods=get&pid=17708&side=consumer&timestamp=1580353582836
11         **/
12         //
13         URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
14         
15         //检测是否往/dubbo/${服务名}/consumers/consumers 注册,主要用于监控全局服务引用情况
16         if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
17                 && url.getParameter(Constants.REGISTER_KEY, true)) {
18             registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
19                     Constants.CHECK_KEY, String.valueOf(false)));
20         }
21         
22         /**
23             该方法做了以下几个事情:
24             1、为subscribeUrl追加一个参数:category=providers,configurators,routers
25             
26             2、并监听下面几个节点下的内容变化
27             /dubbo/com.czj.api.UserApi/providers
28             /dubbo/com.czj.api.UserApi/configurators
29             /dubbo/com.czj.api.UserApi/routers
30             
31             
32             3、第一次监听会得到监听节点下的所有子节点列表,把节点列表转为URL,并委派给 RegistryDirectory.notify(List<URL> urls) 生成最终的Invoker实例列表
33             
34             
35             
36             
37         **/
38         
39         directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
40                 Constants.PROVIDERS_CATEGORY
41                         + "," + Constants.CONFIGURATORS_CATEGORY
42                         + "," + Constants.ROUTERS_CATEGORY));
43 
44         Invoker invoker = cluster.join(directory);
45         ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
46         return invoker;
47 }
48     
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    public void subscribe(URL url) {
            setConsumerUrl(url);
            //委派给 ZookeeperRegistry 去监听,  第二个参数是监听事件,因为RegistryDirectory 实现了 NotifyListener,所以传递自身进去
            registry.subscribe(url, this);
    }
}
 1 //ZookeeperRegistry 继承于 FailbackRegistry,所以我们进到FailbackRegistry 查看
 2 public abstract class FailbackRegistry extends AbstractRegistry {
 3     @Override
 4     public void subscribe(URL url, NotifyListener listener) {
 5         super.subscribe(url, listener);
 6         removeFailedSubscribed(url, listener);
 7         try {
 8             // 委派给子类实现
 9             doSubscribe(url, listener);
10         } catch (Exception e) {
11             //...省略异常出出力
12         }
13     }
14     
15     
16 }
 1 public class ZookeeperRegistry extends FailbackRegistry{
 2 @Override
 3     protected void doSubscribe(final URL url, final NotifyListener listener) {
 4         try {
 5             if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
 6                 
 7                 //... 省略无关代码
 8             } else {
 9                 //远程服务的url列表, 里面装着的都是dubbo://打头的
10                 List<URL> urls = new ArrayList<URL>();
11                 
12                 /**
13                     获取要监听的路径列表,路径获取规则为: 得到category参数,并进行逗号分隔,进行遍历
14                     ${服务名}/${catgoryItem}
15                     以服务名com.czj.api.UserApi为例:
16                     /dubbo/com.czj.api.UserApi/providers
17                     /dubbo/com.czj.api.UserApi/configurators
18                     /dubbo/com.czj.api.UserApi/routers
19                 **/
20                 for (String path : toCategoriesPath(url)) {
21                     //根据服务引用URL为key得到一个Map<业务监听事件,zookeeper子节点变化监听事件>
22                     ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
23                     //使用putIfAbsent解决并发问题
24                     if (listeners == null) {
25                         zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
26                         listeners = zkListeners.get(url);
27                     }
28                     //查看map是否有缓存
29                     ChildListener zkListener = listeners.get(listener);
30                     if (zkListener == null) {
31                         /**
32                             创建一个zookeeper子节点监听事件并加入缓存,监听事件主要做的事:
33                             1、把子节点内容转为provider URL列表,并校验provider Url对应的接口名、版本号、分组、版本号是否与入参consumer Url是否一致
34                             2、把consumer Url、父节点路径、provider URL列表 委派给ZookeeperRegistry做节点变更事件
35                         **/
36                         
37                         listeners.putIfAbsent(listener, new ChildListener() {
38                             @Override
39                             public void childChanged(String parentPath, List<String> currentChilds) {
40                                 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
41                             }
42                         });
43                         zkListener = listeners.get(listener);
44                     }
45                     //根据路径创建持久化节点
46                     zkClient.create(path, false);
47                     //为节点添加字节点变化事件,并返回已存在的子节点路径列表
48                     List<String> children = zkClient.addChildListener(path, zkListener);
49                     if (children != null) {
50                         //把子节点内容转为provider URL列表,并校验provider Url对应的接口名、版本号、分组、版本号是否与入参consumer Url是否一致
51                         urls.addAll(toUrlsWithEmpty(url, path, children));
52                     }
53                 }
54                 //第一次初始化,需要手动执行节点变更事件
55                 notify(url, listener, urls);
56             }
57         } catch (Throwable e) {
58             throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
59         }
60     }
61     
62     }

上面主要对URL进行解析,得到需要监听事件的节点,并进行监听事件的注册,第一次监听会得到已存在的子节点路径,并会手动执行节点变更事件,接下来我们来看看
变更事件notify()做了什么事情

 1 protected void notify(URL url, NotifyListener listener, List<URL> urls) {
 2         if (url == null) {
 3             throw new IllegalArgumentException("notify url == null");
 4         }
 5         if (listener == null) {
 6             throw new IllegalArgumentException("notify listener == null");
 7         }
 8         if ((urls == null || urls.isEmpty())
 9                 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
10             logger.warn("Ignore empty notify urls for subscribe url " + url);
11             return;
12         }
13         if (logger.isInfoEnabled()) {
14             logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
15         }
16         //对不同类型的URL进行分组
17         Map<String, List<URL>> result = new HashMap<String, List<URL>>();
18         for (URL u : urls) {
19             //再次检测子节点URL 与 消费URL的对应的接口名、版本号、分组、版本号是否一致
20             if (UrlUtils.isMatch(url, u)) {
21                 //获取URL的category的参数,如果为空取providers
22                 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
23                 //加入缓存
24                 List<URL> categoryList = result.get(category);
25                 if (categoryList == null) {
26                     categoryList = new ArrayList<URL>();
27                     result.put(category, categoryList);
28                 }
29                 categoryList.add(u);
30             }
31         }
32         if (result.size() == 0) {
33             return;
34         }
35         Map<String, List<URL>> categoryNotified = notified.get(url);
36         if (categoryNotified == null) {
37             notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
38             categoryNotified = notified.get(url);
39         }
40         
41         //对不同类型的URL进行分批触发事件
42         for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
43             String category = entry.getKey();
44             List<URL> categoryList = entry.getValue();
45             categoryNotified.put(category, categoryList);
46             saveProperties(url);
47             //这里是创建远程调用Invoker的入口,原理是远程节点可能有多个,当有新增或删减时,将会触发远程Invoker的重构事件
48             listener.notify(categoryList);
49         }
50     }
51     

上面主要对远程节点URL进行分组并触发Dubbo NotifyListener 事件,我们入参的NotifyListener是 RegistryDirectory , 我们回到RegistryDirectory查看在里面做了什么

 1     public synchronized void notify(List<URL> urls) {
 2         //服务节点URL列表缓存
 3         List<URL> invokerUrls = new ArrayList<URL>();
 4         //服务路由URL列表缓存
 5         List<URL> routerUrls = new ArrayList<URL>();
 6         //服务配置URL列表缓存
 7         List<URL> configuratorUrls = new ArrayList<URL>();
 8         //判断入参URL的所属类型,并加入所属的缓存
 9         for (URL url : urls) {
10             String protocol = url.getProtocol();
11             String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
12             if (Constants.ROUTERS_CATEGORY.equals(category)
13                     || Constants.ROUTE_PROTOCOL.equals(protocol)) {
14                 routerUrls.add(url);
15             } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
16                     || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
17                 configuratorUrls.add(url);
18             } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
19                 invokerUrls.add(url);
20             } else {
21                 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
22             }
23         }
24         // 处理服务配置
25         if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
26             this.configurators = toConfigurators(configuratorUrls);
27         }
28         // 处理服务路由
29         if (routerUrls != null && !routerUrls.isEmpty()) {
30             List<Router> routers = toRouters(routerUrls);
31             if (routers != null) { // null - do nothing
32                 setRouters(routers);
33             }
34         }
35         List<Configurator> localConfigurators = this.configurators; // local reference
36         // merge override parameters
37         this.overrideDirectoryUrl = directoryUrl;
38         if (localConfigurators != null && !localConfigurators.isEmpty()) {
39             for (Configurator configurator : localConfigurators) {
40                 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
41             }
42         }
43         // 刷新远程Invoker
44         refreshInvoker(invokerUrls);
45     }

上面根据入参的URL的类型,分别追加到对应类型的List, 再进行对应处理, 我们在这里只查看服务节点URL的处理 refreshInvoker(invokerUrls)

 1 private void refreshInvoker(List<URL> invokerUrls) {
 2         //如果Url的数量等于1,并且协议值为empty,则证明无远程服务节点提供服务,需要删除缓存中的所有远程Invoker
 3         if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
 4                 && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
 5             //fobidden标示为ture
 6             this.forbidden = true; 
 7             //清空缓存
 8             this.methodInvokerMap = null;
 9             //销毁Invoker
10             destroyAllInvokers(); 
11         } else {
12             this.forbidden = false; // Allow to access
13             Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
14             if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
15                 invokerUrls.addAll(this.cachedInvokerUrls);
16             } else {
17                 this.cachedInvokerUrls = new HashSet<URL>();
18                 this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
19             }
20             if (invokerUrls.isEmpty()) {
21                 return;
22             }
23             //将URL转为远程Invoker
24             Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
25             //将远程Invoker提供的方法列表转为Map<方法名,List<Invoker>>
26             Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
27             // state change
28             // If the calculation is wrong, it is not processed.
29             if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
30                 logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
31                 return;
32             }
33             this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
34             this.urlInvokerMap = newUrlInvokerMap;
35             try {
36                 //销毁不可用Invoker,如果新节点中的Invoker在历史缓存中不存在,则销毁历史缓存中的Invoker
37                 destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
38             } catch (Exception e) {
39                 logger.warn("destroyUnusedInvokers error. ", e);
40             }
41         }
42 }

上面主要做了将Url转为远程Invoker,并销毁缓存中不可用的Invoker,下面我们看看如何将URL转为远程Invoker :toInvokers(invokerUrls)

 1 private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
 2         Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
 3         if (urls == null || urls.isEmpty()) {
 4             return newUrlInvokerMap;
 5         }
 6         Set<String> keys = new HashSet<String>();
 7         //得到消费者URL的协议值
 8         String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
 9         for (URL providerUrl : urls) {
10             // 如果消费者自定义协议,则需要检查provider的协议值是否支持消费者定义的协议值
11             if (queryProtocols != null && queryProtocols.length() > 0) {
12                 boolean accept = false;
13                 String[] acceptProtocols = queryProtocols.split(",");
14                 //检查是否匹配协议
15                 for (String acceptProtocol : acceptProtocols) {
16                     if (providerUrl.getProtocol().equals(acceptProtocol)) {
17                         accept = true;
18                         break;
19                     }
20                 }
21                 //不匹配则忽略
22                 if (!accept) {
23                     continue;
24                 }
25             }
26             if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
27                 continue;
28             }
29             //检查消费端是否支持provider 协议的 Protocol  扩展点
30             if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
31                 logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
32                         + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
33                 continue;
34             }
35             //把消费端的设置覆盖掉服务端的参数设置, 例如服务端设置的retries=3 ,消费端设置了retries=0,将优先使用消费端的配置
36             URL url = mergeUrl(providerUrl);
37             //得到唯一标示
38             String key = url.toFullString(); // The parameter urls are sorted
39             //检查是否有重复值
40             if (keys.contains(key)) { // Repeated url
41                 continue;
42             }
43             keys.add(key);
44             //得到本地缓存
45             Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
46             //查看本地缓存是否已存在
47             Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
48             //缓存不存在,则需要创建
49             if (invoker == null) { 
50                 try {
51                     //查看远程服务是否禁用参数标示
52                     boolean enabled = true;
53                     if (url.hasParameter(Constants.DISABLED_KEY)) {
54                         enabled = !url.getParameter(Constants.DISABLED_KEY, false);
55                     } else {
56                         enabled = url.getParameter(Constants.ENABLED_KEY, true);
57                     }
58                     if (enabled) {
59                         /**
60                         开始创建远程Invoker, 以dubbo协议URL为例:
61                         dubbo://192.168.251.37:20880/com.czj.rpc.api.UserApi?anyhost=true&application=hello-world-app
62                         &bind.ip=192.168.251.37&bind.port=20880&dubbo=2.0.2&generic=false
63                         &interface=com.czj.rpc.api.UserApi&methods=getName,get,save&pid=41080&side=provider&timestamp=1578635962790
64                         将会调用 DubboProtocol.refer
65                         **/
66                         invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
67                     }
68                 } catch (Throwable t) {
69                     logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
70                 }
71                 //加入缓存
72                 if (invoker != null) { 
73                     newUrlInvokerMap.put(key, invoker);
74                 }
75             } else {
76                 newUrlInvokerMap.put(key, invoker);
77             }
78         }
79         keys.clear();
80         return newUrlInvokerMap;
81     }

上面的代码逻辑比较清晰 , 我们直接跟进DubboProtocol.refer查看

 1 private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
 2         Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
 3         if (urls == null || urls.isEmpty()) {
 4             return newUrlInvokerMap;
 5         }
 6         Set<String> keys = new HashSet<String>();
 7         //得到消费者URL的协议值
 8         String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
 9         for (URL providerUrl : urls) {
10             // 如果消费者自定义协议,则需要检查provider的协议值是否支持消费者定义的协议值
11             if (queryProtocols != null && queryProtocols.length() > 0) {
12                 boolean accept = false;
13                 String[] acceptProtocols = queryProtocols.split(",");
14                 //检查是否匹配协议
15                 for (String acceptProtocol : acceptProtocols) {
16                     if (providerUrl.getProtocol().equals(acceptProtocol)) {
17                         accept = true;
18                         break;
19                     }
20                 }
21                 //不匹配则忽略
22                 if (!accept) {
23                     continue;
24                 }
25             }
26             if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
27                 continue;
28             }
29             //检查消费端是否支持provider 协议的 Protocol  扩展点
30             if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
31                 logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
32                         + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
33                 continue;
34             }
35             //把消费端的设置覆盖掉服务端的参数设置, 例如服务端设置的retries=3 ,消费端设置了retries=0,将优先使用消费端的配置
36             URL url = mergeUrl(providerUrl);
37             //得到唯一标示
38             String key = url.toFullString(); // The parameter urls are sorted
39             //检查是否有重复值
40             if (keys.contains(key)) { // Repeated url
41                 continue;
42             }
43             keys.add(key);
44             //得到本地缓存
45             Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
46             //查看本地缓存是否已存在
47             Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
48             //缓存不存在,则需要创建
49             if (invoker == null) { 
50                 try {
51                     //查看远程服务是否禁用参数标示
52                     boolean enabled = true;
53                     if (url.hasParameter(Constants.DISABLED_KEY)) {
54                         enabled = !url.getParameter(Constants.DISABLED_KEY, false);
55                     } else {
56                         enabled = url.getParameter(Constants.ENABLED_KEY, true);
57                     }
58                     if (enabled) {
59                         /**
60                         开始创建远程Invoker, 以dubbo协议URL为例:
61                         dubbo://192.168.251.37:20880/com.czj.rpc.api.UserApi?anyhost=true&application=hello-world-app
62                         &bind.ip=192.168.251.37&bind.port=20880&dubbo=2.0.2&generic=false
63                         &interface=com.czj.rpc.api.UserApi&methods=getName,get,save&pid=41080&side=provider&timestamp=1578635962790
64                         将会调用 DubboProtocol.refer
65                         **/
66                         invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
67                     }
68                 } catch (Throwable t) {
69                     logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
70                 }
71                 //加入缓存
72                 if (invoker != null) { 
73                     newUrlInvokerMap.put(key, invoker);
74                 }
75             } else {
76                 newUrlInvokerMap.put(key, invoker);
77             }
78         }
79         keys.clear();
80         return newUrlInvokerMap;
81     }

上面的代码逻辑比较清晰 , 我们直接跟进DubboProtocol.refer查看

1 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
2     optimizeSerialization(url);
3     // 创建 DubboInvoker
4     DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
5     invokers.add(invoker);
6     return invoker;
7 }

上面方法看起来比较简单,不过这里有一个调用需要我们注意一下,即 getClients。这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。

 1 private ExchangeClient[] getClients(URL url) {
 2     // 是否共享连接
 3     boolean service_share_connect = false;
 4       // 获取连接数,默认为0,表示未配置
 5     int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
 6     // 如果未配置 connections,则共享连接
 7     if (connections == 0) {
 8         service_share_connect = true;
 9         connections = 1;
10     }
11 
12     ExchangeClient[] clients = new ExchangeClient[connections];
13     for (int i = 0; i < clients.length; i++) {
14         if (service_share_connect) {
15             // 获取共享客户端
16             clients[i] = getSharedClient(url);
17         } else {
18             // 初始化新的客户端
19             clients[i] = initClient(url);
20         }
21     }
22     return clients;
23 }

这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。

 1 private ExchangeClient getSharedClient(URL url) {
 2     String key = url.getAddress();
 3     // 获取带有“引用计数”功能的 ExchangeClient
 4     ReferenceCountExchangeClient client = referenceClientMap.get(key);
 5     if (client != null) {
 6         if (!client.isClosed()) {
 7             // 增加引用计数
 8             client.incrementAndGetCount();
 9             return client;
10         } else {
11             referenceClientMap.remove(key);
12         }
13     }
14 
15     locks.putIfAbsent(key, new Object());
16     synchronized (locks.get(key)) {
17         if (referenceClientMap.containsKey(key)) {
18             return referenceClientMap.get(key);
19         }
20 
21         // 创建 ExchangeClient 客户端
22         ExchangeClient exchangeClient = initClient(url);
23         // 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
24         client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
25         referenceClientMap.put(key, client);
26         ghostClientMap.remove(key);
27         locks.remove(key);
28         return client;
29     }
30 }

上面方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。ReferenceCountExchangeClient 内部实现比较简单,就不分析了。下面我们再来看一下 initClient 方法的代码。

 1 private ExchangeClient initClient(URL url) {
 2 
 3     // 获取客户端类型,默认为 netty
 4     String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
 5 
 6     // 添加编解码和心跳包参数到 url 中
 7     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
 8     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 9 
10     // 检测客户端类型是否存在,不存在则抛出异常
11     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12         throw new RpcException("Unsupported client type: ...");
13     }
14 
15     ExchangeClient client;
16     try {
17         // 获取 lazy 配置,并根据配置值决定创建的客户端类型
18         if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
19             // 创建懒加载 ExchangeClient 实例
20             client = new LazyConnectExchangeClient(url, requestHandler);
21         } else {
22             // 创建普通 ExchangeClient 实例
23             client = Exchangers.connect(url, requestHandler);
24         }
25     } catch (RemotingException e) {
26         throw new RpcException("Fail to create remoting client for service...");
27     }
28     return client;
29 }

 

标签:dubbo,url,URL,源码,-----,Invoker,new,null,Constants
From: https://www.cnblogs.com/fnlingnzb-learner/p/16803482.html

相关文章

  • docker部署的jenkins配置多jdk版本环境时,报错/var/jenkins_home/jdk-17.0.4/bin/java:
    背景:jenkins默认jdk版本1.8,某个项目需要使用jdk17,但jenkins从官网在线安装的jdk最高版本只有jdk1.9,手动安装jdk17完成后,在jenkins容器内执行手动安装的jdk17时,报错/var/jen......
  • django之五--获取url参数和name的作用
    一、前言假如我们要打开这两个地址:会发现这两个地址的最大差别就是时间参数的值是动态的(年和月的值是动态的)。那么,如果我们想要获取地址里面的【2018】和【07】这两个值,就......
  • Manifest使用示例6 - 安装并使用git私有仓库
    有的开发者会借助github创建自己的私有库,那么如何利用vcpkg使用git上的私有库呢? 请参考以下示例。 使用示例1.准备一个私有仓库Cheney-W/test,且为这个私有库生成一......
  • TS 自定义类型-修改使对象部分属性必填
    工作中常常用API的入参是非必填的,而实例的属性因为有默认值而一定存在的情况,举个例子:typeTestOptions={num?:numberstr?:strhookFn?:()=>string}c......
  • 设计模式-创建者模式-抽象工厂模式
    设计模式-创建者模式-抽象工厂模式内容摘自:重学Java设计模式:实战抽象工厂模式「替换Redis双集群升级,代理类抽象场景」|bugstack虫洞栈抽象工厂模式介绍图片来自:h......
  • Manifest使用示例2 - 依赖多个vcpkg 的历史版本库
    以下示例将提供在自定义工程中使用vcpkg中fmt、sqlite3、zlib的固定版本。Manifest模式-CMake工程1.示例根目录:E:/test_manifest,文件目录结构如下:E:/ |--test......
  • Manifest使用示例4 - Binarycaching使用缓存文件
    在多人开发环境中,我们仅希望一个人管理项目需要的所有第三方库,并使用服务器部署和分发vcpkg中已编译的库,此时可以使用vcpkg的Binarysource特性。VCPKG默认开启Binaryca......
  • CVPR2021:IoU优化——在Anchor-Free中提升目标检测精度(附源码)
    计算机视觉研究院专栏作者:Edison_G目前的anchor-free目标检测器非常简单和有效,但缺乏精确的标签分配方法,这限制了它们与经典的基于Anchor的模型竞争的潜力1 简要目前的anch......
  • 2022 CSP-S 游记
    \(2022-9-18\)初赛在考场门口听到有人在聊florr.io,florr怕不是风靡\(OI\)圈了。宇宙射线什么东西。\(2022-9-27\\Day\-\infty\)出分了,还好过了,同机房的报了提高组的......
  • vue实战-深入响应式数据原理
    本文将带大家快速过一遍Vue数据响应式原理,解析源码,学习设计思路,循序渐进。数据初始化_init在我们执行newVue创建实例时,会调用如下构造函数,在该函数内部调用this._init(......