首页 > 编程语言 >Nacos源码(五):服务端健康检查源码分析

Nacos源码(五):服务端健康检查源码分析

时间:2023-12-08 17:01:10浏览次数:59  
标签:实例 service Nacos client instance 源码 健康检查 服务端 客户端

  服务注册到Nacos后,其他服务就可以获取该服务的实例信息,调用此服务;当服务宕机,Nacos会将该服务信息从维护的服务实例列表中删除,此时,其他服务获取不到该服务的实例信息,无法调用该服务。该服务是否应该被删除,取决于该服务是否健康,Nacos提供健康检查机制,判断服务是否有问题,将不健康的服务剔除下线。

  在Nacos1.x版本中临时实例通过Distro协议内存进行存储,客户端向注册中心发送心跳来维持自身healthy状态;永久实例通过Raft协议进行永久化存储,服务端定时与客户端建立tcp连接做健康检查。

  在Nacos2.x版本后临时实例不再使用心跳,而是通过GRPC长连接是否存活来判断实例是否健康,但仍然保留对Nacos1.x的http客户端的支持。

1、客户端健康检查

  在Nacos2.x临时节点的探活通过GRPC长连接实现,长连接是指在一个连接上可以连续发送多个数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包。

  只要Client与Server的连接存在,就表明客户端是healthy状态。下面来看看长连接是怎样的建立起来的。

  在 Nacos源码(二):客户端服务注册源码分析 中提到,客户端临时节点通过代理NamingClientProxyDelegate调用 NamingGrpcClientProxy 中的方法完成实例注册功能,NamingGrpcClientProxy对象是在 NamingClientProxyDelegate 的构造方法中创建。

0

  NamingGrpcClientProxy构造方法详情如下:

0

  RpcClient#start() 的主要步骤如下:

0

1.1、连接Nacos服务

  连接Nacos服务并设置客户端状态,将连接事件添加到阻塞队列中,便于回调处理:

0

1.2、Nacos连接事件回调

  客户端连接、断开连接事件回调基于阻塞队列 eventLinkedBlockingQueue 完成的,在连接Nacos服务后,会将连接事件添加到 eventLinkedBlockingQueue 队列中,等待线程池的回调处理。

0

1.3、探活处理

  永真循环检测长连接,若Naocs服务端无法连接,连接集群中的其他Nacos服务节点。

0

  至此,临时节点健康检查,涉及GRPC长连接启动,及探活处理部分分析完成,下面来看看永久节点的健康检查是如何处理的。

2、服务端节点健康检查

  在Nacos源码(四):服务端服务注册源码分析中提到,客户端管理器 ClientManagerDelegate 实现 ClientManager 接口,持有 EphemeralIpPortClientManager、 PersistentIpPortClientManage 客户端管理器。PersistentIpPortClientManage对应的是基于Ip、端口的永久节点客户端管理器。

  在注册实例,根据客户端Id获取连接时,会设置心跳检测的定时任务:

0

  最终执行到 PersistentIpPortClientManager 的 clientConnected 方法:

 1 public boolean clientConnected(final Client client) {
 2     clients.computeIfAbsent(client.getClientId(), s -> {
 3         Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
 4         IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
 5         // 设置心跳检查定时任务
 6         ipPortBasedClient.init();
 7         return ipPortBasedClient;
 8     });
 9     return true;
10 }

  IpPortBasedClient#init() 心跳检查定时任务如下:

 1 /**
 2  * 初始化client,设置心跳检测定时任务
 3  * Init client.
 4  */
 5 public void init() {
 6     // 临时节点
 7     if (ephemeral) {
 8         // ClientBeatCheckTaskV2 作为心跳检测的任务
 9         beatCheckTask = new ClientBeatCheckTaskV2(this);
10         HealthCheckReactor.scheduleCheck(beatCheckTask);
11     // 永久节点
12     } else {
13         // HealthCheckTaskV2 作为心跳检测的任务
14         healthCheckTaskV2 = new HealthCheckTaskV2(this);
15         HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
16     }
17 }

  临时实例:使用 ClientBeatCheckTaskV2 处理健康检查。

  永久实例:使用 HealthCheckTaskV2 处理健康检查。

2.1、永久实例健康检查

  永久节点的健康检查是由服务端定时与客户端建立tcp连接做健康检查,是服务端主动发起的探测,服定时请求客户端判断是否健康。

  永久实例使用 HealthCheckTaskV2 处理健康检查,HealthCheckTaskV2类图如下:

0

  通过实现的run()方法得知,执行 HealthCheckTaskV2#doHealthCheck() 完成发起永久实例的健康检测。

0

  最终执行 HealthCheckProcessorV2Delegate#process(),HealthCheckProcessorV2Delegate是一个代理类,具体的健康检查由内部维护的healthCheckProcessorMap中的具体实现类完成。

 1 /**
 2  * 健康检查处理代理类
 3  */
 4 public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
 5 
 6     // 健康检查实现类 容器
 7     private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
 8 
 9     @Override
