首页 > 其他分享 >zk系列二:zookeeper实战之分布式统一配置获取

zk系列二:zookeeper实战之分布式统一配置获取

时间:2022-11-03 13:11:05浏览次数:57  
标签:zk zookeeper param public path import 节点 分布式

前面介绍了zk的一些基础知识,这篇文章主要介绍下如何在java环境下获取zk的配置信息;主要基于zk的监听器以及回调函数通过响应式编程的思想将核心代码糅合成一个工具类,几乎做到了拿来即用;
在分布式集群中,配置信息一般都会拎出来单独配置,这样避免了修改配置信息时候的复杂度,我们期望当有配置变更时集群中的节点能及时得到通知并作出响应;我们可以在每个节点中开启任务定时来zk获取信息,然后根据获取信息的前后是否有差异来判断信息是否变更,但是这种方法明显不及时,且每次轮询zk都会增加不必要的网络开销,特别是节点众多时给ZK增加了不必要的压力;所以我们更倾向于利用ZK的监听和回调机制来实现类似的统一消息配置,客户端只要注册一个监听器当节点信息发生变更时zk会主动触发对应事件,客户端监听对应事件作出响应即可;下面先贴出代码流程图,然后再贴出关键代码
代码执行流程:

关键代码如下:

1、测试类
package com.darling.service.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @description: 使用响应式编程思想实现基于ZK的分布式统一配置功能
 * @author: dll
 * @date: Created in 2022/11/1 12:21
 * @version:
 * @modified By:
 */
@Slf4j
public class ZkConfigTest {

    ZooKeeper zkClient;

    @Before
    public void conn (){
        zkClient  = ZkUtil.getZkClient();
    }

    @After
    public void close (){
        try {
            zkClient.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test() throws InterruptedException {
        ConfigData configData = new ConfigData();
        ZkConfigUtil watchAndCallBack = new ZkConfigUtil();
        watchAndCallBack.setZkClient(zkClient);
        watchAndCallBack.setConfigData(configData);

        watchAndCallBack.await();
        String config = configData.getConfig();

        while(true){

            if(configData.getConfig().equals("")){
                System.out.println("conf diu le ......");
                watchAndCallBack.await();
            }else{
                System.out.println(configData.getConfig());

            }
            // 此处睡眠的原因是为了便于日志打印
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

2、关键工具类
package com.darling.service.zookeeper;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @description:
 * @author: dll
 * @date: Created in 2022/11/1 12:40
 * @version:
 * @modified By:
 */
@Data
@Slf4j
public class ZkConfigUtil implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    ZooKeeper zkClient;

    ConfigData configData;

    CountDownLatch cc = new CountDownLatch(1);
    
    private final String nodePath = "/config";

    /**
     * 异步获取数据getData方法的回调
     * @param rc        状态码
     * @param path      路径
     * @param ctx       上线文
     * @param data      节点的数据
     * @param stat      元数据信息
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (data != null ){
            configData.setConfig(new String(data));
            cc.countDown();
        }
    }


    /**
     * exists方法的回调,当数据存在时会调回调函数
     * @param rc        状态码
     * @param path      路径
     * @param ctx       上下文
     * @param stat      元数据信息,可通过其是否为空判断是否有数据
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        log.info("进入exists方法的回调,stat:{}");
        if (stat != null) {
            log.info("进入exists方法的回调,stat 不为空");
            // 表示path节点有数据了,可以通过getData获取
            zkClient.getData(nodePath, this, this,"123");
        }
    }


    /**
     * 节点变更的监听器
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                log.info("节点被创建,path:{}",event.getPath());
                // 节点创建后获取一遍数据还有个额外的不可忽略的作用:即重新注册了watch,否则根据zk的watch只生效一次的规则,不会再次出发watch
                zkClient.getData(nodePath, this, this,"123");
                break;
            case NodeDeleted:
                log.info("节点被删除,path:{}",event.getPath());
                configData.setConfig("");
                cc = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                log.info("节点被修改,path:{}",event.getPath());
                // 节点修改获取数据的作用同节点创建
                zkClient.getData(nodePath, this, this,"123");
                break;
            case NodeChildrenChanged:
                break;
        }
    }

    public void await() {
        /**
         * 直接获取配置前先判断配置存不存在
         * 第一个参数path:配置存放的节点路径
         * 第二个参数watch:path的监听,当path有变动时回调通知
         * 第三个参数StatCallback:如果path下有数据会回调此方法,可以通过Stat是否为空判断path是否有数据
         */
        zkClient.exists(nodePath, this,this ,"PPP");
        try {
            cc.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

标签:zk,zookeeper,param,public,path,import,节点,分布式
From: https://www.cnblogs.com/darling2047/p/16854117.html

相关文章

  • Redisson 分布式锁实现之前置篇 → Redis 的发布/订阅 与 Lua
    开心一刻我找了个女朋友,挺丑的那一种,她也知道自己丑,平常都不好意思和我一块出门昨晚,我带她逛超市,听到有两个人在我们背后小声嘀咕:“看咱前面,想不到这么丑都有人要。......
  • IM消息ID技术专题(七):网易严选分布式ID的技术选型、优化、落地实践
    1、引言在《IM消息ID技术专题》系列文章的前几篇中,我们已经深切体会到消息ID在分布式IM聊天系统中的重要性以及技术实现难度,各种消息ID生成算法及实现虽然各有优势,但受制......
  • net core实现分布式session,单点登录
    原文网址:http://m.tnblog.net/aojiancc2/article/details/2436要想在netcore中实现分布式session,实现单点登录,我们可以把session保存到redis中,这样就可以多个项目共享s......
  • raft算法(分布式一致性算法)
    一、概要Raft算法属于Multi-Paxos算法,它是在Multi-Paxos思想的基础上,做了一些简化和限制,比如增加了日志必须是连续的,只支持领导者、跟随者和候选人三种状态,在理解和算法实......
  • 万字总结:分布式系统的38个知识点
    大家好我是咸鱼了大半年的一灰灰,终于放暑假了,把小孩送回老家,作为咸鱼的我也可以翻翻身了,接下来将趁着暑假的这段时间,将准备一个全新的分布式专栏,为了给大家提供更好的阅读体......
  • Hadoop安装-分布式-Fully
    Hadoop安装-分布式-Fully〇、所需资料一、配置1、基础配置(1)系统安装(2)hostname主机名配置(3)ip地址、dns、hosts映射文件配置(4)关闭防火墙与selinux(5)开启ntpd时钟同步......
  • 巨杉数据库入围 IDC Innovator榜单,获评分布式数据库创新者
    近日,巨杉数据库凭借「湖仓一体」分布式数据库在金融领域的创新应用获得 IDC Innovator中国分布式数据库创新者殊荣。值得一提的是,这也是IDC在数字化转型盛典会议中首次......
  • springboot使用minio分布式文件上传图片或视频
    Minio搭建先看下前端上传效果日期工具包使用的是hutool的importcn.hutool.core.date.DateUtil;接口可以上传视频和图片暂无做限制pom.xml<!--分布式存储-->......
  • 分布式事务框架 Seata 入门案例
    1. SeataServer部署Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成。首先,下载最新的安装包也可以下载源码,然后本地编译。最新......
  • 腾讯云(阿里云) Jmeter 5.2.1 版本 JDK1.8.053 分布式部署详细步骤
    1,安装JDK使用yum命令工具安装JDK选择适合自己得JDK或者JRE版本进行安装。带“-devel” 标志为JDK安装包1yum-ylistjava*23minstall-yjava-1.8.0-op......