首页 > 编程语言 >Nacos深入原理从源码层面讲解

Nacos深入原理从源码层面讲解

时间:2023-09-17 22:14:38浏览次数:40  
标签:String service Nacos instance serviceName 讲解 put 源码

目录

1 Nacos原理

1.1 Nacos架构

图片

  • Provider APP:服务提供者
  • Consumer APP:服务消费者
  • Name Server:通过VIPVirtual IP)或DNS的方式实现Nacos高可用集群的服务路由
  • Nacos ServerNacos服务提供者,里面包含的Open API是功能访问入口,Conig ServiceNaming ServiceNacos提供的配置服务、命名服务模块。Consitency Protocol是一致性协议,用来实现Nacos集群节点的数据同步,这里使用的是Raft算法(Etcd、Redis哨兵选举)
  • Nacos Console:控制台

1.2 注册中心原理

注册中心原理:

  • 服务实例在启动时注册到服务注册表,并在关闭时注销
  • 服务消费者查询服务注册表,获得可用实例
  • 服务注册中心需要调用服务实例的健康检查API来验证它是否能够处理请求
    在这里插入图片描述

1.3 SpringCloud服务注册

Spring-Cloud-Common包中有一个类org.springframework.cloud. client.serviceregistry .ServiceRegistry ,它是Spring Cloud提供的服务注册的标准。集成到Spring Cloud中实现服务注册的组件,都会实现该接口。
在这里插入图片描述
该接口有一个实现类是 NacoServiceRegistry

SpringCloud集成Nacos的实现过程:
spring-clou-commons包的META-INF/spring.factories中包含自动装配的配置信息如下:

图片
其中AutoServiceRegistrationAutoConfiguration就是服务注册相关的配置类:

@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value ="spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration{
	@Autowired(required = false)
	private AutoServiceRegistration autoServiceRegistration;
	@Autowired
	private AutoServiceRegistrationProperties properties;
	@PostConstruct
	protected void init() {
		if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
			throw new IllegalStateException("Auto Service Registration has been requested,but there is no AutoServiceRegistration bean");
}}}

AutoServiceRegistrationAutoConfiguration配置类中,可以看到注入了一个AutoServiceRegistration实例,该类的关系图如下所示。
图片

可以看出, AbstractAutoServiceRegistration 抽象类实现了该接口,并且最重要的是NacosAutoServiceRegistration继承了AbstractAutoServiceRegistration

看到EventListener我们就应该知道,Nacos是通过Spring的事件机制集成到SpringCloud中去的。

AbstractAutoServiceRegistration实现了onApplicationEvent抽象方法,并且监听WebServerInitializedEvent事件(当Webserver初始化完成之后) , 调用this.bind ( event )方法。

@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
	ApplicationContext context = event.getApplicationContext();
	if (context instanceof ConfigurableWebServerApplicationContext){
		if ("management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace )))
			return;
		}
	}
	this.port.compareAndSet( 0,event.getWebServer() .getPort());
	this.start();
}

最终会调用NacosServiceREgistry.register()方法进行服务注册。

public void start() {
	if (!isEnabled()) {
		if (logger.isDebugEnabled()) [
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}
		return;
	}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
	if (!this.running.get()){
		this.context.publishEvent(new InstancePreRegisteredEvent(this,getRegistration()));
		register();
		if (shouldRegisterManagement()){
			registerManagement();
		}
		this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
		this.running.compareAndSet(false,true);
	}
}

protected void register(){
	this.serviceRegistry.register(getRegistration());
}

1.4 NacosServiceRegistry实现

NacosServiceRegistry.registry方法中,调用了Nacos Client SDK中的namingService.registerInstance完成服务的注册。

@Override
public void register(Registration registration){
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();
	Instance instance = getNacosInstanceFromRegistration(registration):
	try{
		namingService.registerInstance(serviceId,instance);
		log.info("nacos registry,{} {} : {}register finished", serviceId,instance.getIp(),instance.getPort());
	}catch (Exception e) {
		log.error("nacos registry, {} register failed... {},",serviceId,registration.toString(),e);
	}
}