10     public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
11         // 从元数据中获取当前客户端的健康检查类型,默认TCP (HTTP、TCP、MYSQL、NONE) => HealthCheckType 枚举类
12         String type = metadata.getHealthyCheckType();
13         // 根据类型从缓存中获取 健康检查具体处理类
14         HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
15         // 找不到处理类 使用 NoneHealthCheckProcessor 做健康检查,及什么都不做
16         if (processor == null) {
17             processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
18         }
19         processor.process(task, service, metadata);
20     }
21 }

  HealthCheckProcessorV2类图如下:

0

  子类实现与 HealthCheckType 枚举中的健康检查类型一一对应,下面来看下具体实现:

2.1.1、HttpHealthCheckProcessor

  HttpHealthCheckProcessor的process是通过 RestTemplate 完成健康检测请求。

0

2.1.2、MysqlHealthCheckProcessor

  MysqlHealthCheckProcessor的process是执行配置中的sql,完成健康检测。

0

  健康检测任务 MysqlCheckTask#run(),通过执行SQL来完成健康检测,执行过程不报异常,证明实例健康。

0

2.1.3、TcpHealthCheckProcessor

  TcpHealthCheckProcessor是通过构建Socket,然后对连接或读入事件进行监听,完成健康检测。

0

  需要进行健康检测的实例添加到队列后,是在哪里执行的呢?

  TcpHealthCheckProcessor实现了Runables接口,在TcpHealthCheckProcessor构造函数中启动了该任务。

0

  在run方法中,执行了 processTask处理任务:

0

  TcpHealthCheckProcessor#processTask() 详情如下:

 1 /**
 2  * 处理任务
 3  * @throws Exception
 4  */
 5 private void processTask() throws Exception {
 6     Collection<Callable<Void>> tasks = new LinkedList<>();
 7     do {
 8         // 获取队列中需要进行心跳检测的实例
 9         Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
10         if (beat == null) {
11             return;
12         }
13 
14         // 添加到任务队列
15         tasks.add(new TaskProcessor(beat));
16     } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
17 
18     // 调用队列中任务,并获取执行结果
19     for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
20         f.get();
21     }
22 }

2.1.4、NoneHealthCheckProcessor

  NoneHealthCheckProcessor什么都不做,不对示例进行健康检测。

2.2、临时实例健康检查

  临时实例的 ClientBeatCheckTaskV2 处理健康检查在,是通过责任链完成的健康检测。

0

  AbstractNamingInterceptorChain#doInterceptor 详情如下:

0

  最终会调用传入的参数 InstanceBeatCheckTask#passIntercept()

0

  passIntercept方法实际上会遍历 statics 代码块中的 UnhealthyInstanceChecker、ExpiredInstanceChecker 和 用户自定义实现了InstanceBeatChecker接口的检测。

2.2.1、UnhealthyInstanceChecker

  UnhealthyInstanceChecker 是不健康实例的检查器,详情如下:

 1 // 健康检测
 2 public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
 3     // 调整实例健康状态
 4     if (instance.isHealthy() && isUnhealthy(service, instance)) {
 5         // 将当前不健康实例的状态调整为为 不健康
 6         changeHealthyStatus(client, service, instance);
 7     }
 8 }
 9 
10 /**
11  * 判断实例是否健康
12  * @param service
13  * @param instance
14  * @return
15  */
16 private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
17     // 获取超时时间 默认 15 秒;可通过 preserved.heart.beat.timeout 配置更改
18     long beatTimeout = getTimeout(service, instance);
19     // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点不健康了
20     return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
21 }
22 
23 /**
24  * 调整实例健康状态
25  */
26 private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
27     // 改为false
28     instance.setHealthy(false);
29     Loggers.EVT_LOG
30             .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),
31                     instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,
32                     instance.getLastHeartBeatTime());
33     NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
34     NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
35     NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
36             service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(), instance.getPort(),
37             false, "client_beat"));
38 }

2.2.2、ExpiredInstanceChecker

  UnhealthyInstanceChecker 是过期实例的检查器,详情如下:

 

 1 public class ExpiredInstanceChecker implements InstanceBeatChecker {
 2 
 3     @Override
 4     public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
 5         // 帕努单当前实例是否过期
 6         boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
 7         // 若实例过期,直接剔除实例
 8         if (expireInstance && isExpireInstance(service, instance)) {
 9             deleteIp(client, service, instance);
10         }
11     }
12 
13     private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
14         // 获取超时时间 默认 30 秒;可通过 preserved.ip.delete.timeout 配置更改。
15         long deleteTimeout = getTimeout(service, instance);
16         // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点过期了,需要进行节点剔除操作
17         return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
18     }
19 
20     /**
21      * 剔除服务
22      * @param client
23      * @param service
24      * @param instance
25      */
26     private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
27         Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance));
28         client.removeServiceInstance(service);
29         // 注销客户端
30         NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
31         // 修改实例元数据
32         NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
33         // 注销实例
34         NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
35                 false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),
36                 service.getName(), instance.getIp(), instance.getPort()));
37     }
38 }

 

