服务注册到Nacos后,其他服务就可以获取该服务的实例信息,调用此服务;当服务宕机,Nacos会将该服务信息从维护的服务实例列表中删除,此时,其他服务获取不到该服务的实例信息,无法调用该服务。该服务是否应该被删除,取决于该服务是否健康,Nacos提供健康检查机制,判断服务是否有问题,将不健康的服务剔除下线。
在Nacos1.x版本中临时实例通过Distro协议内存进行存储,客户端向注册中心发送心跳来维持自身healthy状态;永久实例通过Raft协议进行永久化存储,服务端定时与客户端建立tcp连接做健康检查。
在Nacos2.x版本后临时实例不再使用心跳,而是通过GRPC长连接是否存活来判断实例是否健康,但仍然保留对Nacos1.x的http客户端的支持。
1、客户端健康检查
在Nacos2.x临时节点的探活通过GRPC长连接实现,长连接是指在一个连接上可以连续发送多个数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包。
只要Client与Server的连接存在,就表明客户端是healthy状态。下面来看看长连接是怎样的建立起来的。
在 Nacos源码(二):客户端服务注册源码分析 中提到,客户端临时节点通过代理NamingClientProxyDelegate调用 NamingGrpcClientProxy 中的方法完成实例注册功能,NamingGrpcClientProxy对象是在 NamingClientProxyDelegate 的构造方法中创建。
NamingGrpcClientProxy构造方法详情如下:
RpcClient#start() 的主要步骤如下:
1.1、连接Nacos服务
连接Nacos服务并设置客户端状态,将连接事件添加到阻塞队列中,便于回调处理:
1.2、Nacos连接事件回调
客户端连接、断开连接事件回调基于阻塞队列 eventLinkedBlockingQueue 完成的,在连接Nacos服务后,会将连接事件添加到 eventLinkedBlockingQueue 队列中,等待线程池的回调处理。
1.3、探活处理
永真循环检测长连接,若Naocs服务端无法连接,连接集群中的其他Nacos服务节点。
至此,临时节点健康检查,涉及GRPC长连接启动,及探活处理部分分析完成,下面来看看永久节点的健康检查是如何处理的。
2、服务端节点健康检查
在Nacos源码(四):服务端服务注册源码分析中提到,客户端管理器 ClientManagerDelegate 实现 ClientManager 接口,持有 EphemeralIpPortClientManager、 PersistentIpPortClientManage 客户端管理器。PersistentIpPortClientManage对应的是基于Ip、端口的永久节点客户端管理器。
在注册实例,根据客户端Id获取连接时,会设置心跳检测的定时任务:
最终执行到 PersistentIpPortClientManager 的 clientConnected 方法:
1 public boolean clientConnected(final Client client) { 2 clients.computeIfAbsent(client.getClientId(), s -> { 3 Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId()); 4 IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client; 5 // 设置心跳检查定时任务 6 ipPortBasedClient.init(); 7 return ipPortBasedClient; 8 }); 9 return true; 10 }
IpPortBasedClient#init() 心跳检查定时任务如下:
1 /** 2 * 初始化client,设置心跳检测定时任务 3 * Init client. 4 */ 5 public void init() { 6 // 临时节点 7 if (ephemeral) { 8 // ClientBeatCheckTaskV2 作为心跳检测的任务 9 beatCheckTask = new ClientBeatCheckTaskV2(this); 10 HealthCheckReactor.scheduleCheck(beatCheckTask); 11 // 永久节点 12 } else { 13 // HealthCheckTaskV2 作为心跳检测的任务 14 healthCheckTaskV2 = new HealthCheckTaskV2(this); 15 HealthCheckReactor.scheduleCheck(healthCheckTaskV2); 16 } 17 }
临时实例:使用 ClientBeatCheckTaskV2 处理健康检查。
永久实例:使用 HealthCheckTaskV2 处理健康检查。2.1、永久实例健康检查
永久节点的健康检查是由服务端定时与客户端建立tcp连接做健康检查,是服务端主动发起的探测,服定时请求客户端判断是否健康。
永久实例使用 HealthCheckTaskV2 处理健康检查,HealthCheckTaskV2类图如下:
通过实现的run()方法得知,执行 HealthCheckTaskV2#doHealthCheck() 完成发起永久实例的健康检测。
最终执行 HealthCheckProcessorV2Delegate#process(),HealthCheckProcessorV2Delegate是一个代理类,具体的健康检查由内部维护的healthCheckProcessorMap中的具体实现类完成。
1 /** 2 * 健康检查处理代理类 3 */ 4 public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 { 5 6 // 健康检查实现类 容器 7 private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>(); 8 9 @Override 10 public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) { 11 // 从元数据中获取当前客户端的健康检查类型,默认TCP (HTTP、TCP、MYSQL、NONE) => HealthCheckType 枚举类 12 String type = metadata.getHealthyCheckType(); 13 // 根据类型从缓存中获取 健康检查具体处理类 14 HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type); 15 // 找不到处理类 使用 NoneHealthCheckProcessor 做健康检查,及什么都不做 16 if (processor == null) { 17 processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE); 18 } 19 processor.process(task, service, metadata); 20 } 21 }
HealthCheckProcessorV2类图如下:
子类实现与 HealthCheckType 枚举中的健康检查类型一一对应,下面来看下具体实现:
2.1.1、HttpHealthCheckProcessor
HttpHealthCheckProcessor的process是通过 RestTemplate 完成健康检测请求。
2.1.2、MysqlHealthCheckProcessor
MysqlHealthCheckProcessor的process是执行配置中的sql,完成健康检测。
健康检测任务 MysqlCheckTask#run(),通过执行SQL来完成健康检测,执行过程不报异常,证明实例健康。
2.1.3、TcpHealthCheckProcessor
TcpHealthCheckProcessor是通过构建Socket,然后对连接或读入事件进行监听,完成健康检测。
需要进行健康检测的实例添加到队列后,是在哪里执行的呢?
TcpHealthCheckProcessor实现了Runables接口,在TcpHealthCheckProcessor构造函数中启动了该任务。
在run方法中,执行了 processTask处理任务:
TcpHealthCheckProcessor#processTask() 详情如下:
1 /** 2 * 处理任务 3 * @throws Exception 4 */ 5 private void processTask() throws Exception { 6 Collection<Callable<Void>> tasks = new LinkedList<>(); 7 do { 8 // 获取队列中需要进行心跳检测的实例 9 Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS); 10 if (beat == null) { 11 return; 12 } 13 14 // 添加到任务队列 15 tasks.add(new TaskProcessor(beat)); 16 } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64); 17 18 // 调用队列中任务,并获取执行结果 19 for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) { 20 f.get(); 21 } 22 }
2.1.4、NoneHealthCheckProcessor
NoneHealthCheckProcessor什么都不做,不对示例进行健康检测。
2.2、临时实例健康检查
临时实例的 ClientBeatCheckTaskV2 处理健康检查在,是通过责任链完成的健康检测。
AbstractNamingInterceptorChain#doInterceptor 详情如下:
最终会调用传入的参数 InstanceBeatCheckTask#passIntercept()
passIntercept方法实际上会遍历 statics 代码块中的 UnhealthyInstanceChecker、ExpiredInstanceChecker 和 用户自定义实现了InstanceBeatChecker接口的检测。
2.2.1、UnhealthyInstanceChecker
UnhealthyInstanceChecker 是不健康实例的检查器,详情如下:
1 // 健康检测 2 public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) { 3 // 调整实例健康状态 4 if (instance.isHealthy() && isUnhealthy(service, instance)) { 5 // 将当前不健康实例的状态调整为为 不健康 6 changeHealthyStatus(client, service, instance); 7 } 8 } 9 10 /** 11 * 判断实例是否健康 12 * @param service 13 * @param instance 14 * @return 15 */ 16 private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) { 17 // 获取超时时间 默认 15 秒;可通过 preserved.heart.beat.timeout 配置更改 18 long beatTimeout = getTimeout(service, instance); 19 // 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点不健康了 20 return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout; 21 } 22 23 /** 24 * 调整实例健康状态 25 */ 26 private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) { 27 // 改为false 28 instance.setHealthy(false); 29 Loggers.EVT_LOG 30 .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(), 31 instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, 32 instance.getLastHeartBeatTime()); 33 NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service)); 34 NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client)); 35 NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(), 36 service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(), instance.getPort(), 37 false, "client_beat")); 38 }
2.2.2、ExpiredInstanceChecker
UnhealthyInstanceChecker 是过期实例的检查器,详情如下:
1 public class ExpiredInstanceChecker implements InstanceBeatChecker { 2 3 @Override 4 public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) { 5 // 帕努单当前实例是否过期 6 boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance(); 7 // 若实例过期,直接剔除实例 8 if (expireInstance && isExpireInstance(service, instance)) { 9 deleteIp(client, service, instance); 10 } 11 } 12 13 private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) { 14 // 获取超时时间 默认 30 秒;可通过 preserved.ip.delete.timeout 配置更改。 15 long deleteTimeout = getTimeout(service, instance); 16 // 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点过期了,需要进行节点剔除操作 17 return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout; 18 } 19 20 /** 21 * 剔除服务 22 * @param client 23 * @param service 24 * @param instance 25 */ 26 private void deleteIp(Client client, Service service, InstancePublishInfo instance) { 27 Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance)); 28 client.removeServiceInstance(service); 29 // 注销客户端 30 NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId())); 31 // 修改实例元数据 32 NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true)); 33 // 注销实例 34 NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "", 35 false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(), 36 service.getName(), instance.getIp(), instance.getPort())); 37 } 38 }
标签:实例,service,Nacos,client,instance,源码,健康检查,服务端,客户端 From: https://www.cnblogs.com/RunningSnails/p/17888531.html