前言:上一篇我们分析 Nacos 配置中心服务端源码的时候,多次看到有去读取本地配置文件,那本地配置文件是何时加载的?本篇我们来进行详细分析。

Nacos 本地配置的加载无疑肯定是 Nacos Server 启动时候加载的,Nacos 本地配置的加载和 DumpService 有莫大的关系,翻看源码可以看到 DumpService 是一个抽象类,它有两个子类,分别是 EmbeddedDumpService 和 ExternalDumpService,接下来我们将根据这两个类来展开分析。


EmbeddedDumpService 和 ExternalDumpService 源码

关于 EmbeddedDumpService 和 ExternalDumpService 类,这里我们先做一个初步认识,各自动的功能及源码如下:

  • EmbeddedDumpService : 是用本地存储的处理类,本存储是基于 derby 数据库, 是一种内嵌式数据库,和 JVM 共享内存。
  • ExternalDumpService :使用外部存储的处理类,比如集群情况下使用 MySQL 做存储。
//本地存储 基于 derby 数据库 内嵌式数据库
public class EmbeddedDumpService extends DumpService {

//外部存储 mysql
public class ExternalDumpService extends DumpService {

ExternalDumpService#init 方法源码解析

ExternalDumpService#init 方法被 @PostConstruct 注解修饰,因此 init方法会在 ExternalDumpService 类实例化后执行,而该方法有调用了 DumpService#dumpOperate 方法。

protected void init() throws Throwable {
	dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);

EmbeddedDumpService#init 方法源码解析

EmbeddedDumpService#init 方法会判断是 Nacos 是单机模式还是集群模式,如果是单机模式,会直接调用 DumpService#dumpOperate 方法,完成配置文件加载到本地,如果是集群模式,默认获取 CP 协议,然后会先观察 Leader 节点是否有配置值,有值才会直接调用 DumpService#dumpOperate 方法,完成配置文件加载到本地。

protected void init() throws Throwable {
	if (EnvUtil.getStandaloneMode()) {
		//是  调用 DumpService#dumpOperate 方法
		dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
	//走到这里表示不是单机模式 也就是是集群模式
	//获取 CP 协议
	CPProtocol protocol = protocolManager.getCpProtocol();
	AtomicReference<Throwable> errorReference = new AtomicReference<>(null);
	//等待文件转储完成的 CountDownLatch
	CountDownLatch waitDumpFinish = new CountDownLatch(1);
	// watch path => /nacos_config/leader/ has value ?
	//观察 leader 总的 nacos_config 是否有值
	Observer observer = new Observer() {
		public void update(Observable o) {
			if (!(o instanceof ProtocolMetaData.ValueItem)) {
			final Object arg = ((ProtocolMetaData.ValueItem) o).getData();
			GlobalExecutor.executeByCommon(() -> {
				// must make sure that there is a value here to perform the correct operation that follows
				//为空 判断
				if (Objects.isNull(arg)) {
				// Identify without a timeout mechanism
				EmbeddedStorageContextUtils.putExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA, "true");
				// Remove your own listening to avoid task accumulation
				boolean canEnd = false;
				//自旋 重试读取数据
				for (; ; ) {
					try {
						//调用 DumpService#dumpOperate 方法
						dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
								.unSubscribe(Constants.CONFIG_MODEL_RAFT_GROUP, MetadataKey.LEADER_META_DATA, this);
						canEnd = true;
					} catch (Throwable ex) {
						if (!shouldRetry(ex)) {
							canEnd = true;
					//标识改变 结束自旋
					if (canEnd) {

			.subscribe(Constants.CONFIG_MODEL_RAFT_GROUP, MetadataKey.LEADER_META_DATA, observer);
	// We must wait for the dump task to complete the callback operation before
	// continuing with the initialization
	// If an exception occurs during the execution of the dump task, the exception
	// needs to be thrown, triggering the node to start the failed process
	final Throwable ex = errorReference.get();
	if (Objects.nonNull(ex)) {
		throw ex;

DumpService#dumpOperate 方法源码解析

DumpService#dumpOperate 方法的作用是把 Nacos 配置信息转储到本地文件中,主要做了一下操作:

  • 创建导出任务,包括配置信息、beta、tag 任务。
  • 清除历史配置信息(本地存储和外部存储会调用不通的方法处理),采用了分页处理的方式,一次处理 1000 条。
  • 转储配置信息,也就是将配置信息写入到本地文件中(重点关注)。
  • 更新 beta、tag 缓存。
  • 异步线程 10个为一组合并配置信息数据。
  • 集群模式保存心跳文件到本地磁盘,然后启动三个定时任务,分别去更新配置信息、beta、tag。

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
		DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
	String dumpFileContext = "CONFIG_DUMP_TO_FILE";
	try {
		LogUtil.DEFAULT_LOG.warn("DumpService start");
		Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
		Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
		Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
		Runnable clearConfigHistory = () -> {
			LOGGER.warn("clearConfigHistory start");
			//本地存储 单机模式默认 true 集群模式只有 leader 节点才可以执行
			//外部存储 本机可执行
			if (canExecute()) {
				try {
					//获取 前6个小时的时间戳
					Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
					int totalCount = persistService.findConfigHistoryCountByTime(startTime);
					if (totalCount > 0) {
						int pageSize = 1000;
						int removeTime = (totalCount + pageSize - 1) / pageSize;
								"clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
								startTime, totalCount, pageSize, removeTime);
						while (removeTime > 0) {
							// delete paging to avoid reporting errors in batches
							persistService.removeConfigHistory(startTime, pageSize);
				} catch (Throwable e) {
					LOGGER.error("clearConfigHistory error : {}", e.toString());
		try {
			//转储配置信息 重点关注
			// update Beta cache
			//更新 beta 缓存
			LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
			if (persistService.isExistTable(BETA_TABLE_NAME)) {
				dumpAllBetaProcessor.process(new DumpAllBetaTask());
			// update Tag cache
			//更新 tag 缓存
			LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
			if (persistService.isExistTable(TAG_TABLE_NAME)) {
				dumpAllTagProcessor.process(new DumpAllTagTask());
			// add to dump aggr
			List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
			if (configList != null && !configList.isEmpty()) {
				total = configList.size();
				//每 10 个为一组
				List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
				for (List<ConfigInfoChanged> list : splitList) {
					MergeAllDataWorker work = new MergeAllDataWorker(list);
				LOGGER.info("server start, schedule merge end.");
		} catch (Exception e) {
					.error("Nacos Server did not start because dumpservice bean construction failure :\n" + e
			throw new NacosException(NacosException.SERVER_ERROR,
					"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
		if (!EnvUtil.getStandaloneMode()) {
			Runnable heartbeat = () -> {
				String heartBeatTime = TimeUtils.getCurrentTime().toString();
				// write disk
				try {
				} catch (IOException e) {
					LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
			ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
			long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
			LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
			//首次延迟随机时间 后每6小时保存一次所有配置文件
			ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
			//首次延迟随机时间 后每6小时保存一次所有Beta缓存
					.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
			//首次延迟随机时间 后每6小时保存一次配置标签缓存
					.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
		//延迟10 分钟 每10分钟执行一次清除配置历史信息
		ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
	} finally {
		TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);

DumpService#dumpConfigInfo 方法源码解析

DumpService#dumpConfigInfo 方法主要判断是否需要全量转储配置文件,如果最后一次全量转储的事件戳小于6小时,则不需要全量转储,否则全量转储配置文件,我们重点关注 process 方法。

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
	int timeStep = 6;
	Boolean isAllDump = true;
	// initial dump all
	FileInputStream fis = null;
	Timestamp heartheatLastStamp = null;
	try {
		if (isQuickStart()) {
			File heartbeatFile = DiskUtil.heartBeatFile();
			if (heartbeatFile.exists()) {
				fis = new FileInputStream(heartbeatFile);
				String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
				heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
				if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime()
						< timeStep * 60 * 60 * 1000) {
					isAllDump = false;
		//如果最后一次全量转储的事件戳小于6小时 则不需要全量转储
		if (isAllDump) {
			LogUtil.DEFAULT_LOG.info("start clear all config-info.");
			dumpAllProcessor.process(new DumpAllTask());
		} else {
			Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep);
			DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp,
			dumpChangeProcessor.process(new DumpChangeTask());
			//每12 小时执行一次 MD5 值比较
			Runnable checkMd5Task = () -> {
				LogUtil.DEFAULT_LOG.error("start checkMd5Task");
				List<String> diffList = ConfigCacheService.checkMd5();
				for (String groupKey : diffList) {
					String[] dg = GroupKey.parseKey(groupKey);
					String dataId = dg[0];
					String group = dg[1];
					String tenant = dg[2];
					ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
					ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(),
				LogUtil.DEFAULT_LOG.error("end checkMd5Task");
			ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);
	} catch (IOException e) {
		LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
		throw e;
	} finally {
		if (null != fis) {
			try {
			} catch (IOException e) {
				LogUtil.DEFAULT_LOG.warn("close file failed");

DumpAllProcessor#process 方法源码解析

DumpAllProcessor#process 方法全量转储配置文件到本次磁盘,这里也会采用分页模式处理,一次处理 1000 条数据,并且地白名单进行了处理,最终调用 ConfigCacheService#dump 方法完成文件转储。

public boolean process(NacosTask task) {
	long currentMaxId = persistService.findConfigMaxId();
	long lastMaxId = 0;
	while (lastMaxId < currentMaxId) {
		//分页查询配置信息 一次 1000条
		Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
		if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
			for (ConfigInfoWrapper cf : page.getPageItems()) {
				long id = cf.getId();
				//比较配置id 赋值
				lastMaxId = id > lastMaxId ? id : lastMaxId;
				if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
				if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
				if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
				boolean result = ConfigCacheService
						.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),
				final String content = cf.getContent();
				final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
				LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
						GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
			DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
		} else {
			lastMaxId += PAGE_SIZE;
	return true;

DumpChangeProcessor#process 方法源码解析

DumpChangeProcessor#process 方法是转储部分配置信息的实现方法,主要做了一下几件事情:

  • 找出更新的配置信息,发布 LocalDataChangeEvent 事件。
  • 找出需要删除的配置信息,删除并发布 LocalDataChangeEvent 事件。
  • 对修改了的配置信息调用 ConfigCacheService#dumpChange 方法进行转储操作,并刷新配置信息。
public boolean process(NacosTask task) {
	LogUtil.DEFAULT_LOG.warn("quick start; startTime:{},endTime:{}", startTime, endTime);
	LogUtil.DEFAULT_LOG.warn("updateMd5 start");
	//更新 md5
	long startUpdateMd5 = System.currentTimeMillis();
	List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();
	LogUtil.DEFAULT_LOG.warn("updateMd5 count:{}", updateMd5List.size());
	for (ConfigInfoWrapper config : updateMd5List) {
		final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());
		//执行更新 发布 LocalDataChangeEvent 事件
		ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());
	long endUpdateMd5 = System.currentTimeMillis();
	LogUtil.DEFAULT_LOG.warn("updateMd5 done,cost:{}", endUpdateMd5 - startUpdateMd5);
	LogUtil.DEFAULT_LOG.warn("deletedConfig start");
	long startDeletedConfigTime = System.currentTimeMillis();
	List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);
	LogUtil.DEFAULT_LOG.warn("deletedConfig count:{}", configDeleted.size());
	for (ConfigInfo configInfo : configDeleted) {
		if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())
				== null) {
			ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
	long endDeletedConfigTime = System.currentTimeMillis();
	LogUtil.DEFAULT_LOG.warn("deletedConfig done,cost:{}", endDeletedConfigTime - startDeletedConfigTime);
	LogUtil.DEFAULT_LOG.warn("changeConfig start");
	final long startChangeConfigTime = System.currentTimeMillis();
	List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);
	LogUtil.DEFAULT_LOG.warn("changeConfig count:{}", changeConfigs.size());
	for (ConfigInfoWrapper cf : changeConfigs) {
		//修改了的配置 执行 dump 操作
		boolean result = ConfigCacheService
				.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
		final String content = cf.getContent();
		final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
		LogUtil.DEFAULT_LOG.info("[dump-change-ok] {}, {}, length={}, md5={}",
				new Object[] {GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
						content.length(), md5});
	long endChangeConfigTime = System.currentTimeMillis();
	LogUtil.DEFAULT_LOG.warn("changeConfig done,cost:{}", endChangeConfigTime - startChangeConfigTime);
	return true;

ConfigCacheService#dump 方法源码解析

ConfigCacheService#dump方法会获取写锁,来保证线程安全和不被重复操作,获取锁成功后,则会获取 MD5 值进行比较,如果 MD5 值一致且配置文件存在,不做处理,否则会判断是否是本地读取,如果是将配置信息写入本地磁盘,更新 MD5值,发布 LocalDataChangeEvent 事件,并释放锁。

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
		String type) {
	//获取 group
	String groupKey = GroupKey2.getKey(dataId, group, tenant);
	//groupKey 存在则更新 不存在加入到 CACHE
	CacheItem ci = makeSure(groupKey);
	final int lockResult = tryWriteLock(groupKey);
	assert (lockResult != 0);
	if (lockResult < 0) {
		DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
		return false;
	try {
		//获取 md5 值
		final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
		if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
			//md5 值一样 且文件存在
			DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
							+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
		} else if (!PropertyUtil.isDirectRead()) {
			//进入 表示不是单机模式 也不是内嵌数据库
			DiskUtil.saveToDisk(dataId, group, tenant, content);
		//更新 md5 值 发布 LocalDataChangeEvent 事件
		updateMd5(groupKey, md5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
		if (ioe.getMessage() != null) {
			String errMsg = ioe.getMessage();
			if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
					.contains(DISK_QUATA_EN)) {
				// Protect from disk full.
				FATAL_LOG.error("磁盘满自杀退出", ioe);
		return false;
	} finally {

ConfigCacheService#dumpChange 方法源码解析

ConfigCacheService#dumpChange 方法会获取写锁,来保证线程安全和不被重复操作,获取锁成功后,会判断是否是本地读取,如果是则会获取 MD5 值进行比较,如果 MD5 值一致,不做处理,否则会将配置信息写入本地磁盘,更新会 MD5值,发布 LocalDataChangeEvent 事件,并释放锁。

public static boolean dumpChange(String dataId, String group, String tenant, String content, long lastModifiedTs) {
	final String groupKey = GroupKey2.getKey(dataId, group, tenant);
	//更新或者加入缓存 CACHE
	final int lockResult = tryWriteLock(groupKey);
	//获取锁结果不为 0  直接返回
	assert (lockResult != 0);
	if (lockResult < 0) {
		//获取锁结果小于 0  返回失败
		DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
		return false;
	try {
		//md5 值
		final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
		//单机模式 且内嵌数据库
		if (!PropertyUtil.isDirectRead()) {
			//进入 表示不是单机模式 也不是内嵌数据库
			//获取本地配置 md5
			String localMd5 = DiskUtil.getLocalConfigMd5(dataId, group, tenant);
			if (md5.equals(localMd5)) {
				//相等表示没有变化 不处理
				DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
								+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
			} else {
				//MD5 不相等 则保存到磁盘中
				DiskUtil.saveToDisk(dataId, group, tenant, content);
		//更新 MD5 发布 LocalDataChangeEvent 事件
		updateMd5(groupKey, md5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
		return false;
	} finally {

至此,Nacos 配置中心配置何时加载到本地磁盘上的源码分析完毕,希望可以帮助到有需要的朋友。