标签:实例,service,Nacos,client,instance,源码,健康检查,服务端,客户端
From: https://www.cnblogs.com/RunningSnails/p/17888531.html

相关文章

  • Spring源码系列:MyBatis整合和原理
    前言Mybatis是啥?Mybatis是一个支持普通SQL查询、存储过程以及映射的一个持久层半ORM框架。那么在了解Spring整合Mybatis这部分源码之前,我们先来看下Mybatis的实际运用。一.Mybatis的使用首先,项目的结构如下:pom依赖:<dependency><groupId>org.mybatis</groupId><artifactId......
  • C语言源码的陷波器设计及调试总结
    一前记音频信号处理中,限波器是一个常用的算法。这个算法难度不是很高,可用起来却坑很多。二源码解析1滤波器的核心函数,这里注意两点,一个是带宽不能太宽了,太宽了杀伤力太大了,容易出问题。另外一个就是滤波器的阶数非常重要,假如想滤波宽度尽量窄一些,那就阶数尽量高一些......
  • springboot蜗牛兼职网的设计与实现-计算机毕业设计源码+LW文档
    摘 要随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,蜗牛兼职网当然也不能排除在外。蜗牛兼职网是以实际运用为开发背景,运用软件工程原理和开发方法,采用springboot框架构建的一个管理系统。整个开发过程首先对软件系......
  • springboot018母婴商城-计算机毕业设计源码+LW文档
    一、选题背景以母婴人群和准母婴人群及其家庭群体为目标用户。站在整个社会产业的角度,有些产业为所有用户提供某类基本需求,有些产业为某类用户提供某类特定需求,而母婴产业是最终满足特定人群相关多元化需求的一个宽辐射市场。母婴产品及服务最终以线上与线下为出口抵达用户,从市场......
  • springboot019高校心理教育辅导设计与实现-计算机毕业设计源码+LW文档
    1.1选题的意义和目的当今社会,经济飞速发展,科技日新月异,人际关系复杂多变,整个社会生活节奏越来越快,人们承受的生活压力越来越大。而对于在校大学生来说,他们还未完全融入到社会之中,但他们却依然要面对这些社会问题,他们要不断地学习新的知识,要开始学会如何解决身边地人际关系问题,要面......
  • 浅谈clickhouse的Mutation机制(附源码分析)
    最近研究了一点ch的代码。发现一个很有意思的词,mutation。google这个词有突变的意思,但更多的相关文章翻译这个为"订正"。上一篇文章分析了background_pool_size参数。这个参数和后台异步工作线程池merge工作有关。ClickHouse内核中异步merge、mutation工作由统一的工作线程池来完成......
  • 【Java语言】区域医院信息系统云HIS源码
    一个好的HIS系统,要具有开放性,便于扩展升级,增加新的功能模块,支撑好医院的业务的拓展,而且可以反过来给医院赋能,最终向更多的患者提供更好地服务。新理念新技术的趋势云计算技术:云计算是一种通过网络将高弹性的共享的物理和虚拟资源,按需进行服务和管理的方式。它具有以下特点:虚拟......
  • 高级实现Java的七大热门技术框架解析源码特性分析
    设计模式是软件开发中常用的解决方案,可以帮助我们构建灵活可扩展的应用程序。本文将深入探讨Java的七大经典设计模式,并提供相关示例代码。一、单例模式单例模式确保一个类只有一个实例,并提供全局访问点。以下是一个简单的单例模式示例代码:publicclassSingleton{privatesta......
  • java云HIS系统源码 区域HIS信息管理系统源码
    医院管理信息系统(HIS)是医院基本、重要的管理系统,是医院大数据的基础。“云”指系统采用云计算的技术和建设模式,具有可扩展、易共享、区域化、易协同、低成本、易维护、体验好的优势。“H”是医疗卫生,由原来医院(Hospital)到现在的医疗卫生(Healthcare),拓展了H的内涵与外延。云......
  • Nacos 2.3.0 正式发布,堪称最强!
    大家好,我是栈长。Nacos2.3.0前几天正式发布了,新增了不少实用性的新功能,真是史上最强版本。Nacos2.3.0还真是一个比较重要的大版本,因为它涉及了太多重大更新,今天栈长给大家来解读下。Nacos先扫个盲:Nacos一个用于构建云原生应用的动态服务发现、配置管理和服务管理平台,......