跟踪NacosNamingServiceregisterInstance()方法:

@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
	registerInstance(serviceName,Constants.DEFAULT_GROUP,instance);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
	if (instance.isEphemeralO){
		BeatInfo beatInfo = new BeatInfo();
		beatInfo. setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
		beatInfo.setIp(instance.getIp());
		beatInfo.setPort(instance.getPort());
		beatInfo.setCluster(instance.getClusterName());
		beatInfo.setWeight(instance.getWeight());
		beatInfo.setMetadata(instance .getMetadata());
		beatInfo.setScheduled(false);
		long instanceInterval = instance.getInstanceHeartBeatInterval();
		beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
		
		beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
	}
serverProxy.registerService(Namingutils.getGroupedName(serviceName, groupName), groupName, instance);
}

通过beatReactor.addBeatInfo()创建心跳信息实现健康检测, Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务健康检测的手段。
serverProxy.registerService()实现服务注册

1.4.1 心跳机制

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
	NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
	dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
	executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
	MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

从上述代码看,所谓心跳机制就是客户端通过schedule定时向服务端发送一个数据包 ,然后启动一个线程不断检测服务端的回应,如果在设定时间内没有收到服务端的回应,则认为服务器出现了故障。Nacos服务端会根据客户端的心跳包不断更新服务的状态。

1.4.2 注册原理

Nacos 提供了SDKOpen API两种形式来实现服务注册。

Open API:

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=192.16813.1&port=8080'

SDK:

void registerInstance(String serviceName, String ip, int port) throws NacosException;

这两种形式本质都一样,底层都是基于HTTP协议完成请求的。所以注册服务就是发送一个HTTP请求:

public void registerService(String serviceName,
String groupName,Instance instance) throws NacosException {

	NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",namespaceId,serviceName,instance);
	final Map<String,String> params = new HashMap<>(9);
	params.put(CommonParams.NAMESPACE_ID,namespaceId);
	params.put(CommonParams.SERVICE_NAME,serviceName);
	params.put(CommonParams.GROUP_NAME,groupName);
	params.put(CommonParams.CLUSTERNAME,instance.getClusterName());
	params.put("ip",instance.getIp());
	params .put("port",String. valueOf(instance.getPort()));
	params.put("weight",String.valueOf(instance.getWeight()));
	params.put("enable",String.valueOf(instance.isEnabled()));
	params.put("healthy",String.valueOf(instance.isHealthy()));
	params.put("ephemeral",String.valueOf(instance.isEphemeral()));
	params.put("metadata",JSON.toJSONString(instance.getMetadata()));
	
	regAPI(UtilAndComs .NACOS_URL_INSTANCE,params,HttpMethod.POSD);
}

对于nacos服务端,对外提供的服务接口请求地址为nacos/v1/ns/instance,实现代码在nacos-naming模块下的InstanceController类中:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT+"/instance")
public class InstanceController{
//省略部分代码
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
	String serviceName = WebUtils.required(request,CommonParams.SERVICENAME);
	String namespaceId = WebUtils.optional(request,CommonParams,NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);
	serviceManager.registerInstance(namespaceId,serviceName,parseInstance(request));
	return"ok";
	}
//省略部分代码
}
  • 从请求参数汇总获得serviceName(服务名)和namespaceId(命名空间Id)

  • 调用registerInstance注册实例

