首页 > 编程语言 >Nacos源码阅读心得

Nacos源码阅读心得

时间:2023-10-25 17:46:31浏览次数:27  
标签:String service Nacos instance 源码 new serviceName 心得 public

Nacos注册中心(1.4.1)源码解读心得

一丶Nacos介绍

  Nacos是阿里巴巴推出的一款新开源项目,是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。它致力于帮助您发现、配置和管理微服务,提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos是构建以“服务”为中心的现代应用架构(例如微服务范式、云原生范式)的服务基础设施。

Nacos支持多种核心特性,包括:

  1. 服务发现:支持DNS与RPC服务发现,也提供原生SDK、OpenAPI等多种服务注册方式和DNS、HTTP与API等多种服务发现方式。
  2. 服务健康监测:提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。
  3. 动态配置服务:提供配置统一管理功能,能够帮助我们将配置以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
  4. 动态DNS服务:支持动态DNS服务权重路由,能够很容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。
  5. 服务及其元数据管理:支持从微服务平台建设的视角管理数据中心的所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的SLA以及最首要的metrics统计数据。

  Nacos可以与Spring、Spring Boot、Spring Cloud集成,并能代替Spring Cloud Eureka和Spring Cloud Config。通过Nacos Server和spring-cloud-starter-alibaba-nacos-config实现配置的动态变更。它提供了一个简洁易用的UI(控制台样例Demo)帮助您管理所有的服务和应用的配置。同时,它也提供了一些简单的DNS APIs TODO帮助您管理服务的关联域名和可用的IP:PORT列表。

二丶客户端注册流程

  • 在SpringCloudAlibaba这一套微服务组件中,Nacos作为注册中心向其他的微服务提供信息,业务服务通过从Nacos中拉取所需要的服务信息,再通过Ribbon在本地做负载均衡之后通过Feign组件发起接口调用。例如在电商系统中的下单服务,库存服务,支付服务等。完成一个下单过程中,下单服务需要调用库存服务减库存,调用支付服务完成支付等,而库存服务,支付服务的信息都会存储在注册中心即Nacos服务中。服务之间的调用只需要通过注册中心获取,不再需要每个服务都去存储需要调用的服务信息了,完成了解耦。
  • 客户端想要注册到注册中心去就要先引入Nacos的客户端依赖spring-cloud-starter-alibaba-nacos-discovery,并在配置文件中配上Nacos的服务地址和命名空间等信息。想要知道Nacos客户端的注册流程就得从引入的依赖入手,从maven依赖库中找到nacos的jar包,下面有一个META-INF文件夹,里面的spring.factory文件就是springboot自动装配过程中会装配的类:

     可以看到,红框标注的类就是跟服务发现自动装配相关性比较大的类文件了,直接在项目中搜索这个类可以看到以下文件:

    @Configuration
    @EnableConfigurationProperties
    @ConditionalOnNacosDiscoveryEnabled
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
            AutoServiceRegistrationAutoConfiguration.class })
    public class NacosDiscoveryAutoConfiguration {
    
        @Bean
        public NacosServiceRegistry nacosServiceRegistry(
                NacosDiscoveryProperties nacosDiscoveryProperties) {
            return new NacosServiceRegistry(nacosDiscoveryProperties);
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        public NacosRegistration nacosRegistration(
                NacosDiscoveryProperties nacosDiscoveryProperties,
                ApplicationContext context) {
            return new NacosRegistration(nacosDiscoveryProperties, context);
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        public NacosAutoServiceRegistration nacosAutoServiceRegistration(
                NacosServiceRegistry registry,
                AutoServiceRegistrationProperties autoServiceRegistrationProperties,
                NacosRegistration registration) {
            return new NacosAutoServiceRegistration(registry,
                    autoServiceRegistrationProperties, registration);
        }
    }

    在这个类中有三个@Bean注解,仔细观察前两个bean都在第三个bean的参数中,所以第三个bean就是比较重要的Bean了。

  • 直接搜索NacosAutoServiceRegistry这个类可以看到其中有一个方法名为register(),通过类名和方法名大概能猜到这就是注册的主逻辑了。
    @Override
        protected void register() {
            if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
                log.debug("Registration disabled.");
                return;
            }
            if (this.registration.getPort() < 0) {
                this.registration.setPort(getPort().get());
            }
            super.register();
        }
  • 直接点到注册方法中去是NacosAutoServiceRegistry的抽象父类,抽象父类中有一个对象serviceRegistry,注册方法就是这个对象的register()方法,继续跟到这个register()方法中去:
    @Override
        public void register(Registration registration) {
    
            if (StringUtils.isEmpty(registration.getServiceId())) {
                log.warn("No service to register for nacos client...");
                return;
            }
    
            String serviceId = registration.getServiceId();
    
            Instance instance = getNacosInstanceFromRegistration(registration);
    
            try {
                namingService.registerInstance(serviceId, instance);
                log.info("nacos registry, {} {}:{} register finished", serviceId,
                        instance.getIp(), instance.getPort());
            }
            catch (Exception e) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }

    可以看到通过getNacosInstanceFromRegistration()方法将Registration 转换成为了Instance 对象,后续通过namingService.registerInstance(serviceId, instance);进行了注册的动作,而这个Instance 对象其实就是我们Nacos服务端所存储的微服务相关的一些信息。

  • 继续跟namingService.registerInstance(serviceId, instance);方法:
     @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    
            if (instance.isEphemeral()) {
                BeatInfo beatInfo = new BeatInfo();
                beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
                beatInfo.setIp(instance.getIp());
                beatInfo.setPort(instance.getPort());
                beatInfo.setCluster(instance.getClusterName());
                beatInfo.setWeight(instance.getWeight());
                beatInfo.setMetadata(instance.getMetadata());
                beatInfo.setScheduled(false);
                long instanceInterval = instance.getInstanceHeartBeatInterval();
                beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
    
                beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
            }
    
            serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
        }

    这是NamingService的实现类NacosNamingService中的实现,可以看到进行了一个if判断,这个其实是判断是否是一个临时的实例,如果是临时实例做一些处理,最后的注册请求是在serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);方法中的:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
            NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
                namespaceId, serviceName, instance);
    
            final Map<String, String> params = new HashMap<String, String>(9);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put(CommonParams.GROUP_NAME, groupName);
            params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
            params.put("ip", instance.getIp());
            params.put("port", String.valueOf(instance.getPort()));
            params.put("weight", String.valueOf(instance.getWeight()));
            params.put("enable", String.valueOf(instance.isEnabled()));
            params.put("healthy", String.valueOf(instance.isHealthy()));
            params.put("ephemeral", String.valueOf(instance.isEphemeral()));
            params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    
            reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
    
        }

    组装了一些参数最后发起了POST请求.其中UtilAndComs.NACOS_URL_INSTANCE这个常量最后拼接出来是/nacos/v1/ns/instance

  • 得出结论:客户端通过将自己服务的信息包括ip端口,命名空间,服务名等信息组装好,向Nacos服务发起POST请求注册到注册中心去。

