首页 > 编程语言 >Nacos Client 源码分析(二)服务订阅与推送消息处理

Nacos Client 源码分析(二)服务订阅与推送消息处理

时间:2023-05-12 22:46:26浏览次数:56  
标签:订阅 serviceInfoHolder 服务 String serviceInfo Nacos Client 源码 客户端

1. 概述

在上一篇文章《Nacos Client 源码分析(一)事件的发布与订阅》分析了 Nacos Client 的发布订阅机制,但我们现在还不清楚NotifyCenterpublishEvent方法是怎么被调用的以及客户端向服务端订阅服务的具体流程。下面我们对继续分析 Nacos 的源码。

2. 服务订阅

还是从NacosNamingServiceinit方法开始分析,注意到以下几句代码。notifierEventScope标识一个事件的作用范围,也可以理解为事件是面向哪一个客户端,后面会用到。ServiceInfoHolder保存了客户端请求到的服务信息。NamingClientProxyDelegate是命名服务客户端代理的委托,其内部实际使用的是NamingHttpClientProxyNamingGrpcClientProxy

    private void init(Properties properties) throws NacosException {
        // ...
        this.notifierEventScope = UUID.randomUUID().toString();
        // ...
        this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
    }

NamingClientProxyDelegate的构造方法如下。

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
            InstancesChangeNotifier changeNotifier) throws NacosException {
        // ...
        this.serviceInfoHolder = serviceInfoHolder;
        // ...
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
    }

我们调用的subscribe方法最终是调用了NamingClientProxyDelegate类的subscribe方法。

    @Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
            throws NacosException {
        if (null == listener) {
            return;
        }
        String clusterString = StringUtils.join(clusters, ",");
        changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
        // 发起订阅
        clientProxy.subscribe(serviceName, groupName, clusterString);
    }

NamingClientProxyDelegate类的subscribe方法中,首先会计算出要订阅的服务标识(组名+服务名+集群),然后在serviceInfoHolder 中取出服务信息的缓存,如果服务信息不存在或是这个服务没有被订阅就会使用grpcClientProxy发起订阅请求,最后serviceInfoHolder会对返回的服务信息ServiceInfo进行处理。服务信息中包含了订阅服务的实例。

    @Override
    public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
        NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
        // 1. 计算服务标识
        String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
        String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
        serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
        // 2. 获取服务信息缓存
        ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        // 3. 发起订阅请求
        if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
            result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
        }
        // 4. 处理服务信息
        serviceInfoHolder.processServiceInfo(result);
        return result;
    }


ServiceInfoHolder在内部维护了一个ConcurrentMap用于缓存服务信息。

processServiceInfo方法的如下,如果服务信息变更会触发事件发布。现在我们可以知道,NotifyCenterpublishEvent方法是由 ServiceInfoHolder调用的,并基于新的ServiceInfo的内容构建一个InstancesChangeEvent。也就是所我们在订阅成功后会立即触发一个实例变更事件。

    public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        // 1. 获取服务标识
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        // 2. 获取旧的服务信息
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        // 3. 缓存新的服务信息
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // 4. 判断服务信息是否发生变更
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                    JacksonUtils.toJson(serviceInfo.getHosts()));
            // 5. 当服务信息发生变更时,发布一个实例变化事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }

3. 处理服务端消息推送

以上是在首次订阅过程中进行的事件发布,那么其他情况下服务实例变更事件又是如何被发布的呢。比如我们手动让一个实例下线。

publishEvent方法开始查看栈帧调用,我们发现了 gRPC 的onNext方法。在 gRPC 的流式 RPC 中,客户端在接收到服务端发送的流式数据时,可以通过onNext()方法来处理每一条接收到的数据。onNext()方法会接收一个消息对象,这个对象就是服务端发送给客户端的一条数据。当客户端接收到消息时,会自动调用onNext()方法来进行处理。

这就是说客户端向服务端发送订阅请求后,会使用流式 RPC 来处理服务端的消息推送。从 Nacos 定义的 proto 文件来看,采用的应该是 Bidirectional Streaming RPC(双向流式 RPC)。

service BiRequestStream {
  // Sends a biStreamRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}

GrpcClient中。我们找到onNext()方法的定义。Payload是服务端返回的原始信息,先将其转为Request,然后执行handServerRequest()方法。

handleServerRequest()方法会遍历所有的ServerRequestHandler依次处理服务请求。

    protected Response handleServerRequest(final Request request) {
        // ...
        for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
            try {
                Response response = serverRequestHandler.requestReply(request);
                // ...
            } catch (Exception e) {
                 // ...
            }
        }
        return null;
    }

我们要关注的就是这个NamingPushRequestHandler。进入其requestReply()方法,该方法会先判断这是不是一个NotifySubscriberRequest请求,如果是的话就从请求中得到服务信息,并调用serviceInfoHolderprocessServiceInfo方法,从而实现事件的发布。

    @Override
    public Response requestReply(Request request) {
        if (request instanceof NotifySubscriberRequest) {
            NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
            serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
            return new NotifySubscriberResponse();
        }
        return null;
    }

那么GrpcClientNamingPushRequestHandler有是什么时候创建的呢。我们知道NamingClientProxyDelegate实际上是使用了NamingGrpcClientProxy,这也是一个代理类。我们看一下它的构造方法。

    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        // ...
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels, RpcClientTlsConfig.properties(properties.asProperties()));
        // ...
        start(serverListFactory, serviceInfoHolder);
    }

其在构造方法中创建了一个RpcClient,并且需要传入一个ServiceInfoHolder用于start()方法,这个serviceInfoHolder正是在NacosNamingServic中创建的。
start()方法中,会为rpcClient添加一个NamingPushRequestHandler,serviceInfoHolder作为构造参数被传入。

    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        rpcClient.start();
        NotifyCenter.registerSubscriber(this);
    }

