1 前言
我们看过很多的时机,比如服务数据源的关闭时机、服务正式开始处理请求的时机或者Tomcat和SpringBoot的协同、还有 mybatis等一些插件的入场时机等,这节我们要再看一个时机,就是关于跟注册中心(Eureka、Nacos)的时机,比如你有没有思考过:
我服务还没起来,你就到注册中心上线了,那请求过来岂不是就乱了,或者我服务要停止了我数据源都释放了,你还没从注册中心下线,那请求还过来是不是也会乱,所以我们就要看看微服务里上线或者叫注册和下线的时机都是什么时候,做到心中有数。本节我们先看注册时机。
环境的话,我本地有 Eureka,本节我们就拿 Eureka 注册中心调试看看。
服务的话,我有几个简单的微服务,都是平时用来调试的哈
2 注册时机
先来看个效果,当我的 demo 服务起来后:
问题来了,我怎么知道它的注册时机,从哪看呢?我从官网的文档里看了看:中文官网、英文官网,发现它只是从使用方式上介绍了怎么使用以及使用上的细节,并没说原理。
那怎么看呢?那就从服务的日志以及代码的依赖看起,当你看的源码多了,大多融合 SpringBoot 的方式都差不多。
找到注册的 Bean了没?就是他:EurekaAutoServiceRegistration
@Bean @ConditionalOnBean({AutoServiceRegistrationProperties.class}) @ConditionalOnProperty( value = {"spring.cloud.service-registry.auto-registration.enabled"}, matchIfMissing = true ) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); }
那我们就从 EurekaAutoServiceRegistration 看起,先看看它的类关系:
正如我图上所联系的,这个类有两个动作来驱动他执行,(1)事件监听(2)生命周期接口
(1)针对事件监听,它主要监听了 WebServerInitializedEvent(Web容器初始化完毕的事件) 和 ContextClosedEvent(上下文关闭事件也就是服务停止的事件) 两个事件
public void onApplicationEvent(ApplicationEvent event) { if (event instanceof WebServerInitializedEvent) { this.onApplicationEvent((WebServerInitializedEvent)event); } else if (event instanceof ContextClosedEvent) { this.onApplicationEvent((ContextClosedEvent)event); } }
(2)SmartLifecycle 生命周期接口,有两个重要的动作就是启动和停止
我们这里关注的是服务注册,也就是关注事件的监听里的 WebServerInitializedEvent 事件和生命周期的 start 方法。
那哪个先执行的呢?是生命周期的 start 先执行。我这里画个图简单回忆下:
看图哈,都是在刷新上下文的最后有个 finishRefresh 即结束上下文中的动作。具体可以看我之前的文章哈,就不阐述了。那我们就先看看 EurekaAutoServiceRegistration 的 start 方法:
// EurekaAutoServiceRegistration 的 start 方法 // private AtomicInteger port = new AtomicInteger(0); public void start() { // 这个时候进来 port 还是0 // 这里提前告诉你 他是通过监听事件 当监听到我们的 web容器启动完毕后,接收到监听拉更改端口的 此时还没启动web容器 所以这里还是0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // 这里的 nonSecurePort 默认就是我们服务的端口号 // 那么当刷新上下文的时候,这里会执行 if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent(this, this.registration.getInstanceConfig())); this.running.set(true); } }
this.registration.getNonSecurePort()的来源:
this.registration.getNonSecurePort(),这个端口号是来自 registration 我们这里是 Eureka 即 EurekaRegistration,而它又来自于:CloudEurekaInstanceConfig instanceConfig,
那我们看到第一次到 start 会执行这段代码:
// 这里的 nonSecurePort 默认就是我们服务的端口号 // 那么当刷新上下文的时候,这里会执行 if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent(this, this.registration.getInstanceConfig())); this.running.set(true); }
那么看到这里,我们得先了解下 serviceRegistry(EurekaServiceRegistry)和 registration(EurekaRegistration)的来源,都是在自动装配类里 EurekaClientAutoConfiguration:
EurekaServiceRegistry 的来源比较简单:
EurekaRegistration的来源:
可以看到 EurekaRegistration 有很多的依赖:EurekaClient、CloudEurekaInstanceConfig、ApplicationInfoManager、HealthCheckHandler。
那我们看看这四个怎么来的,先看 EurekaClient:
// 见 EurekaClientAutoConfiguration @Bean( destroyMethod = "shutdown" ) @ConditionalOnMissingBean( value = {EurekaClient.class}, search = SearchStrategy.CURRENT ) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) { ApplicationInfoManager appManager; // 拿到原始对象 if (AopUtils.isAopProxy(manager)) { appManager = (ApplicationInfoManager)ProxyUtils.getTargetObject(manager); } else { appManager = manager; } // 创建一个 client 对象 CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context); // 注册健康检查 cloudEurekaClient.registerHealthCheck(healthCheckHandler); return cloudEurekaClient; }
可以看到它也依赖很多: EurekaInstanceConfig、ApplicationInfoManager、HealthCheckHandler、EurekaClientConfig ,它跟 EurekaRegistration 的依赖差不多。
EurekaClientConfig 是 eureka.client 开头的配置 Bean:
EurekaInstanceConfig 是 eureka.instance 开头的配置 Bean:
HealthCheckHandler 健康检查是来源于 EurekaDiscoveryClientConfiguration 自动装配,当你开启了 eureka.client.healthcheck.enabled 的配置(默认=false 不开启)就会注册一个健康检查的 Bean:
ApplicationInfoManager 是对当前服务信息的一个管理 Bean,来源于 EurekaClientAutoConfiguration 自动装配类:
@Bean @ConditionalOnMissingBean( value = {ApplicationInfoManager.class}, search = SearchStrategy.CURRENT ) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) { // EurekaInstanceConfig eureka.instance 开头的配置Bean // InstanceInfoFactory 工厂来创建 InstanceInfo 也就是针对当前服务的信息封装到 InstanceInfo里 InstanceInfo instanceInfo = (new InstanceInfoFactory()).create(config); // 实例化 return new ApplicationInfoManager(config, instanceInfo); }
了解完这四个依赖,我们继续看 EurekaClient 的创建:
// 创建一个 client 对象 CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
CloudEurekaClient 最后会走到 DiscoveryClient 的构造器:
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // 省略... logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion()); // 不开启注册或者服务发现的话走这里 这个不看 我们重点看 else 的逻辑 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); this.scheduler = null; this.heartbeatExecutor = null; this.cacheRefreshExecutor = null; this.eurekaTransport = null; this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion()); DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); this.initTimestampMs = System.currentTimeMillis(); this.initRegistrySize = this.getApplications().size(); this.registrySize = this.initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.initRegistrySize); } else { try { // 调度线程池 this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build()); // 健康检查的线程池 this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); this.eurekaTransport = new DiscoveryClient.EurekaTransport(); this.scheduleServerEndpointTask(this.eurekaTransport, args); Object azToRegionMapper; if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(this.clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(this.clientConfig); } if (null != this.remoteRegionsToFetch.get()) { ((AzToRegionMapper)azToRegionMapper).setRegionsToFetch(((String)this.remoteRegionsToFetch.get()).split(",")); } this.instanceRegionChecker = new InstanceRegionChecker((AzToRegionMapper)azToRegionMapper, this.clientConfig.getRegion()); } catch (Throwable var12) { throw new RuntimeException("Failed to initialize DiscoveryClient!", var12); } // 服务注册并且强制在初始化的时候就注册 默认是false 也就是不会在初始化的时候注册 if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) { try { // 调用注册 if (!this.register()) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable var10) { logger.error("Registration error at startup: {}", var10.getMessage()); throw new IllegalStateException(var10); } } // 初始化线程任务 this.initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable var9) { logger.warn("Cannot register timers", var9); } DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); this.initTimestampMs = System.currentTimeMillis(); this.initRegistrySize = this.getApplications().size(); this.registrySize = this.initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.initRegistrySize); } }
在这个构造器里,我调试发现它默认是不在初始化进行强制注册到 Eureka的,并且创建了几个线程池,那我们继续看看初始化线程任务里都要做什么事情:
// TimedSupervisorTask 它这个看似是一个体系用于做线程任务的 我们本节暂时不看它原理 private void initScheduledTasks() { int renewalIntervalInSecs; int expBackOffBound; // 默认是开启的 if (this.clientConfig.shouldFetchRegistry()) { // 间隔时间 默认30秒 renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds(); expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // cacheRefresh 续租任务 类似告诉服务器我还活着 别把我下掉 this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()); this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS); } // 默认开启的 注册到 eureka 这个是我们本节关注的 if (this.clientConfig.shouldRegisterWithEureka()) { renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs); // heartbeat 健康检查的 this.heartbeatTask = new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()); this.scheduler.schedule(this.heartbeatTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfoReplicator 实现了 Runnable this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); this.statusChangeListener = new StatusChangeListener() { public String getId() { return "statusChangeListener"; } public void notify(StatusChangeEvent statusChangeEvent) { DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent); DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; // 默认开启 往 applicationInfoManager 注册了一个监听 if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } // 启动 我们的服务注册就在这里 this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
关于 EurekaClient 的构造我们就暂时看到这里,我们继续看一个 InstanceInfoReplicator,看看这个任务干了些什么:
class InstanceInfoReplicator implements Runnable { ... // 构造器 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; // 初始化了一个线程池 this.scheduler = Executors.newScheduledThreadPool(1, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d").setDaemon(true).build()); this.scheduledPeriodicRef = new AtomicReference(); this.started = new AtomicBoolean(false); this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", this.allowedRatePerMinute); } // start 方法就是往线程池里提交了一个任务 任务的内容就是自己的 run 方法 public void start(int initialDelayMs) { if (this.started.compareAndSet(false, true)) { this.instanceInfo.setIsDirty(); Future next = this.scheduler.schedule(this, (long)initialDelayMs, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); } } // this.instanceInfo.setIsDirty() 方法内容 // public synchronized void setIsDirty() { // this.isInstanceInfoDirty = true; // this.lastDirtyTimestamp = System.currentTimeMillis(); // } // ... public void run() { boolean var6 = false; ScheduledFuture next; label53: { try { var6 = true; this.discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime(); // 不为空 执行注册 if (dirtyTimestamp != null) { this.discoveryClient.register(); this.instanceInfo.unsetIsDirty(dirtyTimestamp); var6 = false; } else { var6 = false; } break label53; } catch (Throwable var7) { logger.warn("There was a problem with the instance info replicator", var7); var6 = false; } finally { if (var6) { ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); } } next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); return; } next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); } }
哎哟,这个注册,第一次的落点在 EurekaClient 的构造器里,通过启动 InstanceInfoReplicator 来进行注册的。
但是有个重要的信息,就是它都是带 @Lazy 标记的,也就是不会在刷新上下文的最后阶段主动初始化这些类,而是在第一次使用的时候才进行实例化的。
那么回到我们本节的主题,它的注册时机在哪里?或者第一次调用这就要回到我们最初的EurekaAutoServiceRegistration 生命周期 start 方法里,就是我下边红色加粗的这里:
// EurekaAutoServiceRegistration 的 start 方法 // private AtomicInteger port = new AtomicInteger(0); public void start() { // 这个时候进来 port 还是0 // 这里提前告诉你 他是通过监听事件 当监听到我们的 web容器启动完毕后,接收到监听拉更改端口的 此时还没启动web容器 所以这里还是0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // 这里的 nonSecurePort 默认就是我们服务的端口号 // 那么当刷新上下文的时候,这里会执行 也就是第一次在这里的执行 if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent(this, this.registration.getInstanceConfig())); this.running.set(true); } }
走到这里:this.serviceRegistry.register(this.registration);
// 服务注册 public void register(EurekaRegistration reg) { // 看人家这个取名 maybe 或许 初始化 client this.maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); reg.getHealthCheckHandler().ifAvailable((healthCheckHandler) -> { reg.getEurekaClient().registerHealthCheck(healthCheckHandler); }); } // getApplicationInfoManager 是不是就会开始创建 我们的 ApplicationInfoManager 它由依赖于 EurekaClient // 是不是就都创建起来了 private void maybeInitializeClient(EurekaRegistration reg) { reg.getApplicationInfoManager().getInfo(); reg.getEurekaClient().getApplications(); }
好啦,至此到这里,创建 EurekaClient 的时候,进行服务注册的。
最后再看看注册方法,哎哟,这里就是第一次的注册:
boolean register() throws Throwable { logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier); EurekaHttpResponse httpResponse; try { // 服务注册 httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo); } catch (Exception var3) { logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3}); throw var3; } if (logger.isInfoEnabled()) { logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
跟我们的日志也能对上:
所以服务第一次注册的时机,默认的情况下是在 刷新上下文的 finishRefresh 里调用 Bean 生命周期的 start,通过 serviceRegistry.registry 方法来第一次加载 EurekaClient 相关的 Bean,在 EurekaClient 的构造器里通过 InstanceInfoReplicator 来进行服务的注册。
3 小结
好啦,本节我们就暂时看到这里,下节我们再看下线时机,有理解不对的地方欢迎指正哈。
标签:clientConfig,DiscoveryClient,SpringBoot,注册,registration,new,时机,logger From: https://www.cnblogs.com/kukuxjx/p/18212295