首页 > 其他分享 >Rpc-实现Client对ZooKeeper的服务监听

Rpc-实现Client对ZooKeeper的服务监听

时间:2023-02-20 14:00:19浏览次数:65  
标签:缓存 addressList String ZooKeeper Rpc serviceName static path Client

1、前言

在上一篇文章中,完成了ZooKeeper注册中心,添加了一个简单的本地缓存

但是,存在一些问题:

  1. 当本地缓存OK,ZooKeeper对应服务有新的实例时,本地缓存不会自动更新
  2. 当ZooKeeper对应服务实例关闭,本地缓存不会监控到实例消失

2、编写

之前我们是将缓存直接放在ZooKeeperClientUtils中的,维护一个Map集合。我们将缓存部分移动到ZooKeeperClientCache中,缓存数据从这里获取:

我们监听树上所有节点的变化情况,对于包含实例的变化,每次获取对应的服务信息,然后通过Clinet查询现存的对应服务的实例,进行更新。

watchPathSet维护了Client调用过的服务集合,对于调用过的服务才开启本地的缓存,并且进行更新。

instances即为本地缓存集合

@Slf4j
public class ZookeeperClientCache {

    private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();

    private static final Set<String> watchPathSet=new ConcurrentHashSet<>();

    private static CuratorFramework zookeeperClient;

    private static boolean isListening=false;


    //将服务加入监听set中
    public static void addListenService(String service){
        //开启服务监听
        openListen();
        //path路径放入
        watchPathSet.add(ZookeeperUtil.serviceName2Path(service));
    }

    //添加本地缓存,同时开启监听服务
    public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
        //直接替换原本的缓存
        instances.put(serviceName,addressList);
        //将服务加入监听set
        addListenService(serviceName);
    }

    public static void cleanLocalCache(String serviceName){
        log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
        instances.remove(serviceName);
    }


    public static boolean containsKey(String serviceName){
        return instances.containsKey(serviceName);
    }

    public static List<InetSocketAddress> getOrDefault(String serviceName){
        return instances.getOrDefault(serviceName,null);
    }

    public static List<InetSocketAddress> getInstances(String serviceName){
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = zookeeperClient.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            return addressList;
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }

    private static synchronized void openListen(){
        //已初始化过
        if (isListening){
            return;
        }
        //注入client
        if (zookeeperClient==null) {
            zookeeperClient=ZookeeperUtil.getZookeeperClient();
        }
        TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build();
        cache.getListenable().addListener((c, event) -> {
            if ( event.getData() != null )
            {
                System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
                //可以通过event.type来进行节点的处理,我这里直接多节点每次行为做reload
                if (event.getData().getPath().contains("Service/")){
                    //是服务节点,做更新
                    String path = event.getData().getPath();
                    //去除尾部实例段
                    path=path.substring(0,path.lastIndexOf("/"));
                    String serviceName = ZookeeperUtil.path2ServiceName(path);
                    if (watchPathSet.contains(path)) {
                        log.info("更新本地缓存");
                        List<InetSocketAddress> addressList = getInstances(serviceName);
                        addLocalCache(serviceName,addressList);
                    }
                }
            }
            else
            {
                System.out.println("type=" + event.getType());
            }
        });
        try {
            cache.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        isListening=true;
    }
}

创建完Cache类,只需要修改之前ZooKeeperClientUtils中,从当前类改为Cache类获取即可:

image-20230220133343196

完整代码:

@Slf4j
public class ZookeeperClientUtils {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
        InetSocketAddress address;
        //本地缓存查询
        if (ZookeeperClientCache.containsKey(serviceName)){
            List<InetSocketAddress> addressList = ZookeeperClientCache.getOrDefault(serviceName);
            if (!addressList.isEmpty()){
                //使用lb进行负载均衡
                return loadBalancer.select(addressList);
            }
        }
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = client.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = client.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            ZookeeperClientCache.addLocalCache(serviceName,addressList);
            return loadBalancer.select(addressList);
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }
}

3、测试

实现上述代码,下面是服务监听的简单测试

开启Server,Client:

image-20230220134850987

关闭Server,Server自动进行服务的注销:

image-20230220135154126

Client服务监控:

image-20230220135320003

标签:缓存,addressList,String,ZooKeeper,Rpc,serviceName,static,path,Client
From: https://www.cnblogs.com/zko0/p/17137124.html

相关文章

  • 解决Mac下pip install mysqlclient 时的报错
    Django使用Mysql需要安装mysqlclient,在Mac下pipinstallmysqlclient时部分报错如下:Completeoutput(15lines):/bin/sh:mysql_config:commandnotfound/......
  • client启动过程源码剖析
    1实例化EurekaInstanceConfig对象instanceConfig2先初始化InstanceInfo对象,再基于构建好的instanceConfig和instanceInfo构建applicationInfoManager对象3......
  • Zookeeper特性与节点数据
    CAP理论在一个分布式计算系统中,不可能同时满足以下三点:C一致性:每个节点读写数据时,保证各个节点上的数据是一致的。P分区容错性:当系统中的节点故障时,系统就不再联通,系统......
  • 4次迭代,让我的 Client 优化 100倍!泄漏一个 人人可用的极品方案!
    4次迭代,让我的HttpClient提速100倍在大家的生产项目中,经常需要通过Client组件(HttpClient/OkHttp/JDKConnection)调用第三方接口。尼恩的一个生产项目也不例外。在一个......
  • 深度剖析 ZooKeeper 核心原理 学习笔记
    什么是ZooKeeper假设对ZooKeeper中的数据做了变更(比如新增了一台Kafka或者挂掉了一个Kafka节点),这时候ZooKeeper会主动通知其他监听这个数据的客户端,立即告诉其他......
  • ZooKeeper面试题
    ZooKeeper面试题1、什么是Zookeeper?ZooKeeper是一个开放源码的分布式协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。......
  • centos7安装mysqlclient报错
    错误如图  解决办法#先安装mysql-devel,后安装mysqlclientyuminstallmysql-develpip3installmysqlclient ......
  • docker push 到私有仓库提示(Client.Timeout exceeded while awaiting headers)
    如果docker在上传镜像的时候出现该问题,那么大概率是私有仓库的docker不通[root@localhostdocker]#dockerpush192.168.223.136:5000/xiaoniao:v1Thepushrefersto......
  • reactor rabbitmq 实现RPC远程调用
    照着官方文档上写,最后发现在消费端怎么也返回不了数据。在文档中也找不到怎么返回数据,查看官方demo也没有案例,各种搜索都找不到。最后在源码中发现有一个RpcServer类,经过......
  • http client
    HttpClientHttpClient是idea的插件简单案例:@PostMapping("/greeting")publicStringpostGreeting(@RequestBodyUseruser){return"HelloWorl......