public void registerInstance(String namespaceld, String serviceName, Instance instance)throws NacosException{
	createEmptyService(namespaceId,serviceNameinstance.isEphemeral());
	Service service=getService(namespaceId,serviceName);
	if (service== null){
		throw new NacosException(NacosException.INVALID_PARAM,"service not found,namespace:"+namespaceId +",service:"+serviceName);
	}
	addInstance(namespaceId,serviceName,instance.isEphemeral(),instance);
}
  • 创建一个控服务(在Nacos控制台服务列表中展示的服务信息),实际上是初始化一个serviceMap,它是一个ConcurrentHashMap集合
  • getService,从serviceMap中根据namespaceIdserviceName得到一个服务对象
  • 调用addInstance添加服务实例
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local,Cluster cluster) throws NacosException {
	Service service = getService(namespaceId,serviceName);
	if(service== null){
		service= new Service();
		service.setName(serviceName);
		service.setNamespaceId(namespaceId);
		service.setGroupName(NamingUtils.getGroupName(serviceName));
		service.setLastModifiedMillis(System.currentTimeMillis());
		service.recalculateChecksum();
		if(cluster != null){
			cluster.setService(service);
			service.getClusterMap().put(cluster.getName(),cluster);
		}
		service.validate();
		putServiceAndInit(service);
		if(!local){
			addOrReplaceService(service);
		}
	}
}
  • 根据namespaceIdserviceName从缓存中获取Service实例
  • 如果Service实例为空,则创建并保存到缓存中
private void putServiceAndInit(Service service) throws NacosException{
	putService(service);
	service.init();
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),true),service);
	consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),false),service);
	Loggers.SRV_LOG.info("[NEW-SERVICE]{}",service.toJSON());
}
  • 通过putService()方法将服务缓存到内存
  • service.init()建立心跳机制
  • consistencyService.listen实现数据一致性监听

service.init()方法的如下图所示,它主要通过定时任务不断检测当前服务下所有实例最后发送心跳包的时间。如果超时,则设置healthyfalse表示服务不健康,并且发送服务变更事件。

在这里请大家思考一一个问题,服务实例的最后心跳包更新时间是谁来触发的?实际上前面有讲到, Nacos客户端注册服务的同时也建立了心跳机制。
在这里插入图片描述

putService方法,它的功能是将Service保存到serviceMap中:

public void putService(Service service)(
	if(!serviceMap.containsKey(service.getNamespaceId())){
		synchronized (putServiceLock){
			if(!serviceMap.containsKey(service,getNamespaceId())){
				serviceMap.put(service.getNamespaceId(),new ConcurrentHashMap<>(16));
			}
		}
	}
	serviceMap.get(service.getNamespaceId()).put(service.getName(),service);
}

继续调用addInstance方法把当前注册的服务实例保存到Service中:

addInstance(namespaceId,serviceName,instance.isEphemeral(),instance)

1.4.3 总结

  • Nacos客户端通过Open API的形式发送服务注册请求
  • Nacos服务端收到请求后,做以下三件事:
    • 构建一个Service对象保存到ConcurrentHashMap集合中
    • 使用定时任务对当前服务下的所有实例建立心跳检测机制
    • 基于数据一致性协议服务数据进行同步

1.5 服务提供者地址查询

Open API:

curl -X GET127.00.1:8848/nacos/v1/ns/instance/list?serviceName=example

SDK:

List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;

InstanceController中的list方法:

@GetMapping("/list")
public JSONObject list(HttpServletRequest request) throws Exception {
	String namespaceId = WebUtils.optional(request,CommonParams,NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);
	String serviceName = WebUtils.required(request,CommonParams.SERVICE_NAME);
	String agent =WebUtils.getUserAgent(request);
	String clusters = WebUtils.optional(request,"clusters",StringUtils.EMPTY);
	String clientIP = WebUtils.optional(request,"clientIp", StringUtils.EMPTY);
	Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort","0"));
	String env= WebUtils.optional(request,"env",StringUtils.EMPTY);
	boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request,"isCheck","false"));
	String app= WebUtils.optional(request,"app",StringUtils.EMPTY);
	String tenant = WebUtils.optional(request,"tid",StringUtils.EMPTY);
	boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request,"healthyOnly""false"));
	return doSrvIPXT(namespaceld, serviceName, agent, clusters, clientIP, udpPort, env,isCheck,app,tenant,healthyOnly);
}
  • 解析请求参数
  • 通过doSrvIPXT返回服务列表数据
