首页 > 编程语言 >Nacos源码(七):客户端实例变更事件机制源码分析

Nacos源码(七):客户端实例变更事件机制源码分析

时间:2023-12-08 17:37:50浏览次数:126  
标签:订阅 serviceInfo Nacos 源码 NotifyCenter 事件 实例 final 客户端

  在给出的NamingExample示例中,给出客户端订阅的代码,详情如下:

0

  客户端的订阅机制是通过事件完成的, NacosNamingService#subscribe() 详情如下:

0

客户端订阅主要步骤:

  1、注册事件监听器

  2、客户端订阅

  客户端订阅在Nacos源码(六):客户端服务发现源码分析中已经做了简要的分析,得知在Nacos客户端订阅时,通过UpdateTask定时任务从注册中心拉取实例列表,当实例发生变化时,更新本地缓存中的服务实例信息。具体是如何完成本地缓存更新未做详细介绍,下面重点来看看这一步是如何完成的。

1、本地缓存的处理回顾

  先来看看获取到实例列表后,如何处理本地缓存的,从定时同步任务UpdateTask的run()方法中,很容易看出哪一步是处理服务信息的。

0

  ServiceInfoHolder#processServiceInfo() 详情如下:

 1 // 本地缓存服务信息,内存中
 2 private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
 3 
 4 // 缓存磁盘文件路径
 5 private String cacheDir;
 6 
 7 public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
 8     String serviceKey = serviceInfo.getKey();
 9     if (serviceKey == null) {
10         return null;
11     }
12     // 未能从Nacos服务获取到信息,则返回本地缓存的服务信息
13     ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
14     if (isEmptyOrErrorPush(serviceInfo)) {
15         //empty or error push, just ignore
16         return oldService;
17     }
18     // 更新本地缓存服务信息
19     serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
20     // 判断注册的实例信息是否已变更
21     boolean changed = isChangedServiceInfo(oldService, serviceInfo);
22     if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
23         serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
24     }
25     // 监控服务监控缓存Map的大小
26     MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
27     // 服务实例已变更
28     if (changed) {
29         NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
30                 JacksonUtils.toJson(serviceInfo.getHosts()));
31         // 添加实例变更事件,通知订阅者执行业务处理
32         NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
33                 serviceInfo.getClusters(), serviceInfo.getHosts()));
34         // 记录Service本地文件
35         DiskCache.write(serviceInfo, cacheDir);
36     }
37     return serviceInfo;
38 }

  本地缓存由两部分,一是内存,存储在Map类型的ServiceInfoHolder#serviceInfoMap里;一是磁盘文件中,存储路径在 ServiceInfoHolder#cacheDir,存储在磁盘文件中的服务信息用于故障转移处理。

  当服务实例发生变更,更新内存、磁盘文件的实例列表信息,同时发布InstancesChangeEvent事件,通知订阅者进行业务处理。

2、发布实例变更事件

  发布实例变更事件最终执行到 NotifyCenter#publishEvent 详情如下:

 1 // INSTANCE是NotifyCenter的单例对象
 2 private static final NotifyCenter INSTANCE = new NotifyCenter();
 3 
 4 /**
 5  * 事件发布者管理器
 6  * Publisher management container.
 7  */
 8 private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
 9 
10 // 发布事件
11 private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
12    // ...
13     // 根据事件类型获取canonicalName
14     final String topic = ClassUtils.getCanonicalName(eventType);
15 
16     // 根据 canonicalName 获取 事件发布者
17     EventPublisher publisher = INSTANCE.publisherMap.get(topic);
18     if (publisher != null) {
19         // 发布事件
20         return publisher.publish(event);
21     }
22     // ...
23     return false;
24 }

  根据InstancesChangeEvent事件类型,获取CanonicalName;将CanonicalName作为key,从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher);EventPublisher将InstancesChangeEvent事件进行发布。

2.1、eventType 与 EventPublisher 映射关系的建立

  CanonicalName与EventPublisher是怎么建立起联系的呢?

  在Nacos源码(二):客户端服务注册源码分析中,NamingService进行实例化时,NacosNamingService构造函数中调用了init方法,NacosNamingService#init() 详情如下:

