一 入口驱动
/**
 * Demo entry point: exports {@code DemoService} programmatically through the
 * API-style configuration objects and then blocks on standard input.
 *
 * @param args command-line arguments (unused)
 * @throws IOException if reading from standard input fails
 */
public static void main(String[] args) throws IOException {
    // ServiceConfig is the provider-side configuration object that the service is exported through.
    ServiceConfig<DemoService> serviceConfig = new ServiceConfig<DemoService>();

    // Application-level configuration.
    ApplicationConfig applicationConfig = new ApplicationConfig("native-provider");
    serviceConfig.setApplication(applicationConfig);

    // Registry the service is published to.
    RegistryConfig registryConfig = new RegistryConfig("zookeeper://localhost:2181");
    serviceConfig.setRegistry(registryConfig);

    // Protocol name and port.
    ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", 20880);
    serviceConfig.setProtocol(protocolConfig);

    // Service interface and the implementation instance that backs it.
    serviceConfig.setInterface(DemoService.class);
    DemoService serviceImpl = new DemoServiceImpl();
    serviceConfig.setRef(serviceImpl);

    // Export the service.
    serviceConfig.export();

    // Block on stdin so the process does not return right after exporting.
    System.in.read();
}
二 export
/**
 * Public entry point of the export flow.
 *
 * <p>Fills the {@code export} switch and the {@code delay} from the provider
 * configuration when they are not set locally, returns without doing anything
 * when exporting is explicitly disabled, and otherwise runs {@link #doExport()}
 * either immediately or after {@code delay} milliseconds on
 * {@code delayExportExecutor}.
 */
public synchronized void export() {
    // Fall back to the provider-level settings for anything not set on this service.
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }

    // Exporting has been switched off explicitly.
    if (export != null && !export) {
        return;
    }

    // No positive delay configured: run the export (which starts with its pre-checks) right away.
    boolean exportImmediately = delay == null || delay <= 0;
    if (exportImmediately) {
        this.doExport();
        return;
    }

    // Positive delay configured: defer the export to the scheduler.
    Runnable deferredExport = new Runnable() {
        @Override
        public void run() {
            doExport();
        }
    };
    delayExportExecutor.schedule(deferredExport, delay, TimeUnit.MILLISECONDS);
}
三 doExport
/**
 * Validates and completes this service's configuration, then exports it.
 *
 * <p>Steps, in order: guard against repeated or post-unexport invocation,
 * inherit missing settings from provider / module / application, resolve and
 * verify the service interface (or switch to generic mode), verify the optional
 * {@code local} and {@code stub} classes, run the remaining configuration
 * checks, default the path to the interface name, export the URLs and finally
 * register the provider model.
 *
 * @throws IllegalStateException if the service was already unexported, the
 *         interface name is missing, a configured class cannot be loaded, or a
 *         local/stub class does not implement the service interface
 */
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    // Mark the provider as started before any further work.
    exported = true;

    if (interfaceName == null || interfaceName.isEmpty()) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }

    // Per the original notes: looks for provider settings in the VM arguments and
    // writes them back into the ProviderConfig.
    checkDefault();

    // Inherit anything still unset: provider first, then module, then application.
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }

    if (!(ref instanceof GenericService)) {
        // Regular (non-generic) service: load the interface class from its name.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        try {
            interfaceClass = Class.forName(interfaceName, true, contextLoader);
        } catch (ClassNotFoundException notFound) {
            throw new IllegalStateException(notFound.getMessage(), notFound);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        // Verify that the exported implementation is valid for the interface.
        checkRef();
        generic = Boolean.FALSE.toString();
    } else {
        // The exported reference is a generic service.
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    }

    if (local != null) {
        // "true" is shorthand for the conventional <interface>Local class name.
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localImpl;
        try {
            localImpl = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException notFound) {
            throw new IllegalStateException(notFound.getMessage(), notFound);
        }
        if (!interfaceClass.isAssignableFrom(localImpl)) {
            throw new IllegalStateException("The local implementation class " + localImpl.getName() + " not implement interface " + interfaceName);
        }
    }

    if (stub != null) {
        // "true" is shorthand for the conventional <interface>Stub class name.
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubImpl;
        try {
            stubImpl = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException notFound) {
            throw new IllegalStateException(notFound.getMessage(), notFound);
        }
        if (!interfaceClass.isAssignableFrom(stubImpl)) {
            throw new IllegalStateException("The stub implementation class " + stubImpl.getName() + " not implement interface " + interfaceName);
        }
    }

    // Per the original notes, each of the next four calls tries to load settings
    // from the VM arguments for the corresponding config object.
    checkApplication();     // ApplicationConfig
    checkRegistry();        // RegistryConfig
    checkProtocol();        // ProtocolConfig
    appendProperties(this); // this ServiceConfig

    checkStub(interfaceClass);
    checkMock(interfaceClass);

    // The path defaults to the interface name.
    if (path == null || path.isEmpty()) {
        path = interfaceName;
    }

    // Export the service.
    doExportUrls();

    CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
    ProviderModel model = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), model);
}
四 doExportUrls
/**
 * Exports this service once for every configured protocol.
 *
 * <p>The registry URLs are loaded a single time and the same list is handed to
 * each per-protocol export, so the service can be exported over several
 * protocols and registered with several registries.
 */
