一、Curator 简介
Apache Curator 是一个比较完善的 ZooKeeper 客户端框架,通过封装的一套高级 API 简化了 ZooKeeper 的操作。通过查看官方文档,可以发现 Curator 主要解决了三类问题:
- 封装 ZooKeeper client 与 ZooKeeper server 之间的连接处理
- 提供了一套 Fluent 风格的操作 API
- 提供 ZooKeeper 各种应用场景 recipes, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
Curator 主要从以下几个方面降低了 zk 使用的复杂性:
- 重试机制: 提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
- 连接状态监控: Curator 初始化之后会一直对 zk 连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
- zk客户端实例管理: Curator 会对 zk 客户端到 server 集群的连接进行管理,并在需要的时候重建 zk 实例,保证与 zk 集群连接的可靠性
- 各种使用场景支持: Curator 实现了 zk 支持的大部分使用场景(甚至包括 zk 自身不支持的场景),这些实现都遵循了 zk 的最佳实践,并考虑了各种极端情况
二、工程中引入 curator 依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
三、简单示例
package com.topsail.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class Demo1 {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("192.168.181.128:2181,192.168.181.128:2182,192.168.181.128:2183")
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("namespace-demo1")
.build();
curatorFramework.start();
System.out.println(ZooKeeper.States.CONNECTED);
System.out.println(curatorFramework.getState());
createPersistent(curatorFramework, "/demo1/persistent");
createPersistentSeq(curatorFramework, "/demo1/persistent_seq");
createEphemeral(curatorFramework, "/demo1/ephemeral");
createEphemeralSeq(curatorFramework, "/demo1/ephemeral_seq");
dataOps(curatorFramework, "/demo1/persistent");
createPersistent(curatorFramework, "/demo1/a/b/c");
deleteCascade(curatorFramework, "/demo1/a");
}
/**
* 创建持久化节点
*
* @param curatorFramework
* @throws Exception
*/
private static void createPersistent(CuratorFramework curatorFramework, String path) throws Exception {
Stat stat = curatorFramework.checkExists().forPath(path);
if (Objects.isNull(stat)) {
curatorFramework.create()
.creatingParentContainersIfNeeded() // 如果指定节点的父节点不存在,则会自动级联创建父节点
.withMode(CreateMode.PERSISTENT)
.forPath(path, "=== CONTENT ===".getBytes());
System.out.println(path + " create success!");
} else {
System.out.println(path + " is exists!");
}
}
/**
* 获取某个节点数据,设置某个节点数据
*
* @param curatorFramework
* @param path
* @throws Exception
*/
private static void dataOps(CuratorFramework curatorFramework, String path) throws Exception {
curatorFramework.setData().forPath(path, "=== CONTENT ===".getBytes());
byte[] bytes = curatorFramework.getData().forPath(path);
System.out.println("->->-> " + new String(bytes));
}
/**
* 创建持久化顺序节点
*
* @param curatorFramework
* @throws Exception
*/
private static void createPersistentSeq(CuratorFramework curatorFramework, String path) throws Exception {
for (int i = 0; i < 5; i++) {
curatorFramework.create()
.creatingParentContainersIfNeeded() // 如果指定节点的父节点不存在,则会自动级联创建父节点
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(path + "/i-", "persistent seq data".getBytes());
}
List<String> children = curatorFramework.getChildren().forPath(path);
Collections.sort(children);
for (String child : children) {
System.out.println("-> " + child);
}
}
/**
* 创建临时节点
*
* @param curatorFramework
* @param path
* @throws Exception
*/
private static void createEphemeral(CuratorFramework curatorFramework, String path) throws Exception {
curatorFramework.create()
.creatingParentContainersIfNeeded() // 如果指定节点的父节点不存在,则会自动级联创建父节点
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "ephemeral data".getBytes());
}
/**
* 创建临时顺序节点
*
* @param curatorFramework
* @param path
* @throws Exception
*/
private static void createEphemeralSeq(CuratorFramework curatorFramework, String path) throws Exception {
for (int i = 0; i < 5; i++) {
curatorFramework.create()
.creatingParentContainersIfNeeded() // 如果指定节点的父节点不存在,则会自动级联创建父节点
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path + "/i-", "ephemeral seq data".getBytes());
}
List<String> children = curatorFramework.getChildren().forPath(path);
Collections.sort(children);
for (String child : children) {
System.out.println("EphemeralSeq -> " + child);
}
}
/**
* 删除节点
*
* @param curatorFramework
* @param path
* @throws Exception
*/
private static void delete(CuratorFramework curatorFramework, String path) throws Exception {
curatorFramework.delete().forPath(path);
}
/**
* 级联删除子节点
*
* @param curatorFramework
* @param path
* @throws Exception
*/
private static void deleteCascade(CuratorFramework curatorFramework, String path) throws Exception {
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}
}
标签:Exception,示例,Curator,curatorFramework,throws,param,使用,path,节点 From: https://www.cnblogs.com/steven-note/p/17022680.html