首页 > 其他分享 >Spring Cloud全解析:注册中心之Eureka服务获取和服务续约

Spring Cloud全解析:注册中心之Eureka服务获取和服务续约

时间:2024-08-11 17:54:17浏览次数:12  
标签:clientConfig return Spring Eureka applications new instanceInfo logger Cloud

服务获取和服务续约

eureka客户端通过定时任务的方式进行服务获取和服务续约,在com.netflix.discovery.DiscoveryClient类中,启动了两个定时任务来进行处理

private void initScheduledTasks() {
  // 是否需要拉取
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
      // 拉取的定时任务时间间隔是registryFetchIntervalSeconds
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
// 是否需要注册
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

来分别看一下两个定时逻辑

服务获取

线程是CacheRefreshThread,看一下run方法

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();
						// 第一次拉取所有
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
  // 刷新缓存
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }
缓存刷新逻辑
if (event instanceof CacheRefreshedEvent) {
                        if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                          // 更新负载均衡器中的map
                                            updateAction.doUpdate();
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }

服务续约

线程是HeartbeatThread,看一下run方法

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
      // 直接发送请求进行续约,很简单的逻辑
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

https://zhhll.icu/2023/框架/微服务/springcloud/注册中心/Eureka/源码分析/1.服务获取和服务续约/

标签:clientConfig,return,Spring,Eureka,applications,new,instanceInfo,logger,Cloud
From: https://blog.csdn.net/Lxn2zh/article/details/140948204

相关文章

  • spring 相关
    1.对spring的理解spring是一个广泛应用于企业级java开发的强大框架,为开发复杂的应用程序提供了全面而高效的解决方案1.spring的核心是控制反转+依赖注入,即IOC和DI,这意味着对象的创建和管理不再由开发者直接负责,而是可以交由Spring容器来处理,通过配置文件或者......
  • 基于Java Springboot“一分钟”寝室小卖部系统
    一、作品包含源码+数据库+设计文档万字+PPT+全套环境和工具资源+部署教程二、项目技术前端技术:Html、Css、Js、Vue、Element-ui数据库:MySQL后端技术:Java、SpringBoot、MyBatis三、运行环境开发工具:IDEA/eclipse数据库:MySQL5.7数据库管理工具:Navicat10以上版本环境......
  • 基于Java Springboot音乐播放器系统
    一、作品包含源码+数据库+设计文档万字+PPT+全套环境和工具资源+部署教程二、项目技术前端技术:Html、Css、Js、Vue、Element-ui数据库:MySQL后端技术:Java、SpringBoot、MyBatis三、运行环境开发工具:IDEA/eclipse数据库:MySQL5.7数据库管理工具:Navicat10以上版本环境......
  • 基于Java Springboot传统戏曲推广微信小程序
    一、作品包含源码+数据库+设计文档万字+PPT+全套环境和工具资源+部署教程二、项目技术前端技术:Html、Css、Js、Vue、Element-ui数据库:MySQL后端技术:Java、SpringBoot、MyBatis三、运行环境开发工具:IDEA/eclipse+微信开发者工具数据库:MySQL5.7数据库管理工具:Navica......
  • 基于Java Springboot宠物中心信息管理app或微信小程序
    一、作品包含源码+数据库+设计文档万字+PPT+全套环境和工具资源+部署教程二、项目技术前端技术:Html、Css、Js、Vue、Element-ui数据库:MySQL后端技术:Java、SpringBoot、MyBatis三、运行环境开发工具:IDEA/eclipse+微信开发者工具数据库:MySQL5.7数据库管理工具:Navica......
  • 时尚美妆化妆品电商商城网站-计算机毕设Java|springboot实战项目
    ......
  • FactoryBean -【Spring底层原理】
    FactoryBean作为一个生产或修饰对象的工厂Bean,那是如何生产Bean的呢,咱们通过实例来进行分析,这里就使用工厂Bean来生产Color对象//启动类publicclassMainTest{@TestpublicvoidTestMain(){AnnotationConfigApplicationContextapplicationContext=newAnnotation......
  • 深入浅出!这份阿里内传的“Spring-MVC源码分析与实践笔记”带你看透Spring-MVC源码!太牛
    第二章常见协议和标准DNS协议TCP/IP协议与SocketHTTP协议Servlet与JavaWeb开发第三章DNS的设置DNS解析Windows7设置DNS服务器Windows设置本机域名和IP的对应关系第四章Java中Socket的用法普通Socket的用法NioSocket的用法第五章自己动手实现HTTP协议第六......
  • [纯干货]SpringCould + 适配器模式 + nacos动态部署 OSS 对接
    一、前言在一个微服务项目里,我们的OSS云存储服务常常需要配置诸如阿里云、腾讯云、minio等多个云存储厂商的业务代码,而且后续无法确保是否会增添新的云存储厂商。此时,倘若我们要修改具体使用的云存储厂商,就会致使controller层和service层发生变动,这并不符合低耦合的理......
  • springMVC 请求流程解析
    @SuppressWarnings("deprecation")protectedvoiddoDispatch(HttpServletRequestrequest,HttpServletResponseresponse)throwsException{ //实际处理时用的请求,如果不是上传请求,则直接使用接收到的request,否则封装成上传的request HttpServletRequestprocessedRequ......