public JSONObject doSrvIPXT(String namespaceld, String serviceName, String agent, String clusters,String clientIP,int udpPort,String env,boolean isCheck,String app,String tid,boolean healthyonly)
throws Exception {
	//以下代码中移除了很多非核心代码
	ClientInfo clientInfo = new ClientInfo(agent);
	JSONObject result=new JSONObject();
	Service service= serviceManager.getService(namespaceId,serviceName);
	List<Instance> srvedIPs;
	//获取指定服务下的所有实例 IP
	srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters,",")));
	Map<Boolean,List<Instance>>ipMap =new HashMap<>(2);
	ipMap.put(Boolean.TRUE,new ArrayList<>());
	ipMap.put(Boolean.FALSE,new ArrayList<>());
	for (Instance ip : srvedIPs){
		ipMap.get(ip.isHealthy()).add(ip);
	}
	//遍历,完成JSON字管中的纠装
	JSONArray hosts = new JSONArray();
	for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
		List<Instance> ips = entry.getValue();
		if (healthyOnly && !entry.getKey()){
			continue;
		}
		for (Instance instance :ips) {
			if (!instanceisEnabled()) {
				continue;
			}
			JSONObject ipobj=new JSONObject();
			ipobj.put("ip",instance.getIp());
			ipObj.put("port",instance.getPort());
			ipObj.put("valid",entry.getKey());
			ipObj.put("healthy",entry.getKey());
			ipObj.put("marked",instance.isMarked());
			ipObj.put("instanceId",instance.getInstanceId());
			ipObj.put("metadata",instance.getMetadata());
			ipObj.put("enabled",instance.isEnabled());
			ipObj.put("weight",instance.getweight());
			ipObj.put("clusterName",instance.getClusterName());
			if(clientInfo.type== ClientInfo.ClientType.JAVA 
				&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0."))>=0){
				ipObj.put("serviceName",instance.getServiceName());
			}else{
				ipObj.put("serviceName",NamingUtils.getServiceName(instance.getServiceName()));
			}
			ipObj.put("ephemeral",instance.isEphemeral());
			hosts.add(ipobj);
		}
	}
	result.put("hosts",hosts);
	result.put("name",serviceName);
	result.put("cacheMillis",cacheMillis);
	result.put("lastRefTime",System.currentTimeMillis());
	result.put("checksum",service.getChecksum());
	result.put("useSpecifiedURL",false);
	result.put("clusters",clusters);
	result.put("env",env);
	result.put("metadata",service.getMetadata());
	return result;
}
  • 根据namespaceIdserviceName获得Service实例
  • Service实例中基于srvIPs得到所有服务提供者实例
  • 遍历组装JSON字符串并返回

1.6 Nacos服务地址动态感知原理

可以通过subscribe方法来实现监听,其中serviceName表示服务名、EventListener表示监听到的事件:

void subscribe(String serviceName, EventListener listener) throws NacosException;

具体调用方式如下:

NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
naming.subscribe("example",event->(
	if (event instanceof NamingEvent) {
		System.out.println(((NamingEvent) event).getServceName());
		System.out.printIn(((NamingEvent) event).getInstances());
	}
});

或者调用selectInstance方法,如果将subscribe属性设置为true,会自动注册监听:

public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,boolean subscribe)

在这里插入图片描述

Nacos客户端中有一个HostReactor类,它的功能是实现服务的动态更新,基本原理是:

  • 客户端发起时间订阅后,在HostReactor中有一个UpdateTask线程,每10s发送一次Pull请求,获得服务端最新的地址列表
  • 对于服务端,它和服务提供者的实例之间维持了心跳检测,一旦服务提供者出现异常,则会发送一个Push消息给Nacos客户端,也就是服务端消费者
  • 服务消费者收到请求之后,使用HostReactor中提供的processServiceJSON解析消息,并更新本地服务地址列表

标签:String,service,Nacos,instance,serviceName,讲解,put,源码
From: https://www.cnblogs.com/jingzh/p/17709930.html

