首页 > 其他分享 >zookeeper 实现---分布式锁

zookeeper 实现---分布式锁

时间:2022-12-14 18:37:24浏览次数:49  
标签:return String lock zookeeper public --- new 节点 分布式


在分布式解决方案中,Zookeeper是一个分布式协调工具。当多个JVM客户端,同时在ZooKeeper上创建相同的一个临时节点,因为临时节点路径是保证唯一,只要谁能够创建节点成功,谁就能够获取到锁。没有创建成功节点,就会进行等待,当释放锁的时候,采用事件通知给客户端重新获取锁资源。如果请求超时直接返回给客户端超时,重新请求即可。

lock操作过程:

  • 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
  • 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
  • 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
  • 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
  • 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。

unlock操作过程:

  • 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出

其中的几个关键点:

  1. node节点选择为EPHEMERAL_SEQUENTIAL很重要。
    * 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
    * 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
  2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)

第一种 ​​Curator实现分布式锁​​

mavn 依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>

配置类

@Configuration
public class ZkConfiguration {
@Autowired
WrapperZk wrapperZk;

@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient(
wrapperZk.getConnectString(),
wrapperZk.getSessionTimeoutMs(),
wrapperZk.getConnectionTimeoutMs(),
new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
}

@Bean(initMethod = "init")
public DistributedLockByZookeeper distributedLockByZookeeper() {
return new DistributedLockByZookeeper();
}
}

配置文件和配置类

@ConfigurationProperties(prefix = "curator")
public class WrapperZk {
/**
* 重试次数
*/
private int retryCount;
/**
* 重试间隔时间
*/
private int elapsedTimeMs;
/**
* 连接地址
*/
private String connectString;
/**
* Session过期时间
*/
private int sessionTimeoutMs;
/**
* 连接超时时间
*/
private int connectionTimeoutMs;


public int getRetryCount() {
return retryCount;
}

public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}

public int getElapsedTimeMs() {
return elapsedTimeMs;
}

public void setElapsedTimeMs(int elapsedTimeMs) {
this.elapsedTimeMs = elapsedTimeMs;
}

public String getConnectString() {
return connectString;
}

public void setConnectString(String connectString) {
this.connectString = connectString;
}

public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}

public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}

public int getConnectionTimeoutMs() {
return connectionTimeoutMs;
}

public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
}

配置文件 

#重试次数
curator.retryCount=5
#重试间隔时间
curator.elapsedTimeMs=5000
# zookeeper 地址
curator.connectString=109.63.19.25:2181
# session超时时间
curator.sessionTimeoutMs=60000
# 连接超时时间
curator.connectionTimeoutMs=5000
spring.main.allow-bean-definition-overriding=true

分布式锁实现关键类

@Component
public class DistributedLockByZookeeper {

private final static String ROOT_PATH_LOCK = "houxian1103";

private CountDownLatch countDownLatch = new CountDownLatch(1);

/**
* The Curator framework.
*/
@Autowired
CuratorFramework curatorFramework;

/**
* 获取分布式锁
* 创建一个临时节点,
*
* @param path the path
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
System.out.println("success to acquire lock for path:{}"+keyPath);

break;
} catch (Exception e) {
//抢不到锁,进入此处!
System.out.println("failed to acquire lock for path:{}"+keyPath);
System.out.println("while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
//避免请求获取不到锁,重复的while,浪费CPU资源
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

/**
* 释放分布式锁
*
* @param path the 节点路径
* @return the boolean
*/
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
System.out.println("failed to release lock,{}"+e.getMessage());
return false;
}
return true;
}

/**
* 创建 watcher 事件
*/
private void addWatcher(String path) {
String keyPath;
if (path.equals(ROOT_PATH_LOCK)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
}
try {
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
System.out.println("上一个节点 " + oldPath + " 已经被断开");
if (oldPath.contains(path)) {
//释放计数器,让当前的请求获取锁
countDownLatch.countDown();
}
}
});
} catch (Exception e) {
System.out.println("监听是否锁失败!{}"+e.getMessage());
}
}

/**
* 创建父节点,并创建永久节点
*/
public void init() {
curatorFramework = curatorFramework.usingNamespace("lock-namespace");
String path = "/" + ROOT_PATH_LOCK;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(ROOT_PATH_LOCK);
System.out.println("root path 的 watcher 事件创建成功");
} catch (Exception e) {
System.out.println("connect zookeeper fail,please check the log >> {}"+ e.getMessage());
}
}

}

测试实现类

