1.简介
1)简介
Zookeeper,为分布式框架提供协调服务,基于观察者模式。
负责存储管理大家关心的数据,接受观察者的注册,当数据状态发生变化,Zookeeper负责同志在Zookeeper上注册的观察者。
Zookeeper=文件系统+监听机制
2)特点
- 一个Leader,多个Follower组成为集群
- 集群存活节点超过1/2,就能正常服务(适合安装奇数台服务器)
- 全局数据一致
- 同一个Client更新请求按顺序依次执行
- 数据更新原子性
- 实时性,Client能读到最新的数据
3)数据结构
Zookeeper数据模型类似Unix的文件系统,每个节点叫做ZNode。
每个ZNode能存储1MB数据(存储数据量小),能通过路径唯一标识
4)应用场景
-
统一命名服务
分布式环境,需要对服务进行统一的命名,便于识别。
例:通过域名,分发到不同IP的服务器上(这点类似于Nginx),当一台挂了,其他的还能工作
-
统一配置管理
- 对所有节点的配置信息,做统一的配置
- 配置修改后,快速同步各节点
实现方式:
- 配置信息写入到一个Znode上
- 各个客户端监听这个Znode
- 当Znode被修改了,Zookeeper通知各客户端
-
统一集群管理
- Client把节点信息写入Znode
- 监听这个Znode获取实时状态变化
-
服务器动态上下线
-
软负载均衡
-
分布式锁
2.Zookeeper安装
略
3.Zookeeper命令操作
端口
1.服务端命令
- 启动 ./zkServer.sh start
- 查看状态 ./zkServer.sh status
- 停止 ./zkServer.sh stop
- 重启 ./zkServer.sh restart
2.客户端命令
-
连接 ./zkCli.sh (-server ip:port)
-
查看节点(类似于unix的ls命令)
-
查看根节点
ls / #查看目录下节点
-
查看节点及节点信息
ls -s / #老版本为ls2 / #后面会有很多节点信息
-
-
创建节点(类似于mkdir,但可以在创建时写入数据)(-e -s可以组合)
-
持久化节点
create /test fuck #在根节点下创建test,并写入数据为fuck
-
临时节点(ephemeral)
create /test2 -e #创建test2节点,当Client断开后,节点消失
-
顺序节点(Serialize)
create /test2 -s #创建test2节点,会自动对节点加上编号 #比如此节点可能创建为test20000000013,为创建的第13个节点,所有节点共用同一个编号
-
-
获取数据
create /test fuck #获取fuck节点的数据
-
写入数据
set /test fuckTheWorld #将test节点数据设置为fuckTheWorld
-
删除节点
delete /test #删除test节点(当子节点不为空,无法删除) deleteAll /test #删除test及其所有的子节点
-
退出 连接后quit
4.JavaApi
API库:
- 原生API
- ZkClient
- Curator(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)
在这里使用Curator进行学习
1)连接
package com.zko0;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
/**
* 创建client连接
*/
public class ConnectTest {
@Test
public void method1(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
//客户端连接建立
CuratorFramework client = CuratorFrameworkFactory.newClient("101.43.244.40", retryPolic);
//开启连接
client.start();
}
@Test
public void method2(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
}
2)创建
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* 创建节点的测试类
*/
@Slf4j
public class CreateNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
//1.基本创建
//如果没有指定数据,那么会将客户端的ip作为存储的数据
public void basicCreate() throws Exception {
String path = client.create().forPath("/test1");
log.info(path);
}
@Test
//2.创建,同时添加数据
public void createWithMessage() throws Exception {
String path = client.create().forPath("/test2","fuck".getBytes());
log.info(path);
}
@Test
//3.创建临时节点
//Client会话结束节点会消息,所以创建完就消失了
public void createEphemeral() throws Exception {
client.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/test3");
}
@Test
//3.创建多级节点
//creatingParentsIfNeeded如果需要创建多级节点
public void create2() throws Exception {
client.create()
.creatingParentsIfNeeded()
.forPath("/directory/test5");
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
3)检查
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
@Slf4j
public class CheckNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
//get方法
public void get() throws Exception {
byte[] bytes = client.getData().forPath("/test1");
log.info(new String(bytes));
}
@Test
//ls方法
public void getChildren() throws Exception {
List<String> list = client.getChildren().forPath("/directory");
log.info(list.toString());
}
@Test
//查询节点信息
//把状态信息保存在Stat对象中 storingStatIn(xx)
public void getAbout() throws Exception {
Stat stat = new Stat();
client.getData()
.storingStatIn(stat)
.forPath("/test1");
log.info(stat.toString());
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
4)修改
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class EditNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
//修改节点数据内容
public void edit() throws Exception {
client.setData()
.forPath("/test1","fuckU".getBytes());
}
@Test
//乐观锁,如果edit的时候version有修改了不匹配,那么setData会失败
public void editWithVersion() throws Exception {
Stat stat = new Stat();
client.getData()
.storingStatIn(stat)
.forPath("/test1");
client.setData()
.withVersion(stat.getVersion())
.forPath("/test1","fuckMe".getBytes());
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
5)删除
package com.zko0;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class DeleteNodeTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
//删除单个节点
public void deleteOne() throws Exception {
client.delete()
.forPath("/test1");
}
@Test
//删除带有子节点的节点
public void deleteOnehasChildren() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.forPath("/directory");
}
//必须成功的删除,如果失败会反复重试
@Test
public void deleteMustSucc() throws Exception {
client.delete()
.guaranteed()//必须的
.forPath("/test2");
}
@Test
//删除回调
public void callBack() throws Exception {
client.delete()
.guaranteed()//一般都会加上
.inBackground((client,event)->{
//client和evnet都可以进行操作
log.info("删除Ok");
})
.forPath("/test1");
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
6)Watch事件监听
zookeeper提供三种不同的Watcher:
- NodeCache:只监听某一个节点
- PathChildrenCache:监听一个Znode的子节点
- TreeCache:监听树上的所有节点
1.NodeCache
@Slf4j
public class NodeCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
/**
* 写法1.通过addListener将new CuratorCacheListener() 加入,对于create,update,delete等事件
* 都能进行监听
*/
public void testNodeCache() throws Exception {
CuratorCache curatorCache = CuratorCache.build(client, "/test1");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData beforeData, ChildData afterData) {
// 第一个参数:事件类型(枚举)
// 第二个参数:节点更新前的状态、数据
// 第三个参数:节点更新后的状态、数据
// 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null
// 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null
// 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
if (afterData != null) {
System.out.println("创建了节点: " + afterData.getPath());
}
break;
case "NODE_CHANGED": // 节点更新
if (beforeData.getData() != null) {
System.out.println("修改前的数据: " + new String(beforeData.getData()));
} else {
System.out.println("节点第一次赋值!");
}
System.out.println("修改后的数据: " + new String(afterData.getData()));
break;
case "NODE_DELETED": // 节点删除
System.out.println(beforeData.getPath() + " 节点已删除");
break;
default:
break;
}
}
});
// 开启监听
curatorCache.start();
// 线程阻塞防止停止
while (true){}
}
@Test
/**
* 方法2,对单个的事件进行监听,来源与curator官网example写法
*/
public void test2(){
CuratorCache cache = CuratorCache.build(client,"/test1");
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> System.out.println(String.format("Node created: [%s]", node)))
.forChanges((oldNode, node) -> System.out.println(String.format("Node changed. Old: [%s] New: [%s]", oldNode, node)))
.forDeletes(oldNode -> System.out.println(String.format("Node deleted. Old value: [%s]", oldNode)))
.forInitialized(() -> System.out.println("Cache initialized"))
.build();
// register the listener
cache.listenable().addListener(listener);
// the cache must be started
cache.start();
while (true){}
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
2.PathChildrenCache
在5.1之后,new PathChildrenCache显示为废弃方法
如果有更好的方法麻烦教授
package com.zko0.watcher;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@Slf4j
public class PathChildrenCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
/**
* PathChildrenCache已经被标记为废弃方法
* 使用curator5.5官网example推荐写法
*/
public void method1() throws Exception {
//true表示是否缓存data信息
PathChildrenCache cache = new PathChildrenCache(client, "/test2", true);
PathChildrenCacheListener listener = new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
switch ( event.getType() )
{
case CHILD_ADDED:
{
System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_UPDATED:
{
System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
case CHILD_REMOVED:
{
System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
}
}
};
cache.getListenable().addListener(listener);
cache.start();
while (true){}
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
3.TreeCache
监听树上所有节点的变化情况
package com.zko0.watcher;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TreeCacheTest {
private CuratorFramework client;
@Before
//在create前执行
public void connect(){
//重试机制
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
}
@Test
public void test() throws Exception {
TreeCache cache = TreeCache.newBuilder(client, "/test3").setCacheData(true).build();
cache.getListenable().addListener((c, event) -> {
if ( event.getData() != null )
{
System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
}
else
{
System.out.println("type=" + event.getType());
}
});
cache.start();
while (true){}
}
@After
//在create执行后执行
public void close(){
if (client!=null){
client.close();
}
}
}
5.Zookeeper分布式锁
1)zookeeper分布式锁原理:
思想:获取锁时,创建节点,使用完,删除节点
流程:
- 在lock节点下创建临时且顺序节点(类似Redis,防止宕机时其他client无法获取)
- 然后获取lock下所有的子节点,获取到后,如果发现自己的子节点最小,认为客户端获取到了锁。使用完后,删除该节点
- 如果发现自己创建的节点并非lock中所有子节点最小的,说明没有获取到锁,客户端需要找到比自己小(前一个)的那个节点,注册监听删除事件
- 如果发现比自己小的那个节点被删了,客户端Watcher收到通知,再判断自己是不是最小的(避免宕机使中间节点删除)。如果是,获取锁,否则继续监听前一个小的节点。
2)Curator分布式锁Api
在Curator中有五种锁:
- InterProcessSemaphoreMutex:分布式排他锁
- InterProcessMutes:分布式可重入锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMulitiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
可重入锁演示InterProcessMutes:
package com.zko0.lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class LockTest implements Runnable{
public LockTest() {
//创建client
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("101.43.244.40")
.retryPolicy(retryPolic)
// zookeeper根目录为/zko0,不为/
.namespace("zko0")
.build();
client.start();
this.lock=new InterProcessMutex(client,"/lock");
}
public static void main(String[] args) {
LockTest lockTest = new LockTest();
Thread t1=new Thread(lockTest,"test1");
Thread t2=new Thread(lockTest,"test2");
t1.start();
t2.start();
}
private Integer num=10;//对该变量做锁
private InterProcessLock lock;
@Override
public void run() {
while (true){
//acquire为等待时间
try {
lock.acquire(3, TimeUnit.SECONDS);
if (num>0){
System.out.println(Thread.currentThread()+""+num);
num--;
}
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
6.集群
Zookeeper客户端通信端口:默认2181
Zookeeper服务器通信端口:默认2881
Zookeeper投票端口:默认3881
Leader选举策略:
- Serverid:服务器ID 比如有三台服务器,编号分别是1,2,3。编号越大代表在选择算法中,它的权重就越大。
- Zxid:数据ID 服务器中存在的最大数据ID,值越大说明数据越新,在选举算法中数据越新,权重就越大。
- 在Leader选举的过程中,如果某台Zookeeper获票超过半数,那么就可以成为Leader;
搭建
安装
-
下载zookeeper3.6.3版本
-
解压到目录下
/zoo/z1
/zoo/z2
/zoo/z3
-
修改配置文件
z1:
# example sakes. dataDir=/zoo/z1/data/ # the port at which the clients will connect clientPort=2181
z2:
# example sakes. dataDir=/zoo/z2/data/ # the port at which the clients will connect clientPort=2182
z3:
# example sakes. dataDir=/zoo/z3/data/ # the port at which the clients will connect clientPort=2183
配置集群
-
在每个zookeeper的data目录下创建myid文件,内容分别是1,2,3。这个文件记录每个服务的id。
echo 1 >/zoo/z1/data/myid echo 2 >/zoo/z2/data/myid echo 3 >/zoo/z3/data/myid
-
配置端口(三台的zoo.cfg)
此步骤不仅配置了,需要连接的其他节点的端口信息。而且还配置了自己的服务器通信端口和投票端口。
server.1=127.0.0.1:2881:3881 server.2=127.0.0.1:2882:3882 server.3=127.0.0.1:2883:3883
-
启动三台zookeeper
-
通过status查看各zookeeper的状态,搭建完成
故障模拟
-
从服务器宕机
3号挂了,1好follower正常,2号leader正常
1号也挂了,2号leader不运行,集群处于休眠状态
[root@VM-24-4-centos zoo]# z2/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /zoo/z2/bin/../conf/zoo.cfg Client port found: 2182. Client address: localhost. Client SSL: false. Error contacting service. It is probably not running.
-
重新启动1号
2号leader恢复正常,1号为follower
-
启动3号,停掉2号
[root@VM-24-4-centos zoo]# z3/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /zoo/z3/bin/../conf/zoo.cfg Client port found: 2183. Client address: localhost. Client SSL: false. Mode: leader
总结
Zookeeper集群三个角色:
- Leader
- 处理事务请求
- 内部各服务器调度者
- Follower
- 处理客户端非事务请求(查询操作),转发事务请求给Leader
- 参与Leader选举的投票
- Observer(来分担Follower压力)
- 处理非事务请求,转发事务请求给Leader
7.附:环境搭建
Zookeeper版本:3.6.3
Maven依赖
标签:Zookeeper,笔记,public,学习,client,org,apache,import,节点 From: https://www.cnblogs.com/zko0/p/16994688.html<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.28</version> </dependency> </dependencies>