注册表结构
在先说注册源码之前,先要对注册表结构有一个印象,注册表结构如下:
- 最外层使用
NameSpace
用来隔离环境 NameSpace
下有不同的Group
,Group
用来管理Service
Service
下有不同的Cluster
,Cluster
包含多个Instance
实例
服务端注册源码解析
注:本文不说集群信息同步、服务心跳源码、服务一致性协议实现源码。
createEmptyService 创建空Service
接着前文:Nacos1.4源码(1):客户端注册源码 提到注册请求走到InstanceController
便走到了服务端注册的源码。
下面就开始分析:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校验服务名
NamingUtils.checkServiceNameFormat(serviceName);
//从HttpServletRequest中获取到Instance
//无非就是获取请求参数再封装成对象
final Instance instance = parseInstance(request);
//调用serviceManager,注册Instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
再看serviceManager
的registerInstance
方法:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建空的Service
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//获取到服务名称对应的Service
Service service = getService(namespaceId, serviceName);
//校验服务不为空
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//添加服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
主要有两大步骤:
createEmptyService
:创建空服务,意思是第一次注册该服务就会默认创建一个空的服务addInstance
:添加实例到创建的Service
中
先看createEmptyService
方法:主要逻辑是先获取,如果查不到对应的Service
就会创建一个新的Service
并设置一些属性,然后放到注册表并进行初始化。
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
//如果Service不存在会创建一个Service
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//根据命名空间和服务名称获取Service
Service service = getService(namespaceId, serviceName);
//service为空,则会创建
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
//设置一些Service的基本属性
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
//校验
service.validate();
//把service放到注册表并初始化
putServiceAndInit(service);
if (!local) {
//集群信息同步
addOrReplaceService(service);
}
}
}
比较引人注意的就是putServiceAndInit
,总共做了三步,
- 把服务放到注册表
serviceMap
中 - 执行
Service
的init
初始化方法,主要是开启了Service
的心跳检查任务 - 最后一步是添加服务监听,非常重要的一步,也是nacos提前留下的伏笔,是什么伏笔,我们后面揭晓。
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
//真正的注册表
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
private void putServiceAndInit(Service service) throws NacosException {
//1.把Service放到注册表
putService(service);
//2.Service初始化操作
service.init();
//3.添加服务监听
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
//先判断Namespace是否存在,如不存在先创建一个空的命名空间
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
//把Service放到命名空间下
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
public void init() {
//启动客户端心跳检查任务
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
前面两步比较简单,注释已经写好,看最后一步,添加服务监听做了什么事情?
ConsistencyService
是一个接口,有很多的实现,但是我们注意到该类是注入到ServiceManager
中,bean的名称是"consistencyDelegate",不难找到该类是DelegateConsistencyServiceImpl
而实际上该类实际上是nacos服务一致性协议的顶级接口,
DelegateConsistencyServiceImpl
实际上是委托类,主要委托了持久化一致性协议实现和临时一致性协议实现。
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
因为默认情况下,服务实例都是临时的Ephemeral
,进入DelegateConsistencyServiceImpl
,会走到EphemeralConsistencyService
,而该类的实现只有一个DistroConsistencyServiceImpl
,使用的是Distro
一致性协议,是阿里自研的一致性协议。
再看DistroConsistencyServiceImpl
类的listen
方法:
注意这里传来的
RecordListener
实际上就是Service
,因为Service
实现了RecordListener
接口这里只做了一个事情:把服务的key和对应的
RecordListener
保存起来放到Map中了服务key保存了服务的名称空间、服务名、服务是否是临时的 等信息
//存放了所有key对应的监听器,监听器主要用于监听数据的变化
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
@Override
public void listen(String key, RecordListener listener) throws NacosException {
//判断map中是否包含对应的key
if (!listeners.containsKey(key)) {
//不包含,则初始化一个Queue
listeners.put(key, new ConcurrentLinkedQueue<>());
}
//判断当前Listener是否存在
if (listeners.get(key).contains(listener)) {
return;
}
//把listener放到队列中
listeners.get(key).add(listener);
}
至此创建空Service
的步骤结束。
addInstance 添加服务实例
源码如下:
核心有两步,addIpAddresses
和consistencyService.put
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
//ephemeral 默认传true
//创建key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//获取对应服务
Service service = getService(namespaceId, serviceName);
synchronized (service) {
//获取到所有注册表的实例(做一些Cluster初始化、属性比对修改、增删等操作)
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
//创建Instances对象,把实例列表封装到Instances中
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//调用put方法
consistencyService.put(key, instances);
}
}
先看addIpAddresses
源码:
底层最后调用
updateIpAddresses
方法,而移除实例和添加实例由参数action
控制addIpAddresses:实际上是拷贝了一份注册表,然后做了在这个副本中做添加删除操作然后并返回,并没有直接修改注册表中的实例
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
//从nacos集群中获取到该服务对应的所有实例列表
//第一次进来,没有实例列表
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
//获取当前nacos所有的指定ephemeral的实例
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
//instanceMap 用来装当前nacos集群中已存在的服务实例
//第一次进来为空
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
//创建一个空Map
instanceMap = new HashMap<>(ips.length);
}
for (Instance instance : ips) {
//判断Service是否包含instance对应的cluster,第一次进来没有为true
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
//创建cluster
Cluster cluster = new Cluster(instance.getClusterName(), service);
//cluster 初始化
cluster.init();
//把cluster放到service中
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
//删除操作,移除instanceMap中对应的instance
instanceMap.remove(instance.getDatumKey());
} else {
//添加操作
Instance oldInstance = instanceMap.get(instance.getDatumKey());
//设置InstanceId
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
//默认情况:getIp() + "#" + getPort() + "#" + getClusterName() + "#" + getServiceName();
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
//把传入进来的instance放到instanceMap
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException(
"ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
.toJson(instanceMap.values()));
}
//返回所有服务实例列表
return new ArrayList<>(instanceMap.values());
}
执行完addIpAddresses
放到后,紧接着就会执行consistencyService.put(key, instances)
,但是这一行执行完成后,整个注册就走完了,所以我们可以猜测真正的注册到注册表的逻辑就在这一行。
前面分析过,ConsistencyService
是一个接口,调用的实现是DelegateConsistencyServiceImpl
,而真正执行put
方法的是DistroConsistencyServiceImpl
:
而该类的
put
方法看起来很简单,就两行,第二行调用DistroProtocol
的sync
方法向其他nacos节点同步注册信息不再本文讨论之内。所以核心就是onPut
方法
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
//同步操作(向其他nacos节点同步信息)
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
onPut
方法源码如下:
这里核心也有两步
- 第一步:构建一个
Datum
对象,把key
和Instances
设置到Datum
中,调用dataStore
的put方法- 第二步:调用
notifier
的addTask
方法
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
//把服务实例放到Map中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
先看dataStore,代码非常简单:
弄了一个
Map
,把key
和Datum
放到Map
中就完事了,所以这里不是实例注册到注册表的逻辑
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
dataMap.put(key, value);
}
//后面代码省略
再看notifier.addTask
方法:
先说
Notifier
这个类,是DistroConsistencyServiceImpl
类的内部类,并且实现了Runnable接口
addTask
方法也非常简单,就是直接把key
和action
放到BlockingQueue
中,也看不出什么名堂。到这里,注册逻辑就完全结束了,所以注册逻辑很可能就在
Runnable
的run
方法中。
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//放到队列中
tasks.offer(Pair.with(datumKey, action));
}
真正注册到注册表的逻辑 - Notifier 异步注册
上文说到执行到Notifier
的addTask
方法后,注册就走完了,同时向客户端响应ok,但是貌似服务实例还是没有注册到真正的注册表中。
这时候我们就需要分析Notifier
类,先看DistroConsistencyServiceImpl
类:
- 该类标注了@Service注解,被IOC容器所管理
- Notifier是该类的属性
- 有一个@PostConstruct方法,在该类创建时被调用,执行了
GlobalExecutor
的submitDistroNotifyTask
,而这个方法走进去也很简单,就是把Notifier
这个线程提交到线程池。
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private volatile Notifier notifier = new Notifier();
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
所以我们这时候就要分析Notifier
的run
方法:
是一个死循环,先从队列里
take
,这是阻塞队列的阻塞方法,拿不到东西就会一直阻塞,直到队列有值当
take
返回时,调用handle
方法,所以猜测注册逻辑就在handle
方法中
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
//从队列中拿并处理
Pair<String, DataOperation> pair = tasks.take();
//执行handle方法
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
再看handle
方法:
这里就可以回收前面说到的伏笔了,忘记了可以翻到上面标红的位置
这里根据key获取到之前存进来的
RecordListener
,实际上就是Service
,然后根据action的不同回调不同的方法。我们这里时注册,调用的是
RecordListener
的onChange
方法。
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
//拿到datumKey对应的所有RecordListener
// 实际上就是前面注册执行consistencyService.listen方法时,添加的Service
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
//执行onChange方法,从dataStore拿到datumKey对应的instances服务实例集
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
执行到 listener.onChange
这一行时,实际上就是执行的Service
的onChange
方法:
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
//设置权重
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//执行注册逻辑
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
//重新计算服务的checksum
recalculateChecksum();
}
核心就是updateIPs方法
:大致就是把需要注册的服务实例分分类,然后注册到指定名称的Cluster
下面,也是符合前面第一节提到的注册表结构,而真正的注册逻辑就在Cluster
的updateIps
方法中。
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
//先初始化空的ipMap
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
//Cluster不存在时先创建Cluster并初始化
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
//根据集群名称获取当前集群下的服务实例列表
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
//添加到clusterIPs中,实际上就放到了ipMap中
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
//遍历ipMap。拿到需要注册的服务实例
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
//把实例列表注册到指定名称的Cluster下
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
//发布服务改变事件
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
Cluster
的updateIps
方法源码如下:
这里设计的非常巧妙,是copy on write的思想,整个注册逻辑没有修改注册表,先拷贝了一个
Set
副本toUpdateInstances
,而实际上注册表中的实例的新增和删除并不是在这里做的,在前面ServiceManager
中的updateIpAddresses
方法中也是先拷贝了注册表的副本,然后修改的副本,然后放到队列中,这里异步取出来即入参ips
,然后做服务实例的状态更新,最后直接替换注册表的引用,达到更新注册表的目的。
public void updateIps(List<Instance> ips, boolean ephemeral) {
//先根据ephemeral参数获取到已经注册到到注册表中的instance集合
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
//把instance集合放到Map中
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
//获取需要更新的服务实例
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
//获取新增的服务实例
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
//获取删除的服务实例
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
//放到toUpdateInstances中
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
//直接替换注册表
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
至此,整个 nacos 服务端服务实例注册全部结束。
标签:Nacos1.4,Service,service,instance,源码,key,注册表,new,服务端 From: https://www.cnblogs.com/wwjj4811/p/17013271.html