0

  在init方法中,注册了InstancesChangeEvent的事件发布者,同时也注册了实例列表变化事件的订阅者InstancesChangeNotifier。

2.1.1、注册事件发布器

  NotifyCenter#registerToPublisher利用 NotifyCenter#publisherMap 维护了 ChangeEvent 与 Publisher 的映射关系。,并启动了 EventPublisher 处理事件线程,等待订阅者启动成功后,将事件通知到订阅者处理。

  NotifyCenter#registerToPublisher 详情如下:

 1 // 事件发布工厂
 2 private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
 3 
 4 /**
 5  * 注册事件发布者
 6  * eventType 为  InstancesChangeEvent.class
 7  */
 8 public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
 9     // 根据 DEFAULT_PUBLISHER_FACTORY 获取 EventPublisher
10     return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
11 }
12 
13 /**
14  * 注册发布者
15  */
16 public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
17         final EventPublisherFactory factory, final int queueMaxSize) {
18     if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
19         return INSTANCE.sharePublisher;
20     }
21     
22     // 根据eventType获取CanonicalName
23     final String topic = ClassUtils.getCanonicalName(eventType);
24     synchronized (NotifyCenter.class) {
25         // 维护 INSTANCE.publisherMap 中  CanonicalName 与 EventPublisher 的关系
26         MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
27     }
28     return INSTANCE.publisherMap.get(topic);
29 }

  EventPublisher 是通过 EventPublisherFactory 创建的,在 NotifyCenter 的静态代码块中利用反射机制进行实例化,默认使用的是 DefaultPublisher。

0

2.1.2、注册事件订阅者

  NotifyCenter#addSubscriber 详情如下:

0

  将事件订阅者添加到DefaultPublisher的 subscribers 属性中。

1 // 添加到订阅者的集合
2 public void addSubscriber(Subscriber subscriber) {
3     subscribers.add(subscriber);
4 }

  NotifyCenter#addSubscriber确定了事件、事件发布者、事件订阅者三者的关系,关联的关键是事件发布者 EventPublisher,

  通过事件类型EventType获取DefaultPublisher发布事件,再遍历DefaultPublisher中维护的subscribers订阅者集合,通知订阅者处理事件。

2.2、发布实例列表变更事件

  DefaultPublisher#publish 详情如下:

 1 /**
 2  * 发布事件任务
 3  */
 4 @Override
 5 public boolean publish(Event event) {
 6     checkIsStart();
 7     // 将事件添加进任务队列,等待执行
 8     boolean success = this.queue.offer(event);
 9     // 添加失败,直接处理事件
10     if (!success) {
11         // 通知订阅者处理事件
12         receiveEvent(event);
13         return true;
14     }
15     return true;
16 }

  实例列表变更事件添加到任务队列中,若添加失败,直接通知订阅者处理该事件。

3、通知订阅处理事件

  发布实例变更事件,实际上是将事件添加到任务队列queue中,队列在 NotifyCenter 的静态代码块中利用反射机制对 EventPublisherFactory 进行实例化时,执行 DefaultPublisher#init() 方法,详情如下:

0

  DefaultPublisher继承 Thread 类,在 init 方法中初始化了存放事件的任务队列,同时启动了事件发布者线程,有事件发布,则通知订阅者处理事件。

  接下来我们来看看是如何通知订阅者处理的,DefaultPublisher的 run 方法:

 1 // 开启事件处理
 2 public void run() {
 3     openEventHandler();
 4 }
 5 
 6 // 开始时间处理
 7 void openEventHandler() {
 8     try {
 9         // 等待次数,1次等待 1s
10         int waitTimes = 60;
11         // 死循环延迟,线程启动最大延时60秒,为了解决消息积压的问题。
12         while (!shutdown && !hasSubscriber() && waitTimes > 0) {
13             ThreadUtils.sleep(1000L);
14             waitTimes--;
15         }
16 
17         //  死循环不断的从任务队列中取出Event,并通知订阅者Subscriber执行Event
18         while (!shutdown) {
19             final Event event = queue.take();
20             // 从队列中取出Event并处理
21             receiveEvent(event);
22             UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
23         }
24     } catch (Throwable ex) {
25         LOGGER.error("Event listener exception : ", ex);
26     }
27 }

  第一个循环会等待60s,当无订阅者处理事件时,结束;

  第二个循环会不断从任务队列中获取事件处理,若无任务,阻塞等待,直到有事件发布后执行。