4. 事件作用范围

NotifyCenter中,是一种事件类型对应一个发布者,所有的事件都会经过NotifyCenter来发布。但如果我有两个订阅者订阅都关注了InstancesChangeEvent事件(比如创建两个不同的NacosNamingService),如何确定InstancesChangeEvent事件面向那个订阅者呢。实际上,Nacos Client 使用了 Event Scope,即事件作用范围来标识事件所属的订阅者。
DefaultPublisherreceiveEvent可以看出,通知订阅者前要先进行事件范围的匹配,只有匹配成功了才会继续执行。

  for (Subscriber subscriber : subscribers) {
          if (!subscriber.scopeMatches(event)) {
              continue;
          }
      // ...
  }

下面是InstancesChangeNotifierscopeMatches()方法,可以看出订阅者和事件都有一个事件作用范围。

   @Override
    public boolean scopeMatches(InstancesChangeEvent event) {
        return this.eventScope.equals(event.scope());
    }

前面提到了,NacosNamingServiceinit方法生成一个随机的唯一标识,这就是该客户端的事件范围。首先notifierEventScope会用于构建该客户端的InstancesChangeNotifier订阅者,客户端的ServiceInfoHolder也会使用该标识构建。当客户端收到服务端发送的消息时,使用ServiceInfoHolder处理服务信息,创建的事件都会带上notifierEventScope。这样该客户端的订阅者和其关注的事件就能匹配成功了。

    this.notifierEventScope = UUID.randomUUID().toString();
    this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
    NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), 
                               serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));

标签:订阅,serviceInfoHolder,服务,String,serviceInfo,Nacos,Client,源码,客户端
From: https://www.cnblogs.com/dalelee/p/17395688.html

相关文章

  • WEB—源码拓展
    前言:WEB源码在安全测试中是非常重要的信息来源,可以用来代码审计漏洞也可用来做信息突破口,其中WEB源码有很多技术需要简明分析。比如:获取某ASP源码后可以采用默认数据库下载为突破,获取某其他脚本源,码漏洞可以进行代码审计挖掘或分析其业务逻辑等,总之源码的获取将为后期的安全测......
  • 放弃 okhttp、httpClient,选择了这个牛逼的神仙工具
    https://mp.weixin.qq.com/s?__biz=MzAxNjk4ODE4OQ%3D%3D&chksm=9beee439ac996d2f3163a57232cafd44e9142959b5452cc3428123017fc4254a7318e0c3b245&idx=1&mid=2247501131&scene=21&sn=bdafb376cfa57cf3109b099f3feed457#wechat_redirect 在SpringBoot项目直......
  • HashMap中put源码分析
    1.对HashMap插入的元素先对key进行hash计算,然后根据得到的hash值和数组长度-1做&运算得到数组下标。2.如果下标位置没有元素,就直接插入,插入结束。3.如果下标位置有元素,就执行equals值是否相同,不同就和数组元素组成链表,插入到链表末尾,相同就会直接替换。插入结束。4.如果链表的......
  • 怎样开发直播软件?直播源码礼物功能篇
     在这个时代,直播不仅仅是一个人火爆的行业,也是每一个直播行业者们养家糊口的重要工作,而在直播赚钱众多方式中,观众送礼物是重要方式之一,礼物是观众充值购买的,如果观众对一个主播表示赞赏和支持,观众就会送出礼物,主播和平台就可以从中获取利益。当然,这也成为直播源码开发直播平台的......
  • Halcon联合C#开发实用版框架,在2.0版本基础上做了修改的,实际项目应用过的版本,源码,修改
    Halcon联合C#开发实用版框架,在2.0版本基础上做了修改的,实际项目应用过的版本,源码,修改了很多Bug,自带有项目运行流程与图片,很适合学习使用,可修改参考用于项目。注:软件能够正常编译运行,使用中遇到Bug自行摸索解决,主要是源码学习参考为主。ID:2619656657567557......
  • nginx源码安装步骤
    1、安装依赖包yuminstall-ygccyuminstall-ypcrepcre-develyuminstall-yzlibzlib-develyuminstall-yopensslopenssl-devel2、下载nginx源码包并解压 3、进入解压后的包nginx-1.16.1中4、执行命令 ./configure【参数可选:--prefix=/path/可指定ng......
  • 视频直播网站源码,uni-app左右平分九宫格样式
    视频直播网站源码,uni-app左右平分九宫格样式1.template:布局 <template>  <viewclass="content">    <viewclass="cp-xiangmu"v-for="iteminimgs">      <image:src="item.imgurl"class='cp_tupian�......
  • Aptana3 SVN Client安装
    Aptana3SVNClient安装安装SVN Client1.开启Aptana,打开help菜单,点击“InstallNewSoftware”2.点击Add按钮,在AddSite窗口,name输入:SVN ,location输入:http://subclipse.tigris.org/update_1.6.x,点击ok按钮3.全选除了“SubclipseIntegrationforMylyn3.x(Optional)3.0.0“ ......
  • 变频器 源码 MD500程序 svc3,低速转矩大,高速速度波动小 新
    变频器源码MD500程序svc3,低速转矩大,高速速度波动小新的转子电阻、漏感ID:691600669406727509......
  • FPGA IP 源码解密 Vivado加密的IP文件解密复原为Verilog或者VHDL源码 Mo
    FPGAIP源码解密Vivado加密的IP文件解密复原为Verilog或者VHDL源码Modelsim可以编译仿真的vp加密文件均可以解密复原为Verilog或者VHDL源码符合P1735格式保护的代码基本都可以解密还原源代码ID:39188688193060201......