三丶服务端存储客户端注册的服务信息

  •  在上面客户端注册流程最后我们得到了一个URI:/nacos/v1/ns/instance  通过这个URI我们可以去服务端的源码中搜索这个接口:
     @CanDistro
        @PostMapping
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public String register(HttpServletRequest request) throws Exception {
            
            final String namespaceId = WebUtils
                    .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            NamingUtils.checkServiceNameFormat(serviceName);
            
            final Instance instance = parseInstance(request);
            
            serviceManager.registerInstance(namespaceId, serviceName, instance);
            return "ok";
        }

    以上是服务端的注册实例的接口,主要完成了三个动作:检查服务名信息,将请求中的参数转换为Instance实例对象,注册实例,继续跟 serviceManager.registerInstance(namespaceId, serviceName, instance)注册的方法:

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
            
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
            
            Service service = getService(namespaceId, serviceName);
            
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                        "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
            
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }

     

    这个方法中主要动作有:创建空的Service对象,获取Service对象,注册实例等动作。

     

    createEmptyService(namespaceId, serviceName, instance.isEphemeral())方法:
     public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
                throws NacosException {
            Service service = getService(namespaceId, serviceName);
            if (service == null) {
                
                Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
                service = new Service();
                service.setName(serviceName);
                service.setNamespaceId(namespaceId);
                service.setGroupName(NamingUtils.getGroupName(serviceName));
                // now validate the service. if failed, exception will be thrown
                service.setLastModifiedMillis(System.currentTimeMillis());
                service.recalculateChecksum();
                if (cluster != null) {
                    cluster.setService(service);
                    service.getClusterMap().put(cluster.getName(), cluster);
                }
                service.validate();
                
                putServiceAndInit(service);
                if (!local) {
                    addOrReplaceService(service);
                }
            }
    
    

    先调用了getService方法:

    public Service getService(String namespaceId, String serviceName) {
            if (serviceMap.get(namespaceId) == null) {
                return null;
            }
            return chooseServiceMap(namespaceId).get(serviceName);
        }

    其中的serviceMap结构是

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    这个map其实就是注册中心保存所有实例的map,最外层的key一般为namespace,里层的key一般为定义的group名,根据业务需要定义。

    public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
     ...private Map<String, Cluster> clusterMap = new HashMap<>();
     ...
    }

    map中的Service对象其中又有一个clusterMap,而Cluster对象的结构如下

    public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
        ...
        
        @JsonIgnore
        private Set<Instance> persistentInstances = new HashSet<>();
        
        @JsonIgnore
        private Set<Instance> ephemeralInstances = new HashSet<>();
        
        @JsonIgnore
        private Service service;
    ...
    }

    看到这里的两个HashSet中的Instance对象是否有些眼熟?他就是客户端注册到注册中心的服务实例信息

  • 所以整体来看,服务端存储各个微服务的结构如图所示:
  •  那么再回到上述的getService方法中,通过命名空间就可以得到同一组下面的服务,而chooseServiceMap(namespaceId).get(serviceName)又通过服务名来获取Service,其实得到的结构就是一个个的service其中还有一层Cluster;

  • 那么此时我的微服务还并未完成注册获取到的Service肯定是Null,继续往下走就会新构建一个Service,经过前面的赋值校验方法,会走到putServiceAndInit(service)方法中去:
    private void putServiceAndInit(Service service) throws NacosException {
            putService(service);
            service.init();
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
            Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
        }

    这个方法中做了两步很重要的操作,其中putService()方法:

    public void putService(Service service) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                synchronized (putServiceLock) {
                    if (!serviceMap.containsKey(service.getNamespaceId())) {
                        serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                    }
                }
            }
            serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
        }

    就是将我们的service放到serviceMap中,源码中就用到了双检锁。

  • 而在service.init()方法中:
     public void init() {
            HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                entry.getValue().setService(this);
                entry.getValue().init();
            }
        }

    第一行的 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);看类名是健康检查相关的,那么可以想到注册实例肯定要把自己的健康信息更新到注册中心去,再看参数:clientBeatCheckTask 服务心跳检查任务,点到这个类中:

    public class ClientBeatCheckTask implements Runnable {
        
        private Service service;
        
        @Override
        public void run() {
            try {
                if (!getDistroMapper().responsible(service.getName())) {
                    return;
                }
                
                if (!getSwitchDomain().isHealthCheckEnabled()) {
                    return;
                }
                
                List<Instance> instances = service.allIPs(true);
                
                // first set health status of instances:
                for (Instance instance : instances) {
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                        if (!instance.isMarked()) {
                            if (instance.isHealthy()) {
                                instance.setHealthy(false);
                                Loggers.EVT_LOG
                                        .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                                getPushService().serviceChanged(service);
                                ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                            }
                        }
                    }
                }
                
                if (!getGlobalConfig().isExpireInstance()) {
                    return;
                }
                
                // then remove obsolete instances:
                for (Instance instance : instances) {
                    
                    if (instance.isMarked()) {
                        continue;
                    }
                    
                    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                        // delete instance
                        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                JacksonUtils.toJson(instance));
                        deleteIp(instance);
                    }
                }
                
            } catch (Exception e) {
                Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
            }
            
        }
    }

    实现了Runnable接口说明是一个线程,直接看run方法

     List<Instance> instances = service.allIPs(true);
    public List<Instance> allIPs(boolean ephemeral) {
            List<Instance> result = new ArrayList<>();
            for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
                result.addAll(entry.getValue().allIPs(ephemeral));
            }
            
            return result;
        }
    public List<Instance> allIPs(boolean ephemeral) {
            return ephemeral ? new ArrayList<>(ephemeralInstances) : new ArrayList<>(persistentInstances);
        }

    获取了service中的所有Instance,然后做一系列的心跳检查,发布事件等。到此就做完了实例初始化的动作。

  • 上述是createEmptyService(namespaceId, serviceName, instance.isEphemeral())方法:
  • 那么接下来
     Service service = getService(namespaceId, serviceName);

    肯定可以获取到service,直接看最后的addInstance(namespaceId, serviceName, instance.isEphemeral(), instance)方法:

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
                throws NacosException {
            
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
            
            Service service = getService(namespaceId, serviceName);
            
            synchronized (service) {
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
                
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
                
                consistencyService.put(key, instances);
            }
        }
  • String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral)这个方法通过nameSpaceId,服务名信息组建了一个key:
    public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
            return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName)
                    : buildPersistentInstanceListKey(namespaceId, serviceName);
        }
    可以看到是根据ephemeral来判断的,这个值是控制是否临时实例的,Instance中的默认值是true表示默认新建的就是临时实例,那么构建出来的字符串:"com.alibaba.nacos.naming.iplist.ephemeral."+ namespaceId +"##"+serviceName
    中间是有ephemeral.的;下面又将初始化了的Instance获取到加锁执行注册逻辑:
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
                throws NacosException {
            
            Datum datum = consistencyService
                    .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
            
            List<Instance> currentIPs = service.allIPs(ephemeral);
            Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
            Set<String> currentInstanceIds = Sets.newHashSet();
            
            for (Instance instance : currentIPs) {
                currentInstances.put(instance.toIpAddr(), instance);
                currentInstanceIds.add(instance.getInstanceId());
            }
            
            Map<String, Instance> instanceMap;
            if (datum != null && null != datum.value) {
                instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
            } else {
                instanceMap = new HashMap<>(ips.length);
            }
            
            for (Instance instance : ips) {
                if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                    Cluster cluster = new Cluster(instance.getClusterName(), service);
                    cluster.init();
                    service.getClusterMap().put(instance.getClusterName(), cluster);
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                }
                
                if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                    instanceMap.remove(instance.getDatumKey());
                } else {
                    Instance oldInstance = instanceMap.get(instance.getDatumKey());
                    if (oldInstance != null) {
                        instance.setInstanceId(oldInstance.getInstanceId());
                    } else {
                        instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                    }
                    instanceMap.put(instance.getDatumKey(), instance);
                }
                
            }
            
            if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
                throw new IllegalArgumentException(
                        "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                                .toJson(instanceMap.values()));
            }
            
            return new ArrayList<>(instanceMap.values());
        }

    这个方法返回值是一个List<Instance>直接看返回值是instanceMap.values(),这个map的来源是通过第一个

    Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral))

     

     

    的结果判断的,流程是先判断这个实例是否已经注册到注册中心了,没有的话新增一个instanceMap,有将其中的旧元素都放到新的instanceMap中,再对传过来的instance做一系列的检查注册操作,返回现有的instanceMap中的元素集合。

  • 通过返回的List<Instance>构建一个Instances对象,这个对象里面的结构:
    public class Instances implements Record {
        
        private static final long serialVersionUID = 5500823673993740145L;
        
        private List<Instance> instanceList = new ArrayList<>();
    }

    最后一步consistencyService.put(key, instances)方法:

    public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                    globalConfig.getTaskDispatchPeriod() / 2);
        }

    onPut方法就是注册的核心逻辑了:

     public void onPut(String key, Record value) {
            
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                Datum<Instances> datum = new Datum<>();
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                dataStore.put(key, datum);
            }
            
            if (!listeners.containsKey(key)) {
                return;
            }
            
            notifier.addTask(key, DataOperation.CHANGE);
        }

    其中notifier的结构:

     public class Notifier implements Runnable {
            
            private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
            
            private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    }

    说明又是一个线程类,成员变量有一个tasks的阻塞队列。

     public void addTask(String datumKey, DataOperation action) {
                
                if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                    return;
                }
                if (action == DataOperation.CHANGE) {
                    services.put(datumKey, StringUtils.EMPTY);
                }
                tasks.offer(Pair.with(datumKey, action));
            }

    通过观察addTask方法可以看出向上面提到的阻塞队列tasks中添加了一个对象:

    public static <A, B> Pair<A, B> with(A value0, B value1) {
            return new Pair(value0, value1);
        }

    这里的Pair.with()可以不用管,只需要知道返回的一个Pair对象包含了前面生成的key和action,action代表了操作类型是新增或者编辑删除之类的。

          那么这个阻塞队列里面就包含了我们的服务信息,服务名,nameSpaceId,是否是临时实例等。再回去看notifier的run方法:

public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

  可以看出是一个线程从阻塞队列中循环拿出实例的信息到后续的handle方法:

private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }

 

 @Override
    public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }

在这里拿出所有的Instance实例对象进行权重默认值的设置,之后 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key))方法:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
        
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        
        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
        
    }

 

其中将Instance实例放入Map中的逻辑:

if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }

可以得出结论:Nacos中有一个线程从阻塞队列notifier中拿出新注册的Instance初始化处理之后根据是否是临时实例放到对应的HashSet中,即保存了新注册的实例信息。而每次注册的结果就是将注册信息放入到阻塞队列中去;这种异步处理的方式使得Nacos的TPS可以达到1w3+,因为其保存的服务信息对实时性要求并不高,这种场景下使用异步处理是合适的。而Nacos在更新实例信息时采用了写时复制的思想,保证了服务上下线修改Map时的效率。由于其写时复制复制的其实是Cluster中的内容,不是复制整个Map,所以它的效率也是很高的。

附一张图灵学院诸葛老师总结的流程图:

 

标签:String,service,Nacos,instance,源码,new,serviceName,心得,public
From: https://www.cnblogs.com/ybug/p/17787741.html

相关文章

  • Metasploit Linux Reverse_Tcp Shellcode 源码分析
    分析Metasploitlinux/x64/shell/reverse_tcpshellcodeShellcode生成使用msfvenom生成c格式的stagedshellcode$msfvenom-plinux/x64/shell/reverse_tcp-fc-ax64--platformlinuxLHOST=192.168.48.233LPORT=4444Payloadsize:130bytesFinalsizeofcf......
  • 学习c语言的心得
    培养良好的编程习惯:C语言是一门底层语言,需要严谨的编码和规范的命名,特别是对于变量、函数和数据类型的命名,要清晰明了,易于理解和维护。同时,编写注释也是非常重要的,能够帮助自己和他人更好地理解代码逻辑。多实践多动手:学习C语言最好的办法就是多动手写代码,通过实践来巩固所学的知......
  • 03-6021蓝牙源码分析
    目录一.非定向广播1.voidapp_remote_normal_undir_adv(void)1.1.boolapp_remote_check_bond_state(void)一.非定向广播1.voidapp_remote_normal_undir_adv(void)voidapp_remote_normal_undir_adv(void){//printk("%s\r\n",__FUNCTION__);printk("......
  • 直播商城系统源码,快速滑动条/快速滑块/快速滚动条标准实现
    直播商城系统源码,快速滑动条/快速滑块/快速滚动条标准实现 /* *Copyright2018TheAndroidOpenSourceProject * *LicensedundertheApacheLicense,Version2.0(the"License"); *youmaynotusethisfileexceptincompliancewiththeLicense. *Youmay......
  • 直播app系统源码,bootstrap5 text左对齐右对齐
    直播app系统源码,bootstrap5text左对齐右对齐在bootstrap4中text左/右对齐   <h1class="text-right">右对齐</h1>  <h1class="text-left">左对齐</h1>  <h1class="text-center">居中</h1> ​看了下官网在bootstrap5中就不起作用了换成t......
  • ZB集团的HIS系统的后期管理维护中的一些心得体会
    忙忙碌碌,时间好快,已经三年过去,不经意间发现,留存在草稿箱的第一篇随笔是三年期的,说实话,三年前很忙,现在似乎也没闲下来! 那时候的自己是一名某医疗集团的CTO(信息部主管),设计互联网大大小小的事情都要经手,实在是心力交瘁,尽管如此,我却甘之如饴,因为,在那几年我学到了很多,我相信,每个人在人......
  • springboot生成二维码的正确姿势-附视频附源码
    @目录前言初始化SpringBoot项目引入依赖编码编写工具类生成二维码资源共享二维码的原理是什么,如何保证不重复?你有没有想过这样一件事,二维码是实现原理是什么?如何保证各个平台的二维码是唯一的?就算你的程序停止运行,但是你的二维码依然存在。设计上要保证唯一性,比如在物流等环......
  • javaweb第11章源码
    javaweb第11章源码下载链接:https://wwpv.lanzoue.com/ifkAa1crixqd文件结构CHAPTER11│.classpath│.project│├─.settings│.jsdtscope│org.eclipse.jdt.core.prefs│org.eclipse.wst.common.component│org.eclipse.wst.common.proje......
  • springboot洗衣店管理系统-计算机毕业设计源码+LW文档
    摘 要随着时代的发展,人们的工作也学习压力越来越大,很多时候空闲时间也越来也少,经常没有时间去洗自己的衣服,很多商家在看到这种情况之后开设了洗衣机店专门用于服务这些没有时间洗衣服的人,但是传统的洗衣店都是用手动的的模式在管理,这种模式很落后,为了改善这一情况很多地方开设了......
  • 动漫周边电商网站的设计与实现-计算机毕业设计源码+LW文档
    摘要随着时代的发展,电子商务已经遍布了我们生活的每一个角落,尤其是在移动互联网迅速发展的今天,网上购物基本已经成为了人们生活中的一部分,为了让购物变得更加的方便快捷我们通过Java语言和SpringBoot框架开发了本次的动漫周边电商网站。基于SpringBoot的动漫周边电商网站从实际......