4、通知订阅者处理事件

  在上面已经分析到,订阅者是在构建NamingService实例完成的注册,实际上是将订阅者注册到DefaultPublisher中维护的订阅者集合 subscribers 属性中。

DefaultPublisher#receiveEvent() 详情如下:

 1 /**
 2  * 接收并通知订阅者处理事件
 3  */
 4 void receiveEvent(Event event) {
 5     final long currentEventSequence = event.sequence();
 6     if (!hasSubscriber()) {
 7         LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
 8         return;
 9     }
10     // 遍历订阅者
11     for (Subscriber subscriber : subscribers) {
12         if (!subscriber.scopeMatches(event)) {
13             continue;
14         }
15         // 忽略过期事件
16         if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
17             continue;
18         }
19         // 通知订阅者处理事件
20         notifySubscriber(subscriber, event);
21     }
22 }
23 
24 /**
25  * 通知订阅者处理事件
26  */
27 @Override
28 public void notifySubscriber(final Subscriber subscriber, final Event event) {
29 
30     LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
31 
32     // 执行订阅者事件
33     final Runnable job = () -> subscriber.onEvent(event);
34     final Executor executor = subscriber.executor();
35 
36     if (executor != null) {
37         executor.execute(job);
38     } else {
39         try {
40             job.run();
41         } catch (Throwable e) {
42             LOGGER.error("Event callback exception: ", e);
43         }
44     }
45 }

  遍历注册的事件处理器subscribers,获取指定事件的订阅者,通知订阅者执行onEvent方法,完成事件的处理,至此整个事件便处理完成了。

5、整体流程

5.1、InstanceChangeEvent事件发布

  在订阅后,从注册中心拉取到的实例列表有更新的情况时,通过NotifyCenter发布了InstancesChangeEvent事件,发布事件要流程:根据InstancesChangeEvent事件类型,获得对应的CanonicalName,再将CanonicalName作为Key,从NotifyCenter.publisherMap中获取对应的事件发布者(EventPublisher),EventPublisher将InstancesChangeEvent事件进行发布。

  通过EventPublisher的实现类DefaultPublisher进行InstancesChangeEvent事件发布,DefaultPublisher本身以守护线程的方式运作,在执行业务逻辑前,先判断该线程是否启动,如果启动,则将事件添加到BlockingQueue中,队列默认大小为16384,添加到BlockingQueue成功,则整个发布过程完成。

  如果添加失败,则直接调用DefaultPublisher.receiveEvent方法,接收事件并通知订阅者,通知订阅者时创建一个Runnable对象,执行订阅者的Event。

5.2、InstanceChangeEvent事件消费

  如果添加到BlockingQueue成功,DefaultPublisher初始化时会创建一个阻塞(BlockingQueue)队列,并标记线程启动 ,DefaultPublisher本身是一个Thread,当执行super.start方法时,会调用它的run方法 ,run方法的核心业务逻辑是通过openEventHandler方法处理的 ,openEventHandler方法通过两个for循环,从阻塞队列中获取时间信息 ,第一个for循环用于让线程启动时在60s内检查执行条件 ,第二个for循环为死循环,从阻塞队列中获取Event,并调用DefaultPublisher#receiveEvent方法,接收事件并通知订阅者。

 

标签:订阅,serviceInfo,Nacos,源码,NotifyCenter,事件,实例,final,客户端
From: https://www.cnblogs.com/RunningSnails/p/17888589.html

