首页 > 编程语言 >Nacos源码 (4) 配置中心

Nacos源码 (4) 配置中心

时间:2023-08-21 21:01:26浏览次数:47  
标签:dataId group String 配置 Nacos 源码 new properties

本文阅读nacos-2.0.2的config源码,编写示例,分析推送配置、监听配置的原理。

客户端

创建NacosConfigService对象

Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);
NacosConfigService configService = new NacosConfigService(properties);

构造方法:

public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    
    initNamespace(properties);
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    ServerListManager serverListManager = new ServerListManager(properties);
    serverListManager.start();
    
    this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
    // will be deleted in 2.0 later versions
    agent = new ServerHttpAgent(serverListManager);
}
  1. 创建ConfigFilterChainManager - 过滤器链
  2. 创建ServerListManager - 服务器列表管理
  3. 创建ClientWorker - 用来发送请求,内部封装了一个ConfigRpcTransportClient类型对象agent,它能够获取到RpcClient与服务端进行通信

推送配置

Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);

NacosConfigService configService = new NacosConfigService(properties);

StringWriter out = new StringWriter();
properties.store(out, "test config");

// 推送配置到nacos服务器
configService.publishConfig(
    ORDER_SERVICE, Constants.DEFAULT_GROUP, out.toString(), "properties");

推送配置到nacos服务器:

public boolean publishConfig(String dataId,
                             String group,
                             String content,
                             String type) throws NacosException {
    return publishConfigInner(namespace, dataId, group, null, null, null, content, type, null);
}

private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName,
        String betaIps, String content, String type, String casMd5) throws NacosException {
    group = blank2defaultGroup(group);
    ParamUtils.checkParam(dataId, group, content);

    ConfigRequest cr = new ConfigRequest();
    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);
    cr.setContent(content);
    cr.setType(type);
    configFilterChainManager.doFilter(cr, null);
    content = cr.getContent();
    String encryptedDataKey = (String) cr.getParameter("encryptedDataKey");

    return worker.publishConfig(
        dataId, group, tenant, appName, tag, betaIps, content, encryptedDataKey, casMd5, type);
}

worker使用agent推送配置:

// 1. 封装ConfigPublishRequest对象
ConfigPublishRequest request = new ConfigPublishRequest(dataId, group, tenant, content);
request.setCasMd5(casMd5);
request.putAdditionalParam(TAG_PARAM, tag);
request.putAdditionalParam(APP_NAME_PARAM, appName);
request.putAdditionalParam(BETAIPS_PARAM, betaIps);
request.putAdditionalParam(TYPE_PARAM, type);
request.putAdditionalParam(ENCRYPTED_DATA_KEY_PARAM, encryptedDataKey);
// 2. 获取RpcClient对象
// 3. 使用RpcClient发请求
ConfigPublishResponse response = (ConfigPublishResponse) requestProxy(getOneRunningClient(), request);

监听配置

Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_HOST);

NacosConfigService configService = new NacosConfigService(properties);

// 添加监听器
configService.addListener(ORDER_SERVICE, Constants.DEFAULT_GROUP, new AbstractListener() {
  @Override
  public void receiveConfigInfo(String configInfo) {
    System.out.printf(">> config: \n%s\n\n", configInfo);
  }
});
  1. 将监听器注册到本地,本地使用CacheData作为监听器管理器,封装配置名、dataId和监听器集合,内部使用CopyOnWriteArrayList保存监听器,如果是第一次监听,会先拉取一次配置。本地注册表为Map结构,使用dataId+group+tenant作为key,value是CacheData对象
  2. 在client初始化阶段,会注册一个ServerRequestHandler,专门处理服务器端的ConfigChangeNotifyRequest请求,该请求只会推送变化了的配置的基本信息,而不包括内容,所以此处还会触发一次ConfigBatchListenRequest请求
  3. 之后查找到变化的配置后,再发一个查询请求拉取配置

服务端

推送配置处理器

ConfigPublishRequestHandler处理器

配置中心使用ConfigPublishRequestHandler类处理客户端配置推送请求:

  1. 验证请求参数

  2. 封装configAdvanceInfo配置扩展信息

  3. 创建ConfigInfo封装配置信息

  4. 使用持久层对象insert或者update配置

  5. 使用ConfigChangePublisher推送一个ConfigDataChangeEvent事件

    ConfigChangePublisher.notifyConfigChange(
        new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
    

ConfigDataChangeEvent事件处理

ConfigDataChangeEvent事件会触发数据dump操作和集群同步操作。

此处先介绍数据dump操作:

dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
    syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());

dumpService是Dump data service用于备份数据:

// Add DumpTask to TaskManager, it will execute asynchronously.
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    // 推送一个DumpTask
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}

DumpTask将使用DumpProcessor类处理:

  1. 把数据写到磁盘
  2. 推送LocalDataChangeEvent事件

LocalDataChangeEvent事件会被以下几个类处理:

  • LongPollingService$1会触发DataChangeTask,响应长轮询订阅者
  • RpcConfigChangeNotifier将数据变化推送给rpc订阅者
  • InternalConfigChangeNotifier处理内部配置文件变化

监听配置处理器

ConfigChangeBatchListenRequestHandler处理器

处理客户端的配置监听请求:

public class ConfigChangeBatchListenRequestHandler
        extends RequestHandler<ConfigBatchListenRequest, ConfigChangeBatchListenResponse> {

    @Autowired
    private ConfigChangeListenContext configChangeListenContext;

    @TpsControl(pointName = "ConfigListen")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public ConfigChangeBatchListenResponse handle(
            ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta)
            throws NacosException {
        String connectionId = StringPool.get(meta.getConnectionId());
        String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);

        ConfigChangeBatchListenResponse configChangeBatchListenResponse = 
            new ConfigChangeBatchListenResponse();
        for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest
                .getConfigListenContexts()) {
            String groupKey = GroupKey2
                    .getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
            groupKey = StringPool.get(groupKey);

            String md5 = StringPool.get(listenContext.getMd5());

            // 监听
            if (configChangeListenRequest.isListen()) {
                // 注册监听器,维护groupKey->connectionId集关系和groupKey->md5关系
                configChangeListenContext.addListen(groupKey, md5, connectionId);
                // 判断变化
                boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
                if (!isUptoDate) {
                    // 把变化的配置基本信息添加到响应
                    configChangeBatchListenResponse
                        .addChangeConfig(listenContext.getDataId(), listenContext.getGroup(),
                            listenContext.getTenant());
                }
            } else {
                // 取消监听
                configChangeListenContext.removeListen(groupKey, connectionId);
            }
        }

        return configChangeBatchListenResponse;
    }
}

标签:dataId,group,String,配置,Nacos,源码,new,properties
From: https://www.cnblogs.com/xugf/p/17647070.html

相关文章

  • A018 《BGM计算器》编程 源码
    一、课程介绍本节课学习新的数据类型float,结合if嵌套和比较运算符,实现一个BMI指数计算器。二、重难点解析浮点数float浮点数其实就是小数,使用float()方法可以把整数和内容为数字的字符串转换为浮点数。比较运算符在Python中,大于、小于等符号叫做比较运算符。比较运算经常......
  • qt 配置过程记录
    1. source命令用于执行被修改的配置文件,使最新配置更新到操作系统 通常有如下命令source~/.profilesource~/.bash_profilesource/etc/profile如果碰到sudo:source:commandnotfound问题,主要是因为系统中的很多命令需要声明式配置,才能在终端调用这些命令。解决方法:su......
  • Apipost中自定义接口字段如何配置
    Apipost项目设置中可以配置接口文档中的自定义接口字段,创建状态码字典。分享分档时会展示到文档页面状态码字典在状态码字典中可以自定义状态码即其含义自定义的状态码会在分享的API文档中展示接口属性接口属性中可以自定义接口和接口文档展示字段,在接口属性中添加一个时间类型字段......
  • Apipost中自定义接口字段如何配置
    Apipost项目设置中可以配置接口文档中的自定义接口字段,创建状态码字典。分享分档时会展示到文档页面状态码字典在状态码字典中可以自定义状态码即其含义 自定义的状态码会在分享的API文档中展示 接口属性接口属性中可以自定义接口和接口文档展示字段,在接口属性中添加一......
  • Albert 源码解析:分组复用
    classAlbertGroup(nn.Module):def__init__(self,config):super(AlbertGroup,self).__init__()self.inner_group_num=config.inner_group_numself.inner_group=nn.ModuleList([AlbertLayer(config)for_inrange(config.inner_group......
  • EventBus源码再分析
    一、概述EventBus是一个开源的用于Android和Java上的一个:订阅--->发布事件总线。优点:1.只要是在一个JVM内,就可以实现通信2.小巧灵活、不占内存3.解耦,切换线程灵活4.库小,不占内存缺点:1.注册和反注册时一对,如果忘记了......
  • linux上SQL Server 配置管理器的使用
    概述我们知道Windows平台上的SQLServer配置管理器是一个图形工具,用于管理与SQLServer关联的服务、配置SQLServer使用的网络协议以及管理SQLServer客户端计算机的网络连接配置。我们还可以使用SQLServer配置管理器来启动、暂停、恢复或停止服务,查看服务属性或更改服务......
  • centos7 DHCP服务器配置
    dhcp服务配置过程1、安装包(插入光盘下载或者上一篇的网络下载都可以)安装: 挂载光盘:mount /dev/cdrom /media     cd /media/Packages   rpm -ivh dhcp-4.....(Tab)2、修改主配置文件:vim /etc/dhcp/dhcpd.conf //打开配置文件subnet20.0.0.0netmask255.......
  • ASP.NET版LIMS系统源码 实验室信息管理系统
    实验室信息管理系统(LaboratoryInformationManagementSystem)简称LIMS系统,是指通过计算机对实验室的各种信息进行管理的计算机软、硬件系统,并将实验室的设备各种信息通过计算机网络连接起来,采用科学的管理思想和先进的数据库技术,实现以实验室为核心,集检验业务管理、检测资源管理、......
  • 从头配置阿里云服务器
    哈喽,大家早上好,愉快的一天又开始了。今天我们就不去抠繁琐的源码和枯燥的原理了,跟大家聊点轻松愉快的话题。心血来潮去年双十一的时候,趁着搞活动买了台阿里云服务器。想着以后自己可以折腾一下,搭建个博客或者搞个小服务玩玩,但是买完之后就把计划搁置了。前几周突然想起来之前搞过一......