首页 > 编程语言 >Nacos Client 源码分析(三)回调机制

Nacos Client 源码分析(三)回调机制

时间:2023-05-13 21:48:43浏览次数:40  
标签:dataId group String Nacos final Listener Client cacheData 源码

文本使用的 Nacos 版本为 2.2.2

1. 概述

通过前两篇文章,我们已经大致了解了 Nacos Client 的服务订阅流程以及 EventListener 的回调机制。然而,Nacos Client 对配置信息的变更的监听,并没有采用和服务订阅相同的基于事件的发布订阅模式,而是使用了另一种监听机制。下面进行简要分析。

2. 添加 Listener

采用如下代码,添加一个Listener动态监听配置变更。

@Test
    public void addListenerTest() throws NacosException, InterruptedException {
        String dataId = "mysql";
        String group = "test";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println(configInfo);
            }
        });

        while (true) {
            Thread.sleep(10000);
        }
    }

addListener()方法在 NacosConfigService中如下所示。worker是一个ClientWorker类。

   @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }

addTenantListeners()方法如下。这里有两个类需要关注,一个是CacheDate,一个是ConfigRpcTransportClient也就是agent

    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
            throws NacosException {
        group = blank2defaultGroup(group);
        String tenant = agent.getTenant();
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        synchronized (cache) {
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setDiscard(false);
            cache.setSyncWithServer(false);
            agent.notifyListenConfig();
        }
    }

CacheDate缓存了服务端配置信息,同时管理对该配置进行监听的所有Listener,部分字段如下所示。

public class CacheData {
    // 配置标识
    public final String dataId;
    public final String group;
    public final String tenant;
    // 监听者集合
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    // 配置内容缓存
    private volatile String content;
    // 当前配置的 md5
    private volatile String md5;
}

addTenantListeners()会先尝试创建CacheDate,然后将Listener加入CacheDate管理。ClientWorker内部维护了一个cacheMap来管理所有的CacheDate,键为由(dataId, group, tenant)构成的配置标识字符串。

private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<>(new HashMap<>());

2. 配置监听

ConfigRpcTransportClient是在ClientWork中定义的一个内部类,其继承了ConfigTransportClient抽象类。ConfigRpcTransportClient在内部定义了对配置变更请求的处理器。当一个请求是ConfigChangeNotifyRequest时,就会从cacheMap中取出缓存,若缓存存在则将其设为与服务不同步并通知监听线程更新配置。

        private void initRpcClientHandler(final RpcClient rpcClientInner) {
            /*
             * Register Config Change /Config ReSync Handler
             */
            rpcClientInner.registerServerRequestHandler((request) -> {
                // 1. 判断请求类型
                if (request instanceof ConfigChangeNotifyRequest) {
                    ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
                    LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                            rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
                            configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
                    // 2. 计算配置标识
                    String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
                            configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
                    // 3. 获得缓存信息
                    CacheData cacheData = cacheMap.get().get(groupKey);
                    if (cacheData != null) {
                        synchronized (cacheData) {
                            cacheData.getLastModifiedTs().set(System.currentTimeMillis());
                            // 4. 设为与服务端不同步
                            cacheData.setSyncWithServer(false);
                            // 5. 通知配置监听
                            notifyListenConfig();
                        }
                    }
                    return new ConfigChangeNotifyResponse();
                }
                return null;
            });
            // ...
    }

notifyListenConfig()方法本身并没有去调用Listener,也没有去更新配置。实际上,Nacos Client 是定时去获取服务端发生变化的配置,然后更新配置并回调ListenernotifyListenConfig()只是立刻解除配置监听线程的阻塞状态。listenExecutebell是一个容量为 1 的BlockingQueue

        @Override
        public void notifyListenConfig() {
            listenExecutebell.offer(bellItem);
        }

ConfigRpcTransportClientstartInternal()方法中,监听线程会在listenExecutebell上至多阻塞 5 秒,其会循环执行executeConfigListen()方法检查配置状态。notifyListenConfig()会立刻解除当前线程的阻塞状态。

        @Override
        public void startInternal() {
            executor.schedule(() -> {
                while (!executor.isShutdown() && !executor.isTerminated()) {
                    try {
                        listenExecutebell.poll(5L, TimeUnit.SECONDS);
                        if (executor.isShutdown() || executor.isTerminated()) {
                            continue;
                        }
                        executeConfigListen();
                    } catch (Throwable e) {
                        LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
                    }
                }
            }, 0L, TimeUnit.MILLISECONDS);
        }

executeConfigListen()方法中,会遍历所有的CacheData并按照CacheDatataskId进行分组,最终组装成一个listenCachesMap。然后遍历listenCachesMapCacheData列表,用listenCaches生成ConfigBatchListenRequest请求,查询其中的配置是否变更。

    Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
    for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
        // ...
        List<CacheData> listenCaches = entry.getValue();
        // ...
        ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
        // ...

服务端返回ConfigChangeBatchListenResponse响应,从响应中取出发生变化的配置标识changeKey,执行refreshContentAndCheck()方法。

     ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
    refreshContentAndCheck(changeKey);

refreshContentAndCheck()会取出配置标识的缓存,然后调用refreshContentAndCheck()方法的重载。

    private void refreshContentAndCheck(String groupKey) {
        CacheData cache = cacheMap.get().get(groupKey);
        if (cache != null) {
            boolean notify = !cache.isInitializing();
            refreshContentAndCheck(cache, notify);
        }
    }