相关文章

  • C#区域医院检验系统源码,SaaS服务
    LIS是将医院检验仪器与计算机组成网络,使得医院检验工作从医生检验申请——收费——样品采样与登记——数据采集与审核——报告单传输与打印——质量控制——统计与分析等一系列流程中,实现智能化、自动化和规范化,大大提高了业务效率。本套LIS检验系统面向医院实验室、医院集团、区域......
  • Nacos源码(五):服务端健康检查源码分析
    服务注册到Nacos后,其他服务就可以获取该服务的实例信息,调用此服务;当服务宕机,Nacos会将该服务信息从维护的服务实例列表中删除,此时,其他服务获取不到该服务的实例信息,无法调用该服务。该服务是否应该被删除,取决于该服务是否健康,Nacos提供健康检查机制,判断服务是否有问题,将不健康......
  • Spring源码系列:MyBatis整合和原理
    前言Mybatis是啥?Mybatis是一个支持普通SQL查询、存储过程以及映射的一个持久层半ORM框架。那么在了解Spring整合Mybatis这部分源码之前,我们先来看下Mybatis的实际运用。一.Mybatis的使用首先,项目的结构如下:pom依赖:<dependency><groupId>org.mybatis</groupId><artifactId......
  • C语言源码的陷波器设计及调试总结
    一前记音频信号处理中,限波器是一个常用的算法。这个算法难度不是很高,可用起来却坑很多。二源码解析1滤波器的核心函数,这里注意两点,一个是带宽不能太宽了,太宽了杀伤力太大了,容易出问题。另外一个就是滤波器的阶数非常重要,假如想滤波宽度尽量窄一些,那就阶数尽量高一些......
  • 构建用于复杂数据处理的高效UDP服务器和客户端
    title:构建用于复杂数据处理的高效UDP服务器和客户端banner_img:https://cdn.studyinglover.com/pic/2023/12/334c0c129076533308cbc7e03f8c55be.pngdate:2023-12-723:03:00tags:-踩坑构建用于复杂数据处理的高效UDP服务器和客户端引言在当今快速发展的网络通信世界......
  • springboot蜗牛兼职网的设计与实现-计算机毕业设计源码+LW文档
    摘 要随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,蜗牛兼职网当然也不能排除在外。蜗牛兼职网是以实际运用为开发背景,运用软件工程原理和开发方法,采用springboot框架构建的一个管理系统。整个开发过程首先对软件系......
  • C++socket服务器与客户端简单通信流程
    服务器和客户端简单通信的流程,做一个简单的复习:1.服务器创建的流程代码如下,各个重要函数已经写注释:#include<iostream>//推荐加上宏定义#defineWIN32_LEAN_AND_MEAN#include<winsock2.h>#include<Windows.h>#pragmacomment(lib,"ws2_32")usingnamespacestd;i......
  • springboot018母婴商城-计算机毕业设计源码+LW文档
    一、选题背景以母婴人群和准母婴人群及其家庭群体为目标用户。站在整个社会产业的角度,有些产业为所有用户提供某类基本需求,有些产业为某类用户提供某类特定需求,而母婴产业是最终满足特定人群相关多元化需求的一个宽辐射市场。母婴产品及服务最终以线上与线下为出口抵达用户,从市场......
  • springboot019高校心理教育辅导设计与实现-计算机毕业设计源码+LW文档
    1.1选题的意义和目的当今社会,经济飞速发展,科技日新月异,人际关系复杂多变,整个社会生活节奏越来越快,人们承受的生活压力越来越大。而对于在校大学生来说,他们还未完全融入到社会之中,但他们却依然要面对这些社会问题,他们要不断地学习新的知识,要开始学会如何解决身边地人际关系问题,要面......
  • 浅谈clickhouse的Mutation机制(附源码分析)
    最近研究了一点ch的代码。发现一个很有意思的词,mutation。google这个词有突变的意思,但更多的相关文章翻译这个为"订正"。上一篇文章分析了background_pool_size参数。这个参数和后台异步工作线程池merge工作有关。ClickHouse内核中异步merge、mutation工作由统一的工作线程池来完成......