首页 > 其他分享 >ZooKeeper : Curator框架之数据缓存与监听CuratorCache

ZooKeeper : Curator框架之数据缓存与监听CuratorCache

时间:2022-11-09 19:06:50浏览次数:41  
标签:CuratorCache ZooKeeper father Curator son grandson1 116 path data


CuratorCache

​CuratorCache​​​会试图将来自节点的数据保存在本地缓存中。 可以缓存指定的单个节点,也可以缓存以指定节点为根的整个子树(默认缓存方案)。可以给​​CuratorCache​​实例注册监听器,当相关节点发生更改时会接收到通知, 将响应更新、创建、删除等事件。

很多基于​​ZooKeeper​​​的框架,就是使用​​CuratorCache​​​来缓存相关节点的数据,比如​​ElasticJob​​,这样大部分节点的数据可以通过内存来获取,提高了性能,并且可以很方便地监听相关节点的事件。

​CuratorCache​​接口源码:

public interface CuratorCache extends Closeable, CuratorCacheAccessor
{
/**
* 缓存构建选项
*/
enum Options
{
/**
* 通常,以指定节点为根的整个子树都会被缓存(默认缓存方案)
* 这个选项只会缓存指定的节点(即单节点缓存)
*/
SINGLE_NODE_CACHE,

/**
* 通过org.apache.curator.framework.api.GetDataBuilder.decompressed()解压数据
*/
COMPRESSED_DATA,

/**
* 通常,当通过close()关闭缓存时,会通过CuratorCacheStorage.clear()清除storage
* 此选项可防止清除storage
*/
DO_NOT_CLEAR_ON_CLOSE
}

/**
* 使用标准standard实例返回具有给定选项的给定路径的CuratorCache实例
*/
static CuratorCache build(CuratorFramework client, String path, Options... options)
{
return builder(client, path).withOptions(options).build();
}

/**
* 启动CuratorCache构建器
*/
static CuratorCacheBuilder builder(CuratorFramework client, String path)
{
return new CuratorCacheBuilderImpl(client, path);
}

/**
* 启动CuratorCacheBridge构建器,CuratorCacheBridge是一个facade
* 如果持久Watcher可用,则使用CuratorCache,否则使用TreeCache(使用ZooKeeper3.5.x)
*/
static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path)
{
return new CuratorCacheBridgeBuilderImpl(client, path);
}

/**
* 启动缓存
* 这将导致从缓存的根节点开始进行完全刷新
* 并为找到的所有节点生成事件
*/
void start();

/**
* 关闭缓存
* 停止响应事件
*/
@Override
void close();

/**
* 返回监听器容器,以便可以注册监听器以收到缓存更改的通知
*/
Listenable<CuratorCacheListener> listenable();

/**
* 从storage中返回指定路径的节点数据
*/
@Override
Optional<ChildData> get(String path);

/**
* 返回当前storage中的条目数
*/
@Override
int size();

/**
* 返回storage中条目的流
*/
@Override
Stream<ChildData> stream();
}

​CuratorCache​​​接口通过​​build​​​静态方法返回的​​CuratorCache​​​实例是其实现类的实例(最终会调用​​CuratorCacheBuilderImpl​​​类的​​build​​方法):

@Override
public CuratorCache build()
{
return new CuratorCacheImpl(client, storage, path, options, exceptionHandler);
}

而只需要使用​​CuratorCache​​​接口即可(创建的实例就是​​CuratorCacheImpl​​​类的实例),而​​CuratorCacheImpl​​​类中可用的方法基本上是实现​​CuratorCache​​接口中的方法。

ZooKeeper : Curator框架之数据缓存与监听CuratorCache_apache

CuratorCacheListener

​CuratorCacheListener​​​是​​CuratorCache​​事件的监听器。

​CuratorCacheListener​​接口源码(函数式接口):

@FunctionalInterface
public interface CuratorCacheListener
{
/**
* 描述事件的枚举类型
*/
enum Type
{
/**
* 一个新节点被添加到缓存中
*/
NODE_CREATED,

/**
* 已在缓存中的节点被更改
*/
NODE_CHANGED,

/**
* 已在缓存中的节点被删除
*/
NODE_DELETED
}

/**
* 在创建、更改或删除节点时被调用
* 从而调用监听器
*/
void event(Type type, ChildData oldData, ChildData data);

/**
* 当缓存启动时,会跟踪初始节点,当它们完成加载到缓存中时,将调用此方法
*/
default void initialized()
{
// NOP
}

/**
* 返回允许特定类型和特殊用途监听器的构建器
*/
static CuratorCacheListenerBuilder builder()
{
return new CuratorCacheListenerBuilderImpl();
}
}

通过​​CuratorCacheListenerBuilderImpl​​​的实例可以给​​CuratorCache​​添加各种监听器。

ZooKeeper : Curator框架之数据缓存与监听CuratorCache_分布式_02

演示

SINGLE_NODE_CACHE(单节点)

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.util.Optional;