重载方法如下。会从服务端拉取配置信息并更新缓存,然后执行CacheDatacheckListenerMd5()方法。

    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
        try {
            // 1. 从服务端获取配置信息
            ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                    notify);
            // 2. 更新缓存内容
            cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
            cacheData.setContent(response.getContent());
            if (null != response.getConfigType()) {
                cacheData.setType(response.getConfigType());
            }
            if (notify) {
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            }
            // 3. 检查 md5
            cacheData.checkListenerMd5();
        } catch (Exception e) {
            LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                    cacheData.group, cacheData.tenant, e);
        }
    }

checkListenerMd5()会检查当前配置内容的 md5 与上一次调用Listener时的 md5 是否相同,如果不同则说明配置发生了变化,需要调用Listener

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }

最终在safeNotifyListener()中执行了回调,如果Listener有线程池就在其线程池中执行, 否则在 CacheData 的 INTERNAL_NOTIFIER线程池中中执行。INTERNAL_NOTIFIER是一个静态变量,所有CacheData共享。

    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        Runnable job = () -> {
            listener.receiveConfigInfo(contentTmp); // 回调
            listenerWrap.lastCallMd5 = md5; // 更新 md5
        }
        try {
            if (null != listener.getExecutor()) {
                // 如果 Listener 有线程池就在其线程池中执行
                listener.getExecutor().execute(job);
            } else {
                try {
                    // 否则在 CacheData 的 INTERNAL_NOTIFIER 中执行。
                    INTERNAL_NOTIFIER.submit(job);
                } catch (RejectedExecutionException rejectedExecutionException) {
                    // ...
                    job.run();
                } catch (Throwable throwable) {
                    // ...
                    job.run();
                }
            }
        } catch (Throwable t) {
            // ...
        }
    }
static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), internalNotifierFactory);

4. 总结

  1. Nacos Client 通过轮询的方式查询服务端的配置内容是否发生变更。
  2. 当服务端配置变更后,也会向客户端主动推送一个通知,该通知会使客户端立刻检查配置是否变更。
  3. 客户端通过对比当前配置的 md5 与上一次调用Listener时的 md5 是否相同,来判断是否需要调用Listener

标签:dataId,group,String,Nacos,final,Listener,Client,cacheData,源码
From: https://www.cnblogs.com/dalelee/p/17397707.html

相关文章

  • 【毕业设计】基于ssm的零售销售零食网站、零售在线商城管理系统,附源码+文档+PPT
    1、项目介绍该系统可供管理员和用户使用,管理员功能包括:登录、首页、系统设置、用户管理、业务管理、统计分析、个人信息、密码、退出等功能。用户功能包括:登录、注册、首页、资讯信息、商品列表、在线留言、购物车、个人中心、退出等功能。项目获取,看这里2、技术框架前端框架......
  • instantclient12&plsql12下载、安装及配置
    一、oracle11gclient下载安装及配置1.1下载instantclient下载链接根据操作系统和机器位数选择相应的下载链接1.2安装及配置1.在E盘创建文件夹比如oracle_client,将下载的压缩包解压到刚新建的文件夹中(E:\oralce_client\instantclient_11_2)。解压好就算安装完成2.在当前......
  • MT4 CRM 源码
    一套MT4CRM源码,同时支持MT4进行对接使用,支持代理返佣自由进行设置,可自动实时同步manager后台分组、交易品种和客户所有信息。包括带有内部实时内转功能,支持任何第三方支付、区块链和电子钱包。整套系统功能齐全。可节约公司大量租用成本和防止第三方公司泄露客户资料等核心数据......
  • Python源码怎么运行?
    要运行Python源码,您需要安装Python解释器。Python解释器是一种软件,它可以读取Python源代码并将其转换为计算机可以理解和执行的指令。在Windows操作系统上运行Python源代码的步骤:在您的计算机上下载并安装Python解释器。您可以从Python官方网站(https://www.python.org/downloads/)下......
  • Python源码怎么运行?
    要运行Python源码,您需要安装Python解释器。Python解释器是一种软件,它可以读取Python源代码并将其转换为计算机可以理解和执行的指令。在Windows操作系统上运行Python源代码的步骤:在您的计算机上下载并安装Python解释器。您可以从Python官方网站(https://www.python.org/downloads/)下......
  • java基于springboot+vue的农机电招平台、农机租赁管理系统,附源码+数据库+文档+PPT,适合
    1、项目介绍该系统包括前台操作和后台管理两个部分,一方面,为用户提供首页,农机,系统公告,个人中心,后台管理等功能;另一方面,为管理员提供首页,个人中心,农机机主管理,使用者管理,农机类型管理,农机管理,农机预约管理,系统管理等功能。项目获取,看这里2、技术框架编程语言:java系统架构:B/S......
  • FPGA以SDIO模式读写SD卡源码,可移植到任何FPGA中。 在S
    FPGA以SDIO模式读写SD卡源码,可移植到任何FPGA中。在SDIO模式下,SD卡读写速率50Mbps以上。文件里包含tb和说明文档,已经下板验证通过。YID:17200653395070926......
  • FPGA读写IIC驱动源码,源码包含iic驱动,testbench以及eeprom模型。
    FPGA读写IIC驱动源码,源码包含iic驱动,testbench以及eeprom模型。该代码已经下板验证通过。ID:17100653395566310......
  • 雷赛轨迹卡源码。 买来后发现核心功能缺失,功能无法验证成功,现低价转
    雷赛轨迹卡源码。买来后发现核心功能缺失,功能无法验证成功,现低价转让。ID:6920000662735768959......
  • 商业级别的ethercat主站源码(倍福架构),有文档。
    商业级别的ethercat主站源码(倍福架构),有文档。ID:1220000670391592121......