@RestController
public class DistributedLockTestApi {
/**
* The Distributed lock by zookeeper.
*/
@Autowired
DistributedLockByZookeeper distributedLockByZookeeper;


private final static String PATH = "testv3";

private int count = 100;

/**
* Gets lock 1.
*
* @return the lock 1
*/
@GetMapping("/lock1")

public ApiResult getLock1() throws Exception {

tickeTest();
// Boolean flag = false;
// distributedLockByZookeeper.acquireDistributedLock(PATH);
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
// }
// flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
// if (flag) {
// return ApiResult.prepare().success("释放锁资源成功!");
// }
return ApiResult.prepare().error(false, 500, "释放锁资源失败");
}

public void tickeTest() throws Exception {

TickRunnable tr = new TickRunnable();

new Thread(tr,"窗口A").start();
new Thread(tr,"窗口B").start();
new Thread(tr,"窗口c").start();

Thread.sleep(10000);
}

第二种 使用  zkclient 客户端实现

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

分布式锁工具类

/**
* 基于 zk的临时顺序节点实现分布式锁实现
*/
public class ZKSDistributeLock implements Lock {


/**
* 利用zk的同父子节点不可重名的特点来实现分布式锁
* 加锁,去创建指定名称的节点,如果能创建成功,则获得锁(加锁成功),如果节点已经存在,就标识锁被别人获取了,你就得阻塞,等待
* 释放锁,删除指定名称的节点
*/

private String lockPath;

private ZkClient client;

private String currentPath;

private String befirePath;


public ZKSDistributeLock(String lockPath){
super();
this.lockPath = lockPath;
client = new ZkClient("106.13.99.25:2181");
client.setZkSerializer(new MyZkSerializer());

if(!this.client.exists(lockPath)){
try {
this.client.createPersistent(lockPath);
} catch (Exception e){

}
}
}

@Override
public void lockInterruptibly() throws InterruptedException {

}
// 非阻塞式加锁
@Override
public boolean tryLock() {

if(this.currentPath == null){
// 创建 一个临时顺序节点
currentPath = this.client.createEphemeralSequential(lockPath +"/","houxianyong");
}

List<String> childrens = this.client.getChildren(lockPath);
Collections.sort(childrens);

// 判断当前节点是否式最小
if(currentPath.equals(lockPath+"/"+childrens.get(0))){
return true;
} else{
// 如果当前节点不是第一,获取前一个节点,赋值给beforePath
// 得都字节得索引号
int cutIndex = childrens.indexOf(currentPath.substring(lockPath.length()+1));
befirePath = lockPath +"/"+childrens.get(cutIndex);
}

return false;
}

@Override
public void lock() {

if(!tryLock()) {
// 如果没有加上锁,阻塞等待
waitForLock();
// 递归尝试加锁
lock();
}
}

private void waitForLock() {

// 增么 让你自己阻塞
CountDownLatch cdl = new CountDownLatch(1);

// 注册 watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("数据改变");
}

@Override
public void handleDataDeleted(String s) throws Exception {

System.out.println("----监听到节点被删除");
cdl.countDown();
}
};

client.subscribeDataChanges(this.befirePath,listener);

if(this.client.exists(this.befirePath)) {

try {
cdl.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}

// 取消watcher 注册
client.unsubscribeDataChanges(this.befirePath,listener);

}


@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {

this.client.delete(this.currentPath);

}

@Override
public Condition newCondition() {
return null;
}
}

序列化类

public class MyZkSerializer implements ZkSerializer {

@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError
{
return new String(bytes, Charsets.UTF_8);
}

@Override
public byte[] serialize(Object obj) throws ZkMarshallingError
{
return String.valueOf(obj).getBytes(Charsets.UTF_8);
}
}

 测试类如下:

public class ZkSDTickerTest {

private static int count = 2;

public static void main(String[] args) {

// 循环屏蔽
CyclicBarrier cb = new CyclicBarrier(count);

for (int i =0;i<count;i++){

new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ "-----我准备好--------");

Lock lock = new ZKSDistributeLock("/dbj88");

try {
cb.await(); // 等待一起出发

lock.lock();
try {
if(count>0){
System.out.println(Thread.currentThread().getName()+"售出:"+ count -- + " 张票");
}
}finally {
lock.unlock();
}

}catch (Exception e){

}
}
}).start();
}
}

}

至此 ,上述是zk 实现得分布式锁,也就是跑个转引个玉,希望对大家有所帮助。

标签:return,String,lock,zookeeper,public,---,new,节点,分布式
From: https://blog.51cto.com/u_15461374/5938153

相关文章

  • 分布式锁--redis 缓存实现
    分布式锁目前几乎所有的大型网站及应用都是采用分布式部署的方式,分布式系统开发带来的优点很多,高可用,高并发,水平扩展,分开部署等。但分布式的开发也带来了一些新问题,有的时候......
  • spring boot + Redis实现消息队列-生产消费者
    实现思路:Redis本身提供了一个发布/订阅模式,但生产消费者模式需要我们自己去实现。利用Redis中的队列,将新消息放入名称为xx的队列末尾,完成消息生产者。启动一个线程,使用​​b......
  • 服务器接口安全设计之--防止重复提交
    这里介绍是通过redis+token来实现防止重复提交问题。1.pom文件依赖<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boo......
  • redis5-cluster 集群搭建
    1、安装环境信息centos7redis52、整体集群信息#以直接在一台机器上实现上述的伪集群,因为端口号特意设置为不同的。#重点:不论机器多少,对于部署过程都是一样的,只不过是在不......
  • nginx 反向代理多示例----实现Session共享
    关于session共享的方式有多种:(1)通过nginx的ip_hash,根据ip将请求分配到对应的服务器(2)基于关系型数据库存储(3)基于cookie存储(4)服务器内置的session复制域。(5)基于nosq......
  • 基于Sharding-Jdbc 实现的读写分离实现
    1.pom文件依赖<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version......
  • Zookeeper 实现分布式配置管理实现 @Value 的动态变化 (二)
      概述:  前一篇 zookeeper 实现了的配置管理,但是最后的时候说过没有实现@Value 的动态变化,也就是说没有实现配置文件的动态变化, 今天在昨天的基础上,实现了配置......
  • qt-信号与槽初步
    接下来,我们将沿着上一篇的进度,学习如何将按钮与退出程序建立连接。建立连接是什么意思呢,我们就拿按钮来解释一下。按钮可以被按下。显然当按钮按下的时候,我们希望程序做出......
  • CSS pointer-events 属性
    pointer-events属性用于设置元素是否对鼠标事件做出反应。CSS语法pointer-events:auto|none;属性值属性值描述auto默认值,设置该属性链接可以正常点击访问。......
  • 《ASP.NET Core 6 框架揭秘》第五章读书笔记 - 配置选项(上)
    5.1读取配置信息.NET的配置支持多样化的数据源。内存变量、环境变量、命令行参数及各种格式的配置文件都可以作为配置的数据来源。 5.1.1编程模型三要素从编程层面......