public class Application {

private static final String SERVER_PROXY = "192.168.31.175:9000";
private static final int CONNECTION_TIMEOUT_MS = 40000;
private static final int SESSION_TIMEOUT_MS = 10000;
private static final String NAMESPACE = "MyNamespace";

public static void main(String[] args) throws Exception {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.build();

curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
curator.blockUntilConnected();

if(curator.checkExists().forPath("/father") != null) {
curator.delete().deletingChildrenIfNeeded().forPath("/father");
}

// 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
// 缓存构建选项是SINGLE_NODE_CACHE
CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
CuratorCache.Options.SINGLE_NODE_CACHE);

// 创建一系列CuratorCache监听器,都是通过lambda表达式指定
CuratorCacheListener listener = CuratorCacheListener.builder()
// 初始化完成时调用
.forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
// 添加或更改缓存中的数据时调用
.forCreatesAndChanges(
(oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",
oldNode, node)
)
// 添加缓存中的数据时调用
.forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))
// 更改缓存中的数据时调用
.forChanges(
(oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",
oldNode, node)
)
// 删除缓存中的数据时调用
.forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))
// 添加、更改或删除缓存中的数据时调用
.forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data))
.build();

// 给CuratorCache实例添加监听器
cache.listenable().addListener(listener);

// 启动CuratorCache
cache.start();

// 创建节点/father/son/grandson1
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());

// 创建节点/father/son/grandson1/test
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1/test", "test".getBytes());

// 创建节点/father/son/grandson1/test/test2
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1/test/test2", "test2".getBytes());

// 更改节点/father/son/grandson1的数据
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());

// 更改节点/father/son/grandson1/test的数据
curator.setData()
.forPath("/father/son/grandson1/test", "new test".getBytes());

// 删除节点/father/son/grandson1
curator.delete()
.deletingChildrenIfNeeded()
.forPath("/father/son/grandson1");

Thread.sleep(10000000);
}
}

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然​​/father/son/grandson1​​​的子节点并没有被缓存(因为缓存构建选项是​​SINGLE_NODE_CACHE​​),因此子节点的相关操作并没有被监听。

默认(子树)

如果使用默认选项构建缓存,即:

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]

// 创建节点/father/son/grandson1/test,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]

// 创建节点/father/son/grandson1/test/test2,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 更改节点/father/son/grandson1/test,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]

// 删除节点/father/son/grandson1/test/test2,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}] [null]

// 删除节点/father/son/grandson1/test,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}] [null]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然​​/father/son/grandson1​​的子节点也被缓存了,因此子节点的相关操作可以被监听。

NodeCacheListener

是一种桥接监听器,可以将旧式监听器​​NodeCacheListener​​​(因为​​NodeCache​​​类已经标记​​@Deprecated​​​注解)与​​CuratorCache​​重用。

@Override
public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
{
listeners.add(new NodeCacheListenerWrapper(listener));
return this;
}

通过​​lambda​​​表达式指定​​NodeCacheListener​​实例,该实例不接收任何参数。

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
CuratorCache.Options.SINGLE_NODE_CACHE);

CuratorCacheListener listener = CuratorCacheListener.builder()
.forNodeCache(() -> System.out.println("forNodeCache"))
.build();

输出:

// 创建节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

使用默认选项构建缓存:

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");
// 创建节点/father/son/grandson1
forNodeCache
// 创建节点/father/son/grandson1/test
forNodeCache
// 创建节点/father/son/grandson1/test/test2
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1/test/test2
forNodeCache
// 删除节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

很显然该监听器监听缓存中的所有节点(不需要知道这些节点发生事件的数据)。

PathChildrenCacheListener

是一种桥接监听器,可以将旧式监听器​​PathChildrenCacheListener​​​(因为​​PathChildrenCache​​​类已经标记​​@Deprecated​​​注解)与​​CuratorCache​​重用。该监听器需要指定根路径,以便仅为此路径的子级而不是路径本身桥接事件。

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
CuratorCache.Options.SINGLE_NODE_CACHE);

CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache("/father/son/grandson1", curator, (client, event) -> {
System.out.println(event);
})
.build();

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

使用默认选项构建缓存:

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198507,1640856689919,1640856689919,0,0,0,0,4,0,198507
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

很显然该监听器监听缓存中指定根节点的所有子节点,并且不包括根节点。

TreeCacheListener

是一种桥接监听器,可以将旧式监听器​​TreeCacheListener​​​(因为​​TreeCache​​​类已经标记​​@Deprecated​​​注解)与​​CuratorCache​​重用。

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
CuratorCache.Options.SINGLE_NODE_CACHE);

CuratorCacheListener listener = CuratorCacheListener.builder()
.forTreeCache(curator, (client, event) -> {
System.out.println(event);
})
.build();

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198523,198523,1640857107063,1640857107063,0,1,0,0,4,1,198524
, data=[100, 97, 116, 97]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

使用默认选项构建缓存:

CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198540,198540,1640857267893,1640857267893,0,1,0,0,4,1,198541
, data=[100, 97, 116, 97]}}

// 创建节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198541,1640857267896,1640857267896,0,0,0,0,4,0,198541
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 更改节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

很显然该监听器监听缓存中的所有节点(需要知道这些节点发生事件的数据)。

获取缓存数据

if(curator.checkExists().forPath("/") != null) {
curator.delete().deletingChildrenIfNeeded().forPath("/");
}

CuratorCache cache = CuratorCache.build(curator, "/");

cache.start();

// 创建一系列节点
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());

curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1/test", "test".getBytes());

curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1/test/test2", "test2".getBytes());

curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/kaven", "kaven".getBytes());

curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/jojo", "jojo".getBytes());

// 获取指定路径的节点数据
Optional<ChildData> childData = cache.get("/father/son/grandson1");
if(childData.isPresent()) System.out.println(childData);

// 使用stream将缓存的节点按数据值(字符串)从小到大,如果数据值相等按路径(字符串)从小到大排序
// 收集为List类型
List<ChildData> childDataList = cache.stream()
.sorted((childData1, childData2) -> {
int dataCompare = new String(childData1.getData()).compareTo(new String(childData2.getData()));
if(dataCompare != 0) return dataCompare;
else return childData1.getPath().compareTo(childData2.getPath());
})
.collect(Collectors.toList());

System.out.println(childDataList.size());
// 遍历在缓存中获得的排序后的节点数据
childDataList.forEach(child -> {
System.out.print(child.getPath() + " : ");
if(child.getData().length != 0) System.out.println(new String(child.getData()));
// 如果节点没有存储数据,输出[blank]
else System.out.println("[blank]");
});

输出:

Optional[ChildData{path='/father/son/grandson1', stat=198753,198753,1640920752605,1640920752605,0,1,0,0,4,1,198754
, data=[100, 97, 116, 97]}]
8
/ : [blank]
/father : [blank]
/father/son : [blank]
/father/son/grandson1 : data
/jojo : jojo
/kaven : kaven
/father/son/grandson1/test : test
/father/son/grandson1/test/test2 : test2

输出符合预期。

​Curator​​​框架的数据缓存与监听​​CuratorCache​​就介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


标签:CuratorCache,ZooKeeper,father,Curator,son,grandson1,116,path,data
From: https://blog.51cto.com/u_15870611/5838282

相关文章

  • ZooKeeper : Curator框架之分布式锁InterProcessSemaphoreMutex
    InterProcessSemaphoreMutex​​InterProcessSemaphoreMutex​​类的源码注释:ANONre-entrantmutexthatworksacrossJVMs.UsesZookeepertoholdthelock.Allproc......
  • Canal:ZooKeeper进行集群管理
    集成ZooKeeper前期回顾:​​Canal:部署Canal与CanalAdmin​​搭建​​ZooKeeper​​可以参考下面这几篇博客:​​ZooKeeper:Shell脚本搭建单机版ZooKeeper​​​​ZooKeeper:......
  • 1 zookeeper介绍
     https://mirror.bit.edu.cn/apache/zookeeper/https://www.cnblogs.com/sakura-yxf/p/12020348.html 1Zookeeper集群的角色:Leader和follower(Observer) 只要集群......
  • 2 zookeeper安装
    zookeeper包下载:https://mirror.bit.edu.cn/apache/zookeeper/1下载解压,修改配置zookeeper下载:http://archive.apache.org/dist/zookeeper/https://www.aboutyun.co......
  • zookeeper启动闪退问题
    在配置zookeeper集群时,遇到启动zookeeper后几秒就闪退经过一系列疯狂查找后解决问题方法一:在zookeeper的配置文件zoo.cfg中在/zkData/installed里面查找zookeeper_serv......
  • ZooKeeper系列:实现分布式锁
    锁是为了在多线程的场景中保证数据安全而增加的一种手段,Java中常用的有CountdownLatch,ReentrantLock等单应用中的锁,在现在处处都是分布式的场景需求下就不能满足了,所以就出......
  • 10-注册中心&Zookeeper设计实践课_ev 没用
                                               ......
  • 1.Zookeeper搭建使用
    安装环境准备Zookeeper需要jre环境。单机模式部署解压安装包,修改配置解压Zookeeper安装包,进入conf文件夹,复制zoo-sample.cfg为zoo.cfg,zoo.cfg是Zookeeper的配置文件。......
  • 2.Zookeeper 权限控制 ACL
    Zookeeper权限控制ACL(AccessControlList,访问控制表)ACL权限可以针对节点设置相关读写等权限,保障数据安全性。permissions可以指定不同的权限范围及角色。ACL命令......
  • zookeeper
    Zookeeper1)初识Zookeeper1.1)Zookeeper概念•Zookeeper是ApacheHadoop项目下的一个子项目,是一个树形目录服务。•Zookeeper翻译过来就是动物园管理员,他是用来......