文本使用的 Nacos 版本为 2.2.2
1. 概述
通过前两篇文章,我们已经大致了解了 Nacos Client 的服务订阅流程以及 EventListener 的回调机制。然而,Nacos Client 对配置信息的变更的监听,并没有采用和服务订阅相同的基于事件的发布订阅模式,而是使用了另一种监听机制。下面进行简要分析。
2. 添加 Listener
采用如下代码,添加一个Listener
动态监听配置变更。
@Test
public void addListenerTest() throws NacosException, InterruptedException {
String dataId = "mysql";
String group = "test";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println(configInfo);
}
});
while (true) {
Thread.sleep(10000);
}
}
addListener()
方法在 NacosConfigService
中如下所示。worker
是一个ClientWorker
类。
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
addTenantListeners()
方法如下。这里有两个类需要关注,一个是CacheDate
,一个是ConfigRpcTransportClient
也就是agent
。
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setSyncWithServer(false);
agent.notifyListenConfig();
}
}
CacheDate
缓存了服务端配置信息,同时管理对该配置进行监听的所有Listener
,部分字段如下所示。
public class CacheData {
// 配置标识
public final String dataId;
public final String group;
public final String tenant;
// 监听者集合
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// 配置内容缓存
private volatile String content;
// 当前配置的 md5
private volatile String md5;
}
addTenantListeners()
会先尝试创建CacheDate
,然后将Listener
加入CacheDate
管理。ClientWorker
内部维护了一个cacheMap
来管理所有的CacheDate
,键为由(dataId, group, tenant)
构成的配置标识字符串。
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<>(new HashMap<>());
2. 配置监听
ConfigRpcTransportClient
是在ClientWork
中定义的一个内部类,其继承了ConfigTransportClient
抽象类。ConfigRpcTransportClient
在内部定义了对配置变更请求的处理器。当一个请求是ConfigChangeNotifyRequest
时,就会从cacheMap
中取出缓存,若缓存存在则将其设为与服务不同步并通知监听线程更新配置。
private void initRpcClientHandler(final RpcClient rpcClientInner) {
/*
* Register Config Change /Config ReSync Handler
*/
rpcClientInner.registerServerRequestHandler((request) -> {
// 1. 判断请求类型
if (request instanceof ConfigChangeNotifyRequest) {
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
// 2. 计算配置标识
String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
// 3. 获得缓存信息
CacheData cacheData = cacheMap.get().get(groupKey);
if (cacheData != null) {
synchronized (cacheData) {
cacheData.getLastModifiedTs().set(System.currentTimeMillis());
// 4. 设为与服务端不同步
cacheData.setSyncWithServer(false);
// 5. 通知配置监听
notifyListenConfig();
}
}
return new ConfigChangeNotifyResponse();
}
return null;
});
// ...
}
notifyListenConfig()
方法本身并没有去调用Listener
,也没有去更新配置。实际上,Nacos Client 是定时去获取服务端发生变化的配置,然后更新配置并回调Listener
,notifyListenConfig()
只是立刻解除配置监听线程的阻塞状态。listenExecutebell
是一个容量为 1 的BlockingQueue
。
@Override
public void notifyListenConfig() {
listenExecutebell.offer(bellItem);
}
在ConfigRpcTransportClient
的startInternal()
方法中,监听线程会在listenExecutebell
上至多阻塞 5 秒,其会循环执行executeConfigListen()
方法检查配置状态。notifyListenConfig()
会立刻解除当前线程的阻塞状态。
@Override
public void startInternal() {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
executeConfigListen();
} catch (Throwable e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
在executeConfigListen()
方法中,会遍历所有的CacheData
并按照CacheData
的taskId
进行分组,最终组装成一个listenCachesMap
。然后遍历listenCachesMap
的CacheData
列表,用listenCaches
生成ConfigBatchListenRequest
请求,查询其中的配置是否变更。
Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
// ...
List<CacheData> listenCaches = entry.getValue();
// ...
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
// ...
服务端返回ConfigChangeBatchListenResponse
响应,从响应中取出发生变化的配置标识changeKey
,执行refreshContentAndCheck()
方法。
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
refreshContentAndCheck(changeKey);
refreshContentAndCheck()
会取出配置标识的缓存,然后调用refreshContentAndCheck()
方法的重载。
private void refreshContentAndCheck(String groupKey) {
CacheData cache = cacheMap.get().get(groupKey);
if (cache != null) {
boolean notify = !cache.isInitializing();
refreshContentAndCheck(cache, notify);
}
}
重载方法如下。会从服务端拉取配置信息并更新缓存,然后执行CacheData
的checkListenerMd5()
方法。
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
// 1. 从服务端获取配置信息
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
notify);
// 2. 更新缓存内容
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
cacheData.setContent(response.getContent());
if (null != response.getConfigType()) {
cacheData.setType(response.getConfigType());
}
if (notify) {
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
ContentUtils.truncateContent(response.getContent()), response.getConfigType());
}
// 3. 检查 md5
cacheData.checkListenerMd5();
} catch (Exception e) {
LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
cacheData.group, cacheData.tenant, e);
}
}
checkListenerMd5()
会检查当前配置内容的 md5 与上一次调用Listener
时的 md5 是否相同,如果不同则说明配置发生了变化,需要调用Listener
。
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
最终在safeNotifyListener()
中执行了回调,如果Listener
有线程池就在其线程池中执行, 否则在 CacheData 的 INTERNAL_NOTIFIER
线程池中中执行。INTERNAL_NOTIFIER
是一个静态变量,所有CacheData
共享。
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = () -> {
listener.receiveConfigInfo(contentTmp); // 回调
listenerWrap.lastCallMd5 = md5; // 更新 md5
}
try {
if (null != listener.getExecutor()) {
// 如果 Listener 有线程池就在其线程池中执行
listener.getExecutor().execute(job);
} else {
try {
// 否则在 CacheData 的 INTERNAL_NOTIFIER 中执行。
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
// ...
job.run();
} catch (Throwable throwable) {
// ...
job.run();
}
}
} catch (Throwable t) {
// ...
}
}
static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), internalNotifierFactory);
4. 总结
- Nacos Client 通过轮询的方式查询服务端的配置内容是否发生变更。
- 当服务端配置变更后,也会向客户端主动推送一个通知,该通知会使客户端立刻检查配置是否变更。
- 客户端通过对比当前配置的 md5 与上一次调用
Listener
时的 md5 是否相同,来判断是否需要调用Listener
。