在分布式解决方案中,Zookeeper是一个分布式协调工具。当多个JVM客户端,同时在ZooKeeper上创建相同的一个临时节点,因为临时节点路径是保证唯一,只要谁能够创建节点成功,谁就能够获取到锁。没有创建成功节点,就会进行等待,当释放锁的时候,采用事件通知给客户端重新获取锁资源。如果请求超时直接返回给客户端超时,重新请求即可。
lock操作过程:
- 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
- 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
- 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
- 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
- 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
- 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出
其中的几个关键点:
- node节点选择为EPHEMERAL_SEQUENTIAL很重要。
* 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
* 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。 - 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
第一种 Curator实现分布式锁
mavn 依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
配置类
@Configuration
public class ZkConfiguration {
@Autowired
WrapperZk wrapperZk;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(
wrapperZk.getConnectString(),
wrapperZk.getSessionTimeoutMs(),
wrapperZk.getConnectionTimeoutMs(),
new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
}
@Bean(initMethod = "init")
public DistributedLockByZookeeper distributedLockByZookeeper() {
return new DistributedLockByZookeeper();
}
}
配置文件和配置类
@ConfigurationProperties(prefix = "curator")
public class WrapperZk {
/**
* 重试次数
*/
private int retryCount;
/**
* 重试间隔时间
*/
private int elapsedTimeMs;
/**
* 连接地址
*/
private String connectString;
/**
* Session过期时间
*/
private int sessionTimeoutMs;
/**
* 连接超时时间
*/
private int connectionTimeoutMs;
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
public int getElapsedTimeMs() {
return elapsedTimeMs;
}
public void setElapsedTimeMs(int elapsedTimeMs) {
this.elapsedTimeMs = elapsedTimeMs;
}
public String getConnectString() {
return connectString;
}
public void setConnectString(String connectString) {
this.connectString = connectString;
}
public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public int getConnectionTimeoutMs() {
return connectionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
}
配置文件
#重试次数
curator.retryCount=5
#重试间隔时间
curator.elapsedTimeMs=5000
# zookeeper 地址
curator.connectString=109.63.19.25:2181
# session超时时间
curator.sessionTimeoutMs=60000
# 连接超时时间
curator.connectionTimeoutMs=5000
spring.main.allow-bean-definition-overriding=true
分布式锁实现关键类
@Component
public class DistributedLockByZookeeper {
private final static String ROOT_PATH_LOCK = "houxian1103";
private CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* The Curator framework.
*/
@Autowired
CuratorFramework curatorFramework;
/**
* 获取分布式锁
* 创建一个临时节点,
*
* @param path the path
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
System.out.println("success to acquire lock for path:{}"+keyPath);
break;
} catch (Exception e) {
//抢不到锁,进入此处!
System.out.println("failed to acquire lock for path:{}"+keyPath);
System.out.println("while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
//避免请求获取不到锁,重复的while,浪费CPU资源
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 释放分布式锁
*
* @param path the 节点路径
* @return the boolean
*/
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
System.out.println("failed to release lock,{}"+e.getMessage());
return false;
}
return true;
}
/**
* 创建 watcher 事件
*/
private void addWatcher(String path) {
String keyPath;
if (path.equals(ROOT_PATH_LOCK)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
}
try {
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
System.out.println("上一个节点 " + oldPath + " 已经被断开");
if (oldPath.contains(path)) {
//释放计数器,让当前的请求获取锁
countDownLatch.countDown();
}
}
});
} catch (Exception e) {
System.out.println("监听是否锁失败!{}"+e.getMessage());
}
}
/**
* 创建父节点,并创建永久节点
*/
public void init() {
curatorFramework = curatorFramework.usingNamespace("lock-namespace");
String path = "/" + ROOT_PATH_LOCK;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(ROOT_PATH_LOCK);
System.out.println("root path 的 watcher 事件创建成功");
} catch (Exception e) {
System.out.println("connect zookeeper fail,please check the log >> {}"+ e.getMessage());
}
}
}
测试实现类
@RestController
public class DistributedLockTestApi {
/**
* The Distributed lock by zookeeper.
*/
@Autowired
DistributedLockByZookeeper distributedLockByZookeeper;
private final static String PATH = "testv3";
private int count = 100;
/**
* Gets lock 1.
*
* @return the lock 1
*/
@GetMapping("/lock1")
public ApiResult getLock1() throws Exception {
tickeTest();
// Boolean flag = false;
// distributedLockByZookeeper.acquireDistributedLock(PATH);
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
// }
// flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
// if (flag) {
// return ApiResult.prepare().success("释放锁资源成功!");
// }
return ApiResult.prepare().error(false, 500, "释放锁资源失败");
}
public void tickeTest() throws Exception {
TickRunnable tr = new TickRunnable();
new Thread(tr,"窗口A").start();
new Thread(tr,"窗口B").start();
new Thread(tr,"窗口c").start();
Thread.sleep(10000);
}
第二种 使用 zkclient 客户端实现
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
分布式锁工具类
/**
* 基于 zk的临时顺序节点实现分布式锁实现
*/
public class ZKSDistributeLock implements Lock {
/**
* 利用zk的同父子节点不可重名的特点来实现分布式锁
* 加锁,去创建指定名称的节点,如果能创建成功,则获得锁(加锁成功),如果节点已经存在,就标识锁被别人获取了,你就得阻塞,等待
* 释放锁,删除指定名称的节点
*/
private String lockPath;
private ZkClient client;
private String currentPath;
private String befirePath;
public ZKSDistributeLock(String lockPath){
super();
this.lockPath = lockPath;
client = new ZkClient("106.13.99.25:2181");
client.setZkSerializer(new MyZkSerializer());
if(!this.client.exists(lockPath)){
try {
this.client.createPersistent(lockPath);
} catch (Exception e){
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
// 非阻塞式加锁
@Override
public boolean tryLock() {
if(this.currentPath == null){
// 创建 一个临时顺序节点
currentPath = this.client.createEphemeralSequential(lockPath +"/","houxianyong");
}
List<String> childrens = this.client.getChildren(lockPath);
Collections.sort(childrens);
// 判断当前节点是否式最小
if(currentPath.equals(lockPath+"/"+childrens.get(0))){
return true;
} else{
// 如果当前节点不是第一,获取前一个节点,赋值给beforePath
// 得都字节得索引号
int cutIndex = childrens.indexOf(currentPath.substring(lockPath.length()+1));
befirePath = lockPath +"/"+childrens.get(cutIndex);
}
return false;
}
@Override
public void lock() {
if(!tryLock()) {
// 如果没有加上锁,阻塞等待
waitForLock();
// 递归尝试加锁
lock();
}
}
private void waitForLock() {
// 增么 让你自己阻塞
CountDownLatch cdl = new CountDownLatch(1);
// 注册 watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("数据改变");
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("----监听到节点被删除");
cdl.countDown();
}
};
client.subscribeDataChanges(this.befirePath,listener);
if(this.client.exists(this.befirePath)) {
try {
cdl.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
// 取消watcher 注册
client.unsubscribeDataChanges(this.befirePath,listener);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
this.client.delete(this.currentPath);
}
@Override
public Condition newCondition() {
return null;
}
}
序列化类
public class MyZkSerializer implements ZkSerializer {
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError
{
return new String(bytes, Charsets.UTF_8);
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError
{
return String.valueOf(obj).getBytes(Charsets.UTF_8);
}
}
测试类如下:
public class ZkSDTickerTest {
private static int count = 2;
public static void main(String[] args) {
// 循环屏蔽
CyclicBarrier cb = new CyclicBarrier(count);
for (int i =0;i<count;i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ "-----我准备好--------");
Lock lock = new ZKSDistributeLock("/dbj88");
try {
cb.await(); // 等待一起出发
lock.lock();
try {
if(count>0){
System.out.println(Thread.currentThread().getName()+"售出:"+ count -- + " 张票");
}
}finally {
lock.unlock();
}
}catch (Exception e){
}
}
}).start();
}
}
}
至此 ,上述是zk 实现得分布式锁,也就是跑个转引个玉,希望对大家有所帮助。
标签:return,String,lock,zookeeper,public,---,new,节点,分布式 From: https://blog.51cto.com/u_15461374/5938153