private void doExportUrls() {
    // Load the registry configuration for the provider side.
    List<URL> loadedRegistryUrls = super.loadRegistries(true);
    for (ProtocolConfig currentProtocol : protocols) {
        this.doExportUrlsFor1Protocol(currentProtocol, loadedRegistryUrls);
    }
}
五 doExportUrlsFor1Protocol
/**
 * Exports this service for a single protocol.
 *
 * <p>Collects the configuration into a parameter map, builds the service URL
 * from protocol name, host, port, context path and that map, optionally lets a
 * ConfiguratorFactory extension rewrite the URL, and then exports according to
 * the URL's scope parameter: nothing for "none", a local export unless the
 * scope is "remote", and a remote export unless the scope is "local". The final
 * URL is recorded in {@code urls}.
 *
 * @param protocolConfig the protocol to export with; an empty name is treated as "dubbo"
 * @param registryURLs   registry URLs to register with; when null or empty the
 *                       service is exported without a registry
 * @throws IllegalArgumentException if an argument config has neither index nor
 *                                  type, or its index and type do not match
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// Protocol name, defaulting to "dubbo" when none is configured.
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
// Parameter map that becomes the query part of the exported URL.
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// Append parameters from each config object in this order:
// application, module, provider (with the DEFAULT_KEY prefix), protocol, this service.
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// Method-level configuration, appended with the method name as prefix.
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
// "<method>.retry" is always removed; a value of "false" becomes "<method>.retries" = "0".
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
// This local array shadows the `methods` field for the rest of this scope.
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
// NOTE(review): the index is not bounds-checked against argtypes.length, so a
// same-named method with fewer parameters would raise
// ArrayIndexOutOfBoundsException here - confirm whether that is intended.
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
// NOTE(review): this branch is only entered when getIndex() == -1, so unless
// getIndex() changes between calls the condition below cannot be true.
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// Only an index is configured: append under "<method>.<index>".
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
// Generic service: record the generic flag and accept any method name.
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// Methods declared by the interface.
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// The HashSet removes duplicate names before they are joined.
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// Token: a "default" token is replaced by a random UUID, anything else is used as configured.
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// The local protocol is never registered and gets notify=false.
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
// The context path comes from the protocol config, falling back to the provider config.
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
/**
 * Author's notes (not verifiable from this code):
 * the ConfiguratorFactory extension point has only two configured implementations
 * - AbsentConfiguratorFactory
 * - OverrideConfiguratorFactory
 * They belong to the third-priority cache, so a lookup by the name "dubbo" finds nothing.
 * url.getProtocol() -> dubbo
 */
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
/**
 * Export locally. Example URL from the author's run:
 * dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&bind.ip=10.10.132.185&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=49771&qos.port=22222&side=provider&timestamp=1669285279449
 */
this.exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// Export remotely.
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
// `url` is reassigned inside this loop, so parameters added for one registry
// are still present when the next registry is processed.
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// Load the monitor URL.
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// Add the monitor URL to the service URL as an encoded parameter.
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY); // author's note: the proxy setting affects which Invoker implementation is chosen
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
/**
 * Create an Invoker for the service implementation.
 * Example values from the author's run:
 * registryURL
 * - registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=native-provider&dubbo=2.0.2&pid=88742&qos.port=22222&registry=zookeeper&timestamp=1669353705349
 * url
 * - dubbo://10.10.132.185:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=native-provider&bind.ip=10.10.132.185&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=88742&qos.port=22222&side=provider&timestamp=1669353705363
 * Author's notes on the proxyFactory implementation (not verifiable from this code):
 * - JavassistProxyFactory by default
 * - otherwise the one named by the "proxy" URL parameter
 * Here registryURL carries no "proxy" parameter, so JavassistProxyFactory is used.
 */
// The service URL is embedded into the registry URL as the encoded EXPORT_KEY parameter.
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
/**
 * Export the Invoker to obtain an Exporter.
 * Author's notes on the protocol implementation (not verifiable from this code):
 * - DubboProtocol by default
 * - otherwise chosen by the protocol of the URL
 * Here the Invoker's getUrl() is the registry URL above, whose protocol is "registry",
 * so RegistryProtocol is the implementation used.
 */
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else { // no registry configured: export the service directly
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
// Record the final URL; this runs even when the scope is "none".
this.urls.add(url);
}
六 总结
源码大篇幅在构建URL上面,将必要信息整合写入到URL,将URL流转于服务导出环节
不管是导出服务到本地还是导出服务到远程,代码抽象都是一致的,根据SPI选择不同的实现
步骤都是
- ProxyFactory负责创建Invoker对象,将真正服务对象生成代理对象
- Protocol负责导出Invoker功能

| 场景 | ProxyFactory | Protocol |
|---|---|---|
| 创建Invoker对象导出本地服务 | JavassistProxyFactory | InjvmProtocol |
| 创建Invoker对象导出远程服务 | JavassistProxyFactory | RegistryProtocol |