作用
- 生产者 将Invoker对象导出
- InjvmProtocol 没有真正的干活逻辑 仅仅是一些属性赋值
- DubboProtol 启动服务
- RegistryProtol
- 通过DubboProtocol启动服务
- 注册远程配置中心
- 消费者
1 扩展接口
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
*
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
/**
* 生产者将Invoker对象进行导出
* - 本地
* - 远程
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
默认实现是dubbo
2 配置文件
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
实现类上没有@Adaptive注解标识
3 通过生成code方式创建自适应扩展实现类对象
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
4 最终实现
- 默认dubbo
- URL中协议protocol指定
4.1 InjvmProtocol
4.1.1 服务导出
// InjvmProtocol
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
/**
* invoker
* - 要暴露的对象的代理
* - javassist编码创建代理对象
* - Java反射
* serviceKey
* - invoker中注入了URL配置
* - 目标服务的全限定名
*/
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
// InjvmExporter.java
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { // 做一些属性赋值
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
4.1.2 服务引用
4.2 RegistryProtocol
4.2.1 服务导出
/**
* 生产者将目标服务创建Invoker对象导出到远程
* - 启动服务
* - 通过Protocol的DubboProtocol实现
* - 服务注册
* - 通过RegistryFactory的ZookeeperRegistryFactory注册到远程注册中心
*/
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
/**
* 真正的导出服务
* - 实现在DubboProtocol中
* - 启动目标服务
*/
final ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
/**
* 注册中心的地址
* - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.132.185%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dnative-provider%26bind.ip%3D10.10.132.185%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D95551%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1669363479100&pid=95551&qos.port=22222×tamp=1669363479082
*/
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
/**
* 注册到远程注册中心
* - registryUrl
* - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.132.185%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dnative-provider%26bind.ip%3D10.10.132.185%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D95899%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1669364001492&pid=95899&qos.port=22222×tamp=1669364001477
* - registeredProviderUrl
* - dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=95899&side=provider×tamp=1669364001492
*/
this.register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
实现两个功能
- 目标服务的启动
- 注册远程配置中心
4.2.1.1 服务启动
// RegistryProtocol.java
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
/**
* invokerDelegate是真正运行的目标对象的代理
* - invoker的协议已经被篡改了
* - 本地的话是InjvmProtocol 协议是injvm
* - 远程的话是RegistryProtocol 协议是registry
* - 因此现在要真正启动服务需要再用当时的运行协议
* - 这个地方是dubbo协议 使用的实现是DubboRegistry
*/
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
服务启动实现在DubboProtocol中
4.2.1.2 注册远程配置中心
if (register) {
/**
* 注册到远程注册中心
* - registryUrl
* - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.132.185%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dnative-provider%26bind.ip%3D10.10.132.185%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D95899%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1669364001492&pid=95899&qos.port=22222×tamp=1669364001477
* - registeredProviderUrl
* - dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=95899&side=provider×tamp=1669364001492
*/
this.register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
public void register(URL registryUrl, URL registedProviderUrl) {
/**
* registryUrl
* - zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.132.185%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dnative-provider%26bind.ip%3D10.10.132.185%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D95948%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1669364071921&pid=95948&qos.port=22222×tamp=1669364071907
* registedProviderUrl
* - dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=95948&side=provider×tamp=1669364071921
*
* RegistryFactory实现扩展
* - 默认dubbo
* - URL协议指定
* - 这个地方使用ZookeeperRegistryFactory实现
*/
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
4.2.2 服务引用
4.3 DubboProtocol
4.3.1 服务导出
4.3.1.1 服务启动
// DubboProtocol.java
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&bind.ip=10.10.132.185&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=94935&qos.port=22222&side=provider×tamp=1669362564480
URL url = invoker.getUrl();
// export service.
/**
* key标识着服务的目标对象
* com.alibaba.dubbo.demo.DemoService:20880
*/
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&bind.ip=10.10.132.185&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=94935&qos.port=22222&side=provider×tamp=1669362564480
this.openServer(url);
optimizeSerialization(url);
return exporter;
}
// DubboProtocol.java
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // 10.10.132.185:20880
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, this.createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
// DubboProtocol.java
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
/**
* url
* - dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&bind.ip=10.10.132.185&bind.port=20880&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.2&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=95076&qos.port=22222&side=provider×tamp=1669362762912
*/
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
// header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
// header
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}