1. Protocal:默认dubbo协议,默认端口20880
2.zk作为注册中心,创建接口下的consumer/provider/configurator/router节点是持久话节点。此四个节点下的节点是临时节点,会话心跳检测不到节点信息,就会自动删除
3.@enabledubbo:扫描@service和@reference注解
4.dubbo重试次数可以通过xml或者注解配置。也可以通过上下文动态配置,RpcContext.setAttachment("retries",2)
5.集群容错:cluser
Failover s
Failfast Cluster
Failsafe Cluster
Failback Cluster:失败时,返回空的RpcResult,然后记录失败请求,异步定时调用
Forking Cluster
Broadcast Cluster:广播调用,任意一个报错,就报错
6.负载均衡:localbalance
Random LoadBalance
RoundRobin LoadBalance
LeastActive LoadBalance
ConsistentHash LoadBalance:相同参数的请求总是发到同一提供者
7.线程模型:protcol:dispatcher,ThreadPool
8.直连服务:<dubbo:reference url=“dubbo://localhost:20890” />
9.指定订阅
10.多协议
11.多注册中心
12.SpringCloud基于http,dubbo基于TCP并发度更高。
13.SpringCloud只有HTTP协议,dubbo支持多个通信协议
14.多分组:Dubbo group多分组时,消费者指定group="*",则可以使用任意分组
16.多聚合:Dubbo group 提供merge功能,合并方法对应所有分组输出的结果(利用Dubbo SPI实现)。
17.静态服务:dynamic=false.服务上下线,人工管理。
18.多版本:version
19.Dubbo参数校验.
1.包:javax.validation/validate-hibernate.
2.@reference中设置validation=true(注意是在消费侧添加,消费测校验)
20.结果缓存:cache=。lru,threadlcoal,jcache
21.泛化调用:GenerateService,调用所有服务的实现。主要用于客户端没有Api、入参和出参数模型时。可以用来待见通用测试框架。
11.上下文信息:RpcContext.getContext()
23.隐士参数:Attachment 在服务消费方和提供方之间隐式传递参数。RpcContext.getContext().getAttachment("index");
12.异步执行:阻塞主线程,等待返回结果。consume将provider端执行的业务从Dubbo内部线程池,切换到业务线程。 (优化:completeFuture,采用同志的方式,不会阻塞主线程)。
13.本地调用:inJvm=true,使用本机服务,不需要远程调用。2.2.0版本之后,默认优先引用本地服务,如果想引用远程服务,配置scope=remote.
14.参数回调:通过参数回调,从服务端调用客户端逻辑。Dubbo基于长连接生成反向代理,这就可以从服务端调用客户端逻辑(利用TCP双向通信)dubbo非常适合这种长链接的双端通信
15.事件通知:
16.本地存根:stub消费端本地生成代理(消费端也实现接口),由本地代理决定是否调用远程服务。在本地执行部分业务逻辑+服务方业务逻辑
.实现远程接口
.持有被代理服务引用,走构造函数传值(服务暴露的时候会对stub配置进行校验)
17.本地伪装:通常用于实现服务降级。支持:force和fail(默认)。force不走远程调用,直接强制降级。fail当远程调用失败,才走降级。
1.mock=ture,默认降级类:接口名+mock。也可以mock="全限类名"。这两个都会走远程服务,当RPC异常才会降级
2.mock="force:true",直接走降级
3.mock="force: return abbc" 直接降级
4.mock="throw java.lang.runtimeException" 直接降级
5.force强制降级,一般不会在代码中直接配置,而是通过服务治理的方式,dubbo-admin over-wirte直接配置的。
18.stub和mock区别。
1.mock只关注异常,且只关注RPCException
2.stub在consumer编写部分逻辑,可在RPC调用前后编写业务逻辑,异常时降级。
19.dubbo-admin:dubbo服务治理,运行时动态改变dubbo服务配置信息。
.延迟暴露:delay。当服务需要预热,比如缓存数据,等待资源时。目前已经无用了,服务都是在Spring容器启动之后才暴露的。
.并发控制:实现线程隔离,会新创建线程池(和dubb线程池隔离)
1.executes:服务端并行执行数。
2.actives:每个客户端并发执行数。
.连接控制:protocol:accepts,connections
1.accepts=10:限制服务器端接受的连接不能超过 10 个
2.connections=10:限制客户端服务使用连接不能超过 10 个
3.connections:表示该服务对每个提供者建立的长连接数
.粘滞连接:尽可能让客户端总是向同一提供者发起调用
.令牌验证:在注册中心控制权限,可以防止消费者绕过注册中心访问提供者。
.配置规则/配置覆盖
1.Api形式修改:Registry.register()
2.dubbo-admin:overide协议目的是在运行时修改属性的
.优雅停机:优雅停机超时时间默认10s。配合Severlet规范,销毁监听器,调用Spring事件监听器,调用DubboShutdownHook.destoryAll()方法。
1.服务方:不再接受新的请求,等待线程池中正在执行的线程执行结束。
2.消费方:不再发起新的请求,等待未返回的响应完成响应。
.主机绑定:protcol:host,port. 默认获取IP可能是内网IP
.单工通信:method:isReturn=false。提高单次请求效率,客户端不需要服务端返回值,减少一次服务端响应客户端TCP通信。
# Dubbo源码解读
1.xml方式dubbo:annotion标签:注册ServiceAnnotationBeanPostProcessor和ReferenceAnnotationBeanPostProcessor
2.注解方式:@EnableDubbo
1.@EnableDubboConfig:根据dubbo.property文件创建dubbo配置类,流程如下
1.往Spring容器里面创建标签的配置类
2.对Spring容器中标签配置类进行数据绑定
3.数据源是dubbo.properties...从enverment对象获取
细节如下:
1.注册DubboConfigConfiguration.Single.class到resitry中,
2.EnableDubboConfigBindings注解,import了DubboConfigBindingsRegistrar
3.遍历EnableDubboConfigBindings中的配置注解(Application,registry,protocol等)
3.1.【变成BeanDefinetion】registerDubboConfigBean将配置类(Application,registry,protocol等)转化为BeanDefinetion,注册到上下文。
3.2.【进行数据绑定】registerDubboConfigBindingBeanPostProcessor():为每个配置类BeanDefinition注册一个DubboConfigBindingBeanPostProcessor。
3.3.【进行数据绑定】通过DubboConfigBindingBeanPostProcessor,通过DataBinder在初始化bean中对进行配置Bean的数据绑定
说明:
1.DubboConfigBindingBeanPostProcessor负责每一个Dubbo配置类的绑定
2.DataBuilder:Spring中通过配置文件前缀对类进行数据绑定。
2.@DubboComponentScan:注册ServiceAnnotationBeanPostProcessor和ReferenceAnnotationBeanPostProcessor
1.ServiceAnnotationBeanPostProcessor:对@Service的支持
1.创建DubboClassPathBeanDefinitionScanner继承自ClassPathBeanDefinitionScanner
2.doScan对包下面的类进行扫描,获取所有包含@service注解的类,并封装成BeanDefinition,注册到Registry中
3.创建ServiceBean类的BeanDefinition(每个@service注解的类都对应一个ServiceBean)
4.根据@service注解,填充ServiceBean的BeanDefinition的属性。
.class属性.interface,ref等
5.注册
说明:
1.一个@Service注解修饰的类,会对应两个beanDefinition.上面2/3。
2.ReferenceAnnotationBeanPostProcessor:对@Reference的支持(dubbo与Spring整合)
1.收集@reference注解:postProcessMergedBeanDefinition()
2.依赖注入:postProcessPropertyValues
3.题外话:自定义注解依赖注入实现:
1.自定义BeanPostProcessor
2.收集注解:实现MergedBeanDefinitionPostProcessor重写postProcessMergedBeanDefinition方法。
3.依赖注入:继承InstantiationAwareBeanPostProcessor重写postProcessProperties方法
3.Dubbo SPI.
1.dubbo SPI (META-INF/services/或META-INF/dubbo/和META-INF/dubbo/internal.文件名接口名,内容:key:class)
1.定义接口
2.接口增加@SPI注解
3.实现扩展类
4.META——INFI文件夹下配置扩展文件
2.获取扩展类:ExtensionLoader.getExtensionLoader。用法:
1.@Adaptive:扩展勒注解只能有一个.getAdaptiveExtension()
2.@Activate:扩展类分组getActivateExtension(url,value[],group)
1.首先看分组,如果值@Activate只配置了分组,那么就只匹配分组值 如(@Activate(group = {"rabbitmq"}))
2.会匹配url中的参数,如果分组和value都配置了。首先匹配分组,然后在匹配url中的参数key 如@Activate(value = {"value1"},group = {"rabbitmq"})。
3.getActivateExtension(*,values[],*)第二个参数会匹配SPI文件中的Key关键字。
3.getDefaultExtension()获取接口SPI注解中value之对应的扩展类
3.获取扩展类源码解析getExtension(String name):
1.获取ExtensionLoader。每个SPI接口对应一个ExtensionLoader。
2.cachedInstances:优先根据name从map中获取Holder<Object>
3.map没有,则进行创建createExtension(name)
3.1.getExtensionClasses()。获取扩展类集合Map,缓存建立各种映射关系
1.loadExtensionClasses:从本地文件中加载key和类的关系
1.设置cachedDefaultName:SPI注解的value
2.如果类有注解@Adaptive。设置cachedAdaptiveClass:@Adaptive
3.如果是包装类:cachedWrapperClasses:设置包装类型集合
4.其他:(不包括@Adaptive和包装类)
1.cachedActivates:建立名称和类上@Activate注解映射Map<String, Activate>
2.cachedNames:建立类和名称的映射ConcurrentMap<Class<?>, String>
3.建立名称和类的映射装成Map<String, Class<?>>
2.放入缓存cachedClasses:名称和类的映射装成Holder<Map<String, Class<?>>>
3.2.根据class从缓存EXTENSION_INSTANCES获取实例ConcurrentMap<Class<?>, Object>
3.3.缓存没有,根据class反射创建实例,放入缓存。
3.4.IOC属性注入:injectExtension(),也是通过SPI形式,从扩展Factory中拿值。通过SpiExtensionFactory或者SpringExtensionFactory获取依赖对象。
.遍历类方法中所有以set开头的
3.5.判断包装类集合是否为空【有可能返回实例,就不是名称对应的实例,而是被包装的实例】
1.不为空,则实例化包装类,且对包装类进行ioc,责任链模式(ProtocolFilterWrapper->ProtocolListenerWrapper->ProtocolListenerWrapper->DubboProtocol)
4.getActivateExtension(url,value[],group)源码解析
1.延用getExtensionClasses();流程如3.
2.遍历缓存cachedActivates,Map<String, Activate>
3.优先匹配group.
4.根据URL参数匹配@Activate的value属性
5.根据values[]匹配
5.getAdaptiveExtension()。
1.根据@Adaptive注解获取实例
2.如果获取不到,判断SPI接口方法是否有主机@Adaptive。动态生成字节码,创建对应的代理类。
4.ServiceBean的初始化和属性注入
1.@Enable->@DubboComponentScan:注册ServiceAnnotationBeanPostProcessor
2.ServiceAnnotationBeanPostProcessor:完成Beandefinition注册
.创建DubboClassPathBeanDefinitionScanner继承自ClassPathBeanDefinitionScanner
.doScan对包下面的类进行扫描,获取所有包含@service注解的类,并封装成BeanDefinition,注册到Registry中
.创建ServiceBean类的BeanDefinition(每个@service注解的类都对应一个ServiceBean)
.根据@service注解,填充ServiceBean的BeanDefinition的属性。
5.注册ServiceBeanDefinition
3.ServiceBean继承InitializingBean:在afterPropertiesSet对所有的config属性赋值,服务发布时要用。
.ProviderConfig
.ApplicaitonConfig
.moduleConfig
.registryConfigs
.monitorConfig
.protocolConfigs
5.Dubbo Provider 服务发布流程
1.ServiceBean继承ApplicationListener<ContextRefreshedEvent>,通过监听事件。实现服务发布
.Spring容器启动时,registerListeners()收集上下文中即成继承ApplicationListener的类
.Spring容器启动前完成之后,广播事件finishRefresh()->publishEvent(new ContextRefreshedEvent(this))
.export服务暴露:onApplicationEvent()事件触发,调用export()完成服务暴露
.设置配置类属性
.checkoutDefault()
.创建ProviderConfig(@EnableDubboConfig没有创建ProviderConfig的话)
.未配置Provider时,则创建provider
.appendProperties()并设置provider对象属性,遍历set方法。
优先从环境变量取属性System.getProperty()
从dubbo.property文件中获取
.绑定其他配置类属性
.校验配置信息(application,registry,Protocol,mock,stub)
.暴露服务
.概话流程总结:
1.@Service或者@Reference里的配置
2.先转化为各种配置Bean,如ApplicationConfig,RegistryCongfig,ProtocolConfig等。
3.把配置类专为map:appendParameters(map, application)
4.把map转为URL(包含协议头的协议)
细节:
.收集Registry协议:loadRegistries()根据配置Registies,获取注册的url。(获取注册协议)
.遍历所有registries
.封装属性到map中(applicaiton,registryConfig等)。 appendParameters(map, application);
.生成URL。UrlUtils.parseURLs(address, map)->loadRegistries()根据registry中的Adress地址和map中的key,value。生成url
.zookeeper://192.168.67.139:2184/com.alibaba.dubbo.registry.RegistryService?application=dubbo_provider&dubbo=2.0.2&owner=world&pid=13218×tamp=1709463351616
.URL设registry属性,把协议头换成从zookeeper改为registry
.registry://192.168.67.139:2184/com.alibaba.dubbo.registry.RegistryService?application=dubbo_provider&dubbo=2.0.2&owner=world&pid=13882®istry=zookeeper×tamp=1709465346174
.返回URL。上面的配置属性都会通过注册中心传递给消费者。
.收集Dubbo协议,以及暴露Registry和dubbo协议:doExportUrlsFor1Protocol(protocolConfig, registryURLs)完成服务暴露
.收集配置类到map中(application/module/provider/protocolConfig/serviceconfig)
.把methodsConfig/ArgumentConfig配置同样设置到map中
.提前生成接口类对应的代理类,Wrapper.getWrapper(interfaceClass)
.获取接口包含的所有方法,设置key为methods到map中
.设置token到map
.获取host:如果Protocol和Privoder配置类都没有配置IP的话,就会调用InetAddress.getLocalHost().getHostAddress();获取主机IP。如果利用容器部署,有可能获取到内网IP,导致消费端掉不通。
.获取端口:如果Protocol和Privoder配置类都没有配置端口的话,通过SPI机制获取Protocol对应的默认端口。(DubboProtocol)
.把map转成URL,DUBBO协议
.dubbo://192.168.28.25:29015/com.xiangxue.jack.service.UserService?anyhost=true&application=dubbo_provider&bean.name=com.xiangxue.jack.service.UserService&bind.ip=192.168.28.25&bind.port=29015&default.timeout=5000&dubbo=2.0.2&generic=false&interface=com.xiangxue.jack.service.UserService&methods=doKill,queryUser&owner=world&pid=16437&revision=0.0.1-SNAPSHOT&side=provider&timeout=2000×tamp=1709472937360
.exportLocal(url)本地暴露
.url:dubbo://192.168.10.47:29015/com.xiangxue.jack.service.UserService?anyhost=true&application=dubbo_provider&bean.name=com.xiangxue.jack.service.UserService&bind.ip=192.168.10.47&bind.port=29015&default.timeout=5000&dubbo=2.0.2&generic=false&interface=com.xiangxue.jack.service.UserService&methods=doKill,queryUser&owner=world&pid=27720&revision=0.0.1-SNAPSHOT&side=provider&timeout=2000×tamp=1709568154078
.遍历注册地址,远程调用暴露服务(调用关系AbstractProxyInvoker->wrapper->ServiceClass)
.遍历第一步loadRegistries()中返回的注册协议地址
.加载URL 监控url. monitorUrl = loadMonitor(registryURL)
.dubbo协议地址URL绑定monitor属性地址
.获取到invoker对象
.将Dubbo协议URL地址,添加到registry协议URL属性中。key为export
.registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())
.通过SPI获取代理工厂。因为没有@Adaptive注解。通过javassist动态生成一个代理工厂ProxyFactory$Adaptive。
.最终获取到StubProxyFactoryWrapper持有JavassistProxyFactory的代理工厂
.调用代理工厂JavassistProxyFactory.getInvoker()返回AbstractProxyInvoker。(持有被代理类【ServiceImpl】,接口类,URL(Registry))
.生成代理类:Wrapper.getWrapper对需要调用的目标类的包装类,通过javassist技术动态生成的
.生成Invoker:AbstractProxyInvoker:是Dubbo最底层的Invoker对象,只有通过他才能调用provider端的目标Service服务方法。持有被代理类,接口类型,RegisterURL
.调用关系AbstractProxyInvoker->wrapper->ServiceClass
.包装Invoker对象为DelegateProviderMetaDataInvoker,持有Invoker对象和ServiceConfig对象
.暴露远程服务:registry协议的export。protocol.export(wrapperInvoker)。
.SPI方式获取Protocol。最终获取到包装类,持有关系:QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol
.开启QosServer。QosProtocolWrapper.export().就是一个Netty服务端(服务统计功能如服务列表,服务在线状态等)
.完成服务注册regist.export():RegistryProtocol.export()
.启动Server:doLocalExport()走到DubboProtocl中启动NettyServer
.创建InvokerDelegete:持有原始Invoker和Dubbo协议URL
.服务发布:protocol.export(invokerDelegete)。protocol是SPI动态生成的Protocol$Adaptive,invokerDelegete对应的URL为Dubbo,最终会掉到DubboProtocol。调用关系:QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->DubboProtocol
1.构建Invoker调用链:ProtocolFilterWrapper.class。SPI获取所有Filters,ProtocolFilterWrapper.buildInvokerChain,通过next指针传递
1.说明:最终执行链的关系:Filter1-》Filter2-〉invokerDelegete->AbstractProxyInvoker->wrapper->ServiceClass
2.ProtocolListenerWrapper.export
1.DubboProtocol.expot():启动Server
1.获取String url
2.获取key:(服务名称:端口)
3.创建DubboExporter:持有Invoker执行链和key和exporterMap。每个服务都对应一个DubboExporter
4.构建映射关系:Map<String, Exporter<?>> exporterMap。
5.openserver():启动服务(每一个@Service修饰的类都会进来,但是同一个IP+端口,只会启动启动一个netty服务端)后面重点讲。
1.获取Address:ip:port
2.从serverMap获取ExchangeServer。
3.如果获取不到则创建createServer(url):启动netty服务进行双端通信,返回ExchangeServer对象。一个主机只会启动一次NettyServer。源码后面重点讲。
.总结DubboProtocol.expot()1.完成了Netty服务端的启动,2.建立handler的链条关系
2.服务发布监听包装类,其实就是提供一个扩展,服务发布后给一个通知。ListenerExporterWrapper.持有DubboExporter和List<ExporterListener>。
.包装ExporterChangeableWrapper类,持有ListenerExporterWrapper和originInvoker【registry协议】
.registryUrl = getRegistryUrl(originInvoker):获取注册协议zookeeper协议Url
.zookeeper://192.168.67.139:2184/com.alibaba.dubbo.registry.RegistryService?application=dubbo_provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.3.6%3A20880%2Fcom.xiangxue.jack.async.AsyncService%3Fanyhost%3Dtrue%26application%3Ddubbo_provider%26bean.name%3DServiceBean%3Acom.xiangxue.jack.async.AsyncService%26bind.ip%3D192.168.3.6%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.xiangxue.jack.async.AsyncService%26methods%3DasynctoDo%26owner%3Dworld%26pid%3D86931%26revision%3D0.0.1-SNAPSHOT%26side%3Dprovider%26timeout%3D123456%26timestamp%3D1710227953244&owner=world&pid=86931×tamp=1710227939570
.获取注册类ZookeeperRegistry。getRegistry(originInvoker);
.registeredProviderUrl= getRegisteredProviderUrl(originInvoker):获提供者地址(写入接口下provider节点)
.dubbo://192.168.3.6:20880/com.xiangxue.jack.async.AsyncService?anyhost=true&application=dubbo_provider&bean.name=ServiceBean:com.xiangxue.jack.async.AsyncService&dubbo=2.0.2&generic=false&interface=com.xiangxue.jack.async.AsyncService&methods=asynctoDo&owner=world&pid=86931&revision=0.0.1-SNAPSHOT&side=provider&timeout=123456×tamp=1710227953244
.建立服务名称和ProviderInvokerWrapper的映射关系[缓存providerInvokers].ProviderInvokerWrapper持有originInvoker, registryUrl, registeredProviderUrl
.完成服务注册和监听:register(registryUrl, registeredProviderUrl)。把dubbo协议地址注册到Provider节点下,其实就是创建服务节点(providers节点和dubbo服务地址临时节点);
.SPI获取ZookeeperRegistryFacotry创建Zookeeper,创建zk客户端,注册断线重连监听
.ZookeeperRegistry.register()
.缓存注册过的URL:registered
.创建节点名称:doRegistry(),创建provider持久化节点和dubbo服务地址的临时节点
.[以下逻辑是对configurators节点注册事件监听,如果修改了属性则会覆盖客户端的该节点的数据]
.overrideSubscribeUrl=getSubscribedOverrideUrl(registeredProviderUrl)。获取override的provider协议。后面映射关系可以将provider协议Url理解为接口。(subscribed,notified两个权局缓存)
.provider://192.168.3.6:20880/com.xiangxue.jack.async.AsyncService?anyhost=true&application=dubbo_provider&bean.name=ServiceBean:com.xiangxue.jack.async.AsyncService&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.xiangxue.jack.async.AsyncService&methods=asynctoDo&owner=world&pid=96966&revision=0.0.1-SNAPSHOT&side=provider&timeout=123456×tamp=1710257665285
.注册监听事件registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)订阅configurators节点下数据变更事件,path:/dubbo/com.a.b.service/configurators。
.全局变量缓存:subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(),缓存overrideSubscribeUrl和NotifyListener
.建立overrideListener和 ChildListener的映射
.创建path:/dubbo/com.a.b/configurators
.注册监听事件
.建立dubbo的ChildListener事件类和Curator的事件类的映射
.事件关系:NotifyListener->ChildListener->CuratorWatch
.当发生overrite时,zk监听器调用curatorWatch的process方法,最终掉到NotifyListener方法,实现动态修改配置参数功能(下节重点将)
.创建Empty协议:
.empty://192.168.3.6:20880/com.xiangxue.jack.async.AsyncService?anyhost=true&application=dubbo_provider&bean.name=ServiceBean:com.xiangxue.jack.async.AsyncService&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.xiangxue.jack.async.AsyncService&methods=asynctoDo&owner=world&pid=26895&revision=0.0.1-SNAPSHOT&side=provider&timeout=123456×tamp=1710430398847
.notify(url, listener, urls);通知Empty协议。
.为QOS统计数据做准备.看服务列表,服务在线状态等
.publishExportEvent()发布服务暴露事件ServiceBeanExportedEvent,即发布一个服务已经暴露的通知。消费端会通过ApplicationListener关注此事件类型
.总结:核心流程在RegistryProtocol.export中
1.ServiceBean即成ApplicationListener。Spring启动后会触发onApplicationEvent事件
2.List<URL> registryURLs = loadRegistries(true)。加载所有Registry,将zookeeper协议URL变为registry协议URL。
3.遍历Protocols.
4.收集配置信息到map,把map转化为Dubbo协议URL。
5.遍历registryURLs.
6.将DubboUrl地址绑定到registryUrl属性中。key为export.
7.protocol.export(wrapperInvoker):开始Registry协议URL流转
8.QosProtocolWrapper.export():开启QosServer。就是一个Netty服务端(服务统计功能如服务列表,服务在线状态等)
9.RegistryProtocol.export():protocol.export(invokerDelegete):开始Dubbo协议流转
10:doLocalExport(originInvoker):启动netty服务端。Protocol调用链:QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->DubboProtocol
1.ProtocolFilterWrapper:根据Filter生成Invoker调用链
2.DubboProtocol:启动NettyServer:,根据filter构建invoker执行链,启动nettyserver,构建handler链
11.回到RegistryProtocol,完成服务注册:register(registryUrl, registeredProviderUrl),dubbo协议URL写入到/dubbo/com.*.*/providers/节点下
12.回到RegistryProtocol:对configurators节点注册事件监听:.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
6.override事件对主机属性的覆盖和对客户端代理的生成
1.override事件通知
1.CuratorWatc.precess调用到ChildListener.childChanged()
2.AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls).url:provider协议URL,urls为empty协议或者override协议地址:configurators节点下的
.遍历urls
.判断和url是否匹配,主要是判断目录是否相等
.result = new HashMap<String, List<URL>>().创建新的目录和override的list的映射
.覆盖notified全局变量,ConcurrentMap<URL, Map<String, List<URL>>>prividerUrl和configurators和override映射
.saveProperties(url):保存本地文件,把override协议列表保存到本地缓存文件中
.文件properties,key为服务名称,value为空格分割的orverride协议地址
.文件地址:用户目录+/.dubbo/dubbo-registry-应用名称—服务地址.cache
.保存文件的目的:服务在启动的时候可以优先从本地加载。AbstractRegistry构造函数中调用
.loadProperties():加载缓存文件中的override配置,不需要zookeeper也可以完成属性覆盖
notify(url.getBackupUrls()):触发覆盖
.listener.notify(categoryList):调用overrideListener,privider端,则只修改dubboExport
.将override的URL集合,专为List<Configurator>
.获取dubbo协议地址
. URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
.当前dubboURL
.URL currentUrl = exporter.getInvoker().getUrl();
.把override协议中的属性合并到dubbo协议中Configurator中
.URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
.如果当前url和新url不同,则重新创建DubboExport对象,持有inoker执行链,invoker中有新的url
.RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
总结:
1.verride协议中修改或者添加的属性合并到originUrl中,生成新的URL
2.比较新的URL和当前URL是否相同
3.不相同则重新完成服务暴露:其实就是创建一个新的DubboExport对象。
1.Override,Configurators在服务端触发
.notify方法,服务端只做了一件事,根据新的URL,创建新的DubboExport对象。
7.Dubbo Comsumer Ref服务引用流程
1.ReferenceAnnotationBeanPostProcessor.postProcessPropertyValues()负责对收集好的@refer注解进行依赖注入
2.遍历收集的注解集合
3.属性注入.方法上注解和字段上注解
.AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement.inject()
.AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement.inject()
4.获取依赖bean:getInjectedObject()获取需要依赖注入的实例。创建了两层代理
.优先缓存中取,缓存没有则创建对象
.创建对象:doGetInjectedBean(annotation, bean, beanName, injectedType, injectedElement);
.创建referencedbean的名称:ServiceBean:+接口名称
.创建ReferenceBean:buildReferenceBeanIfAbsent()。每个@Reference注解都对应一个ReferenceBean对象
.创建对象
.解析注解中的属性,配置到ReferenceBean对象
.injectedElement,和ReferenceBean建立缓存
.创建需要依赖注入的代理实例:buildProxy(referencedBeanName, referenceBean, injectedType)JDK方式创建代理。
.创建InvokerHandler:ReferenceBeanInvocationHandler,持有ReferenceBean和bean对象,通过handler.init初始化bean,构建mockCluserInvoker代理对象,复给bean.
.生成bean对象:handler.init()->referenceBean.get()->referenceConfig.init();初始化远程接口
以下几步和@Servcie过程一样
.checkDefault():从系统属性和dubbo.properties中拿属性设置到ConsumerConfig中
.appendProperties(this):系统属性和dubbo.properties中拿属性设置到ReferenceConfig中
.初始化各种配置:application,module,registries,monitor等
.校验各种配置
.生成map,把配置属性专为map.
.创建ref代理: createProxy(Map<String, String> map)
.判断是否本地injvm(以下远程为主来介绍)
.获取所有注册协议,进行遍历urls=loadRegistries(false)。registry协议。
.registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_consumer&dubbo=2.0.2&owner=world&pid=52033®istry=zookeeper×tamp=1710587235093
.将map变为String绑定到注册协议的URL上,key为“refer”,并复制给全局urls集合。registry协议。
.urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
.StringUtils.toQueryString(map)
.application=dubbo_consumer&check=false&default.check=false&dubbo=2.0.2&interface=com.xiangxue.jack.service.UserService&methods=doKill,queryUser&owner=world&pid=52033®ister.ip=192.168.42.25&revision=0.0.1-SNAPSHOT&side=consumer×tamp=1710587116529
.registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_consumer&dubbo=2.0.2&owner=world&pid=52033&refer=application%3Ddubbo_consumer%26check%3Dfalse%26default.check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dcom.xiangxue.jack.service.UserService%26methods%3DdoKill%2CqueryUser%26owner%3Dworld%26pid%3D52033%26register.ip%3D192.168.42.25%26revision%3D0.0.1-SNAPSHOT%26side%3Dconsumer%26timestamp%3D1710587116529®istry=zookeeper×tamp=1710587134398
.创建invoker():refprotocol.refer(interfaceClass, urls.get(0))
开始处理regsitry协议Url
.QosProtocolWrapper.refer():启动QosServer
.RegistryProtocol.refer(class,url):
.修改url协议为:zookeeper
.doRefer()
.RegistryDirectory服务列表集合:该类非常重要。用来存储服务列表directory = new RegistryDirectory<T>(type, url)
.RegistryDirectory实现了NotifyListener。监听zookeeper的Notify事件,来刷新本地服务列表
.methodInvokerMap:缓存本地服务列表,建立method和invokers的映射关系
.持有type,url(zookeeper协议),zookeeperRegistry,
.生成订阅url:subscribeUrl,consumer协议正好与生产端协议相反(provider://)
.consumer://192.168.42.25/com.xiangxue.jack.service.UserService?application=dubbo_consumer&check=false&default.check=false&dubbo=2.0.2&interface=com.xiangxue.jack.service.UserService&methods=doKill,queryUser&owner=world&pid=52033&revision=0.0.1-SNAPSHOT&side=consumer×tamp=1710587116529
.生成注册url: registeredConsumerUrl,多了个目录属性category
.consumer://192.168.42.25/com.xiangxue.jack.service.UserService?application=dubbo_consumer&category=consumers&check=false&default.check=false&dubbo=2.0.2&interface=com.xiangxue.jack.service.UserService&methods=doKill,queryUser&owner=world&pid=52033&revision=0.0.1-SNAPSHOT&side=consumer×tamp=1710587116529
.把消费端注册到zookeeper中的consumers节点
./dubbo/com.*.AsyncService/consumers
.将registeredConsumerUrl绑定到RegistryDirectory属性中
.监控 providers,configurators,routers节点的变化
.directory.subscribe(url,this)。
.监控了3个节点,代码和@Servcie流程一样
.创建节点
.订阅节点事件
.发布empty协议消息。如果节点下无子节点,则处罚empty协议,有子节点则正常触发(服务端和消费端启动时都会触发),如服务端启动是,没有provider,则触发empty协议,有provider则将provider协议加载到服务列表
.RegistryDirectory继承了notifylistenner。通知形式和ServiceBean export流程一样。
.返回MockClusterInvoker对象。持有RegistryDirectory和FailbackClusterInvoker。
.得到MockClusterInvoker:支持mock降级和failover集群容错
.返回mockCluserInvoker代理。javassisProxy创建mockCluserInvoker代理。
.赋给ReferenceBeanInvocationHandler的bean属性
.创建代理:Proxy.newProxyInstance
.设置缓存
5.反射注入bean
总结:
其实就是注册节点到consumers节点下,然后订阅了providers/configurators/routers节点事件,创建Invoker代理。
1.ReferenceAnnotationBeanPostProcessor。负责@Reference属性的收集和依赖注入
2.postProcessPropertyValues()负责对收集好的@refer注解进行依赖注入。
3.AnnotatedFieldElement和.AnnotatedMethodElement。inject()负责方法和属性上的依赖注入。下面以字段依赖注入为例
4.接下来创建依赖bean。会使用双重代理。
1.JDK代理bean。JDK->ReferenceBeanInvocationHandler->bean
2.javassis创建invoker代理,复制给bean。Javassis->InvokerInvocationHandler->MockClusterInvoker
5.创建Invoke代理
.封装配置属性到map
.获取所有注册协议,进行遍历urls=loadRegistries(false);registry协议
.将map变为String绑定到注册协议的URL上,key为“refer”。:regiestry协议
.QosProtocol 启动QosServer
.RegistyProtocol 创建RegistryDirectory服务列表集合:该类非常重要。用来存储服务列表
,实现NotifyListener监听zk节点变化。
.注册节点到zookeeper中的consumers节:./dubbo/com.*.AsyncService/consumers
.注册节点监听事件。
.节点:providers,configurators,routers节点的变化代码和@Servcie流程一样。
.RegistryDirectory继承了notifylistenner。通知形式和ServiceBean export流程一样。
.返回MockClusterInvoker对象。持有RegistryDirectory和FailbackClusterInvoker
.返回mockCluserInvoker代理。javassisProxy创建mockCluserInvoker代理
6.JDK创建代理bean代理
创建InvokerHandler:ReferenceBeanInvocationHandler,持有Bean就是5中生成的invoke代理。
Dubbo之Mock原理和消费端服务列表刷新
1.注解方式和xml方式,referenc依赖注入的对象不同。xml是ref对象,是通过javassis生成的代理对象,生成的对象,存入到了Spring容器,@autowire可以直接引用。注解方式是JDK生成的代理对象,未放入到Spring容器,不能@autowire可以引用。
2.RegistryDirectory:服务列表,服务列表刷新,监听,动态服务列表。
.注册节点监听事件。
.节点:providers,configurators,routers节点的变化代码和@Servcie流程一样。
.RegistryDirectory继承了notifylistenner。通知形式和ServiceBean export流程一样。
3.RegistryProtocol.refer.生成的invoker,是MockClusterInvoker代理类,持有FailoverClusterInvoker和RegistryDirectory
4.Reference引用类代理关系:JDk->ReferenceBeanInvocationHanlder->Javassis->InvokerInvocationHandler->MockClusterInvoker
5.InvokerInvocationHander将method和args参数封装称RpcInvocation对象。
6.MockClusterInvoker是consumer 和provider的桥了。决定是否需要掉用远程服务。
7.消费者调用过程肯定不需要过注册中心。
。MockClusterInvoker.invoker->FailOverCluster.invoker()
。通过持有的RegsitryDirectory对象从methodInvokerMap方法映射缓存获取服务列表
。然后调用Invoker,在调用过程中,不需要和注册中心打交道。
6.消费端调用流程
1.ReferenceBeanInvocationHandler.invoker()->InvokerInvocationHandler.invoker()->MockClusterInvoker.invoke()->FailOverClusterInvoker.invoker()[或者是其他集群容错配置类]->DubboInvoker.doInvoke()->
2.MockClusterInVoker.invoker():消费端调用核心流程
1.获取mock配置,是否有配置mock。
。directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY
2.如果没有,直接RPC调用
。this.invoker.invoke(invocation);其中this.invoker指FailoverClusterInvokerr
.AbstractClusterInvoker.invoke()
.获取服务列表List<Invoker>
.RegistryDiretory.List
.doList()
.从缓存methodInvokerMap中,根据方法名称,获取对应的List<Invoker>
.根据routers进行过滤
.返回服务列表List<Invoker>
.SPI根据负载均衡算法:默认是random加全随机
.FailoverClusterInvoker.doInvoke(invocation, invokers, loadbalance)核心逻辑调用
.获取重试次数配置
.根据重试次数循环调用
。每次重试都重新获取最新的服务列表
.根据负载均衡算法选取一个服务
。invoker = select(loadbalance, invocation, copyinvokers, invoked);
。此处的invoked是指RPC失败后,重试过的invoker,用于下次重试时,排除已经试过的节点
.sticky判断是否粘带连接,是则返回之前调用过的invoker
。doSelect()
。如果invokers只有一个则直接返回。
。调用具体负载均衡算法,选择一个服务
。Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
。如果当前节点是已经试过的节点,则重新选一个invoker.
。返回当前选中的invoker
.记录已经调用过的invoker
.调用Invoker: Result result = invoker.invoke(invocation);
.InvokerDelegate->Filter1-》Filter2-〉DubboInvoker
.DubboInvoker.doInvoker
.单工通信:消费端注解配置。method:isReturn=false;
.不需要返回值,减少一次服务端给客户端响应的TCP通信
.不需要创建DefaultFeature
.异步处理:
.netty本来就是异步通信,不等待返回结果。返回ResponseFuture设置到Rpc上下文。
.需要返回结果时,调用ResponseFuture.get()阻塞等待即可
.双端通信
.等待服务端响应。服务端Netty通讯,有返回值的时候,服务端会回调Netty客户端的Read方法,会唤醒get等待的方法
.return (Result) currentClient.request(inv, timeout).get();
3.如果是force:则强制降级,调用本地方法
。doMockInvoke():其实这里会从缓存服务列表methodInvokerMap取,不过取不到mock类型的Invoker.
.创建MockInvoker对象
.MockInvoker.invoker()
.获取mock配置,并处理
.return:截取return后内容,包装数据类型直接返回RpcResult
.throw:截取throw后内容,实例化异常类型,直接抛出
.其他:调用具体的mock实现类中的方法进行降级。
.mock=true:消费端实现类名称为接口名称+Mock。
.具体实现类。mock="com.ab.c.servcie":消费端实现类名称为mock中配置的借口
.通过代理工厂javassisProxyFactrory将mock具体类专为Invoker,包装成abstractProxyInvoker.持有Wrapper,包装目标类。返回AbstractProxyInvoker
.调用:AbstractProxyInvoker.invoker->doinvoker()->wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
.RPC或本地Mock方法的调用都会通过Invoker调用wrapper再调用被代理的方法。
4.其他则走RPC异常降级,先调用远程服务,如果发生RPC异常,则调用本地mock方法(2+3)
总结:入口为MockClusterInvoker.invoke持有集群容错类Cluster,默认FailOverCluster
1.MockClusterInvoker.invoke
2.从Url上获取配置的mock信息
3.如果没有mock走远程RPC。
1.获取服务列表:
2.获取负载均衡算法。抽象类AbstractClusterInvoker
3.调用子类具体集群容错类的doInvoke(),实现远程服务调用。
1.调用负载均衡算法选择一个Invoker.
1.FailOverClusterInvoker:根据retry循环重试
2.FailBackClusterInvoker:调用失败,则返回空new RpcResult(),并记录失败请求,异步定时调用
3.FailFastClusterInvoker:如果RpcExpection,则直接抛出,不重试。
4.FailSafeClusterInvoker:如果RpcExpection,记录日志,返回空new RpcResult()
5.ForkingClusterInvoker:同时调用多台主机,返回响应最快主机的结果。
6.BroadcastClusterInvoker:服务列表挨个调用一边,场景:刷新本地缓存。
7.AvailableClusterInvoker.
4.如果force是强制降级,则走本地mock.
5.其他。走RPC异常降级。即先远程,如果发生RPC异常,则走本地(3+4)
7.consummer端的notify订阅事件(服务列表刷新)。
1.RegistryDirectory.notify(List<URL> invokerUrls)
2.根据Url中的目录(category),设置具体协议url
。invokerUrls:dubbo协议:providers目录
。routerUrls:router协议:routes目录
。configuratorsUrl:overide协议:configurators
3.协议处理
。routerUrls(动态修改路由信息):router:协议
.URls专为Router对象
.List<Router> routers = toRouters(routerUrls);
.遍历Urls
.route://0.0.0.0/com.xiangxue.jack.service.UserService?category=routers&dynamic=false&enabled=true&force=false&name=routeTest&priority=0&router=condition&rule=method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo&runtime=false
.获取router属性。值为condition
.设置协议为condition
.routerFacotry获取Router对象
.调用ConditionRouterFactory.getRouter(URL url)
.new ConditionRouter(url)
。获取key=rule字符串:method+=+queryUser+=>+provider.cluster+=+failover+&+provider.host+=+192.168.90.126+&+provider.protocol+=+dubbo
。解析whencondition:=>箭头之前
。解析thencondition:=>箭头之后
.返回ConditionRouter对象。持有whenCondition和thenCondition
.集合添加MockInvokersSelector和TagRouter两个router
.configuratorUrls(动态修改配置信息):之前provider端notify有详细介绍
。转urls为List<Configurator>对象
。将override协议中的属性合并到overrideDirectoryUrl中。
.invokerUrls(动态修改providers节点,比如服务上线下线):url为dubbo协议,处理Dubbo协议
.refreshInvoker(invokerUrls):动态修改providers节点,比如服务上线下线
.如果只有一个empty协议。消费端启动的时候,先会是empty协议清空服务列表.forbid设置为true.
.否则:
.forbiden设置为false.
.urlInvokerMap:建立url和invoker的映射关系
.Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
.遍历所有invokerUrls。最终每一个invokerUrl都会对应一个Invoker对象
.根据协议过滤invokerUrls
.合并消费端配置属性,消费端配置优先:URL url = mergeUrl(providerUrl);
.生成缓存key,String key = url.toFullString()
.protocol.refer(serviceType, url),url为Dubbo协议:protocol.refer(serviceType, url)。调用protocol包装类QosProtocolWrapper->ProtocolFilterWrapper->ProtocolListenerWrapper->DubboProtocol
.DubboProtocol:
.获取ExchangeClient对象getClients(url)。
.获取connections配置参数,配置客户端跟服务端建立几个长连接
.初始化长链接
.绑定 requestHandler,是最后调用的handler
.创建DubboInvoker对象,并返。
.ProtocolListenerwrapper:包装ListenerInvokerWrapper,持有Invoker和List<InvokerListener>列表
.ProtocolFilterWrapper:根据Filter构建invoker执行链
.创建InvokerDelegate,持有ListenerInvokerWrapper,ProviderUrl和合并消费端属性的url
。invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
.建立url和invoke对象的映射:newUrlInvokerMap.put(key, invoker);
.返回映射newUrlInvokerMap
.methodInvokerMap:建立服务中方法和invoke的映射关系
. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
.循环所有的注册的invoke
.循环所有方法,建立方法和invoke的list的映射关系
.设置*和服务列表的映射
.遍历所有方法,根据路由规则获取匹配的Invokers集合,重新放入映射关系中
.route(methodInvokers, method)
.首先经过方法匹配:whencondition匹配
.然后其他条件匹配:thenCondition匹配
.method对应的invokers排序。
.返回映射关系
.删除不可用的invoker
。【总结】
。如果是消费端先启动,此时providers下无节点,接受empty协议。否则接受providers节点下所有注册的dubbo协议,即提供者节点
。empty协议,设置forbiden为true.
。List<invokerUrls:设置forbiden为false.
.遍历List<invokerUrls>,将URL专为Invoker。每个Url节点都会对应一个Invoker对象InvokerDelegate
.Protocol链路调用。
.DubboProtocol:创建长链接,生成DubboInvoker。
.ProtocolFilterWrapper:根据Filter创建Invoker执行链
.ProtocolListenerwrapper:包装ListenerInvokerWrapper
.urlInvokerMap:建立url和invoke的映射关系,toInvokers()
.methodInvokerMap:建立服务中方法和invoke的映射关系,并进行router过滤,toMethodInvokers()
.删除不可用的invoker。老的oldUrlInvokerMap中多余的invoker
总结:
1.RegistryDirectory.notify(List<URL> invokerUrls)监听三个节点providers,configurators,routers
2.根据Url中的目录(category),设置具体协议url
。invokerUrls:dubbo协议:providers目录
。routerUrls:router协议:routes目录
。configuratorsUrl:overide协议:configurators
3.处理协议:
。routerUrls:routes目录变动
。将协议专为Router对象,ConditionRouter对象。持有whenCondition和thenCondition
.集合添加MockInvokersSelector和TagRouter两个router
。configuratorUrls(动态修改配置信息):configurators节点变动
。urls为List<Configurator>对象
。将override协议中的属性合并到overrideDirectoryUrl中。
。invokerUrls:(providers节点变动,比如服务上线下线),dubbo协议.【其实就是生成DubboInvoker,建立了两个映射关系】
.urlInvokerMap:建立url和invoker的映射关系
。合并Url属性,消费端配置优先
。protocol.refer(serviceType, url):protocol执行链,最终DubboProtocol
.创建NettyClient和handler连
.创建DubboInvoker
。methodInvokerMap:建立服务中方法和invoker的映射关系
8.Consumer端Exchangeclient初始化
1.属于信息交换层,用于消费者发送请求,提供者接受请求,数据编码解码等,以request/response为中心。
2.初始化:
1.consummer端的notify订阅事件(服务列表刷新)
2.最后会调用到DubboProtocol.Refer(),初始化client
.DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
3.getClients(url)
4.获取连接个数配置:connections
5.一个ExchangeClient,就对应一个TCP长链接。
ExchangeClient[] clients = new ExchangeClient[connections];
6.创建长连接initClient(url);服务端列表中,没一个服务端对应一个长链接。
.url设置编解码codec
.url设置心跳时间。heartbeat
.返回HeaderExchangeClient,TCP长链接,由这个client发起调用。这里建立了一个很长的链式调用的对象关系。
.持有NettyClient,Netty客户端
。构造函数:构造Handler调用
。doOpen():开启Netty客户端
.connect():连接到netty服务端
.连接成功后的channel赋值一下
.开启心跳线程
9.DubboInvoker的调用流程
.DubboInvoker对象初始化时,getClient()创建Client和handler的执行连
。HeaderExchangeClient-》
.主要类:
HeaderExchangeChannel.request()
HeaderExchangeHandler.receive()
DefaultFeature.doReceive()
.发起请求是会生成一个流水号ID,即request对象id属性。服务端响应也会带上这个ID
.DefaultFeature有两个全局变量
。FUTURES:请求ID和Feature
。CHANNELS:请求ID和NettyClient
。get():阻塞等待
。doReceive()收到服务端响应,唤醒get()方法
。callBack.实现消费端回调。
.一个请求,对应一个应答。所有请求创建的defaultFeature保存在DefaultFeature里面。消费端应答时,通过请求流转过程中的ID,获取对应的defaultFeature。再进对get()方法唤醒。
消费端链路:
.请求链路:HeaderExchangeClient.request()->HeaderExchangeChannel.request()->NettyClient.send()->nettyChannel
.响应链路:NettyClientHandler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->defalutFutrue.doReceive()
服务端链路:
.请求链路:HeaderExchangeServer->*->NettyServer->NettyServerHandler
.响应链路:NettyServerHandler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter.reply()->DubboExporter.getInvoker->Invoker执行[Filter1-》Filter2-〉invokerDelegete->AbstractProxyInvoker->wrapper->ServiceClass]
.服务端接收请求和消费端接收响应的入口
消费端:NettyClientHandler.channelRead(),然后调用上面Handler执行链,最后调用defaultfuture.doRecieve()设置response,唤醒get方法
服务端:NettyServerHandler.channelRead(),然后调用上面Handler执行链,接着调用ExchangeHandlerAdapter.reply()方法->找到DubboExport,获取Invoker执行链,挨个调用。最后钓到abstractProxyInvoker持有的被代理对象,也就是对应Service的方法。
.序列化方式:默认Hession(原因:目前为止最稳定且效率最高的)
.doInvoke()调用流程:
.单工通信:消费端注解配置。method:isReturn=false;
.不需要返回值,减少一次服务端给客户端响应的TCP通信
.不需要创建DefaultFeature
.异步处理:
.netty本来就是异步通信,不等待返回结果。返回ResponseFuture设置到Rpc上下文。
.需要返回结果时,调用ResponseFuture.get()阻塞等待即可
.双端通信
.总结:消费端分两个流程发送请求和接受服务端应答
。发送请求:
。HeaderExchangeChannel.request():创建DefaultFuture类并返回,调用NettyClient发送请求。
DefaultFuture:持有持有FUTURES:请求ID和Feature。
CHANNELS:请求ID和NettyClient。
。调用default.get()进行阻塞等待。
。接受应答:
。服务端有数据返回时,回调用NettyClientHandler.channelRead()方法。
。开始调用Handler的执行链。
。HeartbeatHandler:发送心跳应答
。AllChannelHandler:服务隔离思想,获取线程池异步执行
。DecodeHandler:编解码,Hession进行反序列化
。HeaderExchangeHandler:
。服务端:
双工通信:处理请求,调用Filter执行链,最终调用Service方法。并将结果通过channel返回给客户端。
单工通信:直接处理请求
。消费端:
。根据请求ID获取对应的defaultFuture
.设置Response
.唤醒get方法()
详细流程
.return (Result) currentClient.request(inv, timeout).get()
.currentClient.request(inv, timeout)返回DefaultFeature对象。
.get()同步阻塞等待服务端响应。
。备注:服务端Netty通讯,有返回值的时候,服务端会回调Netty客户端的Read方法,会唤醒get等待的方法
.currentClient.request(inv, timeout)调用Client和handler的执行链,最后返回DefaultFeature
.HeaderExchangeChannel.request():
.创建DefaultFeature,持有两个全局变量映射关系,
。Map<Long, DefaultFuture> FUTURES:请求ID和Feature。
。Map<Long, Channel> CHANNELS:请求ID和NettyClient
。ID主要用来在Netty服务端回调回来的时候,通过ID找到对应的Feature,唤起get方法()。
。静态代码块中开启守护线程。用来扫描每一次调用是否超时
.调用NettyClient.send()发送请求
.调用NettyChannel.send()
.NioSocketChannel.writeAndFlush(message):nettyApi,netty的Nio远程调用
.发送数据到此结束,开始等待接收数据
.Netty回调write方法
.NettyClientHandler.channelRead():服务端有数据返回时,会调用此方法。服务端回传对象里面会包含请求ID。
.handler.received(channel, msg);
.MultiMessageHandler.received()
.HeartbeatHandler.received():服务端/客户端共用
.设置这次心跳读的时间:nettyChannel中设置一个
.channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
.如果是客户端:发送心跳应答。
.AllChannelHandler.received()
.服务隔离思想,获取一个线程池
。ChannelEventRunnable.run()
.state==recieved
.调用DecodeHandler
.DecodeHandler.received():实现请求/响应编解码
。返回解码数据
。DecodeRpcResult.decode()
.获取序列化器Serialization,默认Hession。
.反序列化
.结果写入RpcResult的value
.响应结果转为map写入AttachMent
.HeaderExchangeHandler.receive():【核心】DefaultFuture.receive()
.如果是服务端收到消息(服务端)
。双工通信:
。处理请求:
.创建根据请求ID创建相应对象Response
.调用DubboProtocol中匿名ExchangeHandlerAdapter.reply()
.根据映射关系找到DubboExporter,获取包装的Invoker[Invoker执行链接]
.执行链:Filter1-》Filter2-〉invokerDelegete->AbstractProxyInvoker->wrapper->ServiceClass(被代理类)。
。将请求结果,通过channel发送给客户端
。单工通信:
。直接处理消息
.如果是客户端收到消息
.设置response到DefaultFuture的全局变量,并唤醒get中的await
.DefaultFuture.receive():
。根据ID找到对应请求对应的defaultFuture
。defaultFuture.doReceive():设置Response全局变量,唤醒get中的await。此时get()方法解除阻塞,获取到Response返回值。
.如果配置了回调,则调用回调。(客户端配置了回调通知,则回调客户端配置的API,实际就是调用客户端配置的某个类的某个方法)
7.15
消费者服务列表加载和刷新
服务的Router路由和过滤
负载均衡算法。
7.20
Dubbo集群容错
Exchangeclient初始化
7.25
服务端exchange
dubbo同步调用流程
dubbo序列化对比
多协议实战
QOS运维
dubbo总结
其他:
1.长链接唯一标示:4维坐标:客户端IP,客户端Port,服务端IP,服务端Port
2.一台主机端口上线:65535(其中有1024个是内部使用的)
3.Linux默认文件句柄上线1024,可以修改
4.服务器的TCP的文件句柄理论上是无限的,只收内存约束,一个链接大概占用4-10KB内存。
5.长链接并不是越多越好。能够决定并发吞吐量,不是TCP长链接个数,不是带宽,而是后端服务器的处理能力。