相关文章

  • Spring源码分析(一)Spring容器及Spring Bean
    (一)Spring容器及SpringBean1.Spring容器1.1什么是容器官网中有一句话Theorg.springframework.context.ApplicationContextinterfacerepresentstheSpringIoCcontainerandisresponsibleforinstantiating,configuring,andassemblingthebeans.翻译下来的意思是:Spr......
  • 讲解和介绍VNC的使用,以及和ssh的对比
    VNC(VirtualNetworkComputing)是一种远程桌面协议,它允许用户通过网络访问和控制远程计算机。通过VNC,用户可以在自己的计算机上查看和操纵远程计算机的桌面界面,就像在本地使用该计算机一样。VNC的使用非常简单。首先,需要在远程计算机上安装VNC服务器软件。然后,用户可以在本地计算机......
  • 【java基础】Token令牌生成 token加密串 生成token Aes加密 Base64加密 JWT 【附
    先看效果:Token令牌-生成工具包括:头部(header)+载荷(payload)+签证(signature) 可以自定义加密盐: 源码:地址一:GitLab地址二:123盘地址三:百度盘提取码:666 ......
  • Qemu源码分析(5)—Apple的学习笔记
    一,前言前言本节主要看线程什么时候创建的及创建的线程有什么作用。二,源码分析第一次进入断点仅主线程4215.root@ubuntu:/home/applecai#ps-T-p4215PIDSPIDTTYTIMECMD42154215pts/800:00:00qemu-system-gnu通过搜索文件名pthread关键字,然后找到qemu-thread-posix.c......
  • linux安装nacos
    一、准备1.安装包nacos-server-2.0.3.tar.gz2.jdk1.8+环境3.mysql二、解压安装#解压tarzxvfnacos-server-2.0.3.tar.gz-C/usr/local/nacos#进入解压目录中bin文件夹cd/usr/local/nacos/nacos-server-2.0.3/bin#试运行nacos服务#启动命令(standalone代表着单机模......
  • 失物招领系统的设计与实现-计算机毕业设计源码+LW文档
    题目的意义、目的:随着互联网的飞速发展,学校也进入了信息化时代。校园中大学生丢失物品的现象较为普遍,虽然目前国内有一些网上校园寻物平台或者是QQ群之类的,但是都不是很成熟,使得失主不能及时甚至找不到失物,给生活带来了极大的不便。通过互联网为在校师生搭建一个发布信息的平台,可......
  • 基于web的客户管理系统-计算机毕业设计源码+LW文档
    一、选题的目的及意义随着商业银行规模的发展,公司业务越来越多,客户和人员的管理也变得越来越困难。在传统的客户管理中,公司往往通过大量的人力和物力进行管理,通过手工记录客户,统计客户订单信息。这种传统的管理方法容易出错,而且不能适应现代化、信息化的发展过程。因此,本基于web的......
  • 基于Python的鲜花在线销售系统-计算机毕业设计源码+LW文档
    摘 要随着信息技术的发展,基于web模式的购物系统逐渐普及,网上购物是一种新型的商务模式,其工作流程和经营模式受到了欢迎。电子商务可以适应现代化快节奏的生活方式,满足各类人群足不出户的在线购物,利用商城使得买卖双方完成线上交易,提高了购买效率。然而像鲜花这种传统的行业,由于......
  • 基于Java Web的陕西旅游网站的设计与实现-计算机毕业设计源码+LW文档
    一、研究的背景和意义研究背景:本文主要是基于旅游业是我国现阶段发展的重要产业,旅游可以推动经济上的发展,通过深入的对当前旅游行业的研究,也随着网络技术的发展,传统的旅游方式游客已经无法满足,游客不再满足于单一路线的线路,无法进行更多的选择,每天日常的行程安排丧失了一定......
  • 基于JavaWeb的校园社团平台设计与开发-计算机毕业设计源码+LW文档
    摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了基于JavaWeb的校园社团平台的开发全过程。通过分析基于JavaWeb的校园社团平台管理的不足,创建了一个计算机管理基于JavaWeb的校园社团平台的方案。文章介绍了基于JavaWeb的校园社......