首页 > 其他分享 >Zookeeper入门实战(5)-分布式锁

Zookeeper入门实战(5)-分布式锁

时间:2023-06-11 10:36:30浏览次数:34  
标签:入门 Thread lock Zookeeper cf countDownLatch new size 分布式

在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper + Curator 来实现分布式锁,本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。

1、引入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.4.0</version>
</dependency>

2、使用样例

2.1、可重入锁

@Test
public void interProcessMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中可重复获取
                lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(lock);
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.2、不可重入锁

@Test
public void interProcessSemaphoreMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中不可重复获取
                //lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.3、读写锁(可重入)

@Test
public void interProcessReadWriteLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
    InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
    InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                readLock.acquire();
                readLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了读锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(readLock);
                release(readLock);
                logger.info(Thread.currentThread().getName() + "释放了读锁");
            }
            try {
                writeLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了写锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(writeLock);
                logger.info(Thread.currentThread().getName() + "释放了写锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.4、信号量

信号量用于控制对资源同时访问的进程或线程数。

@Test
public void interProcessSemaphoreV2() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            Lease lease = null;
            try {
                //获取一个许可
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "获得了许可");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放一个许可
                semaphore.returnLease(lease);
                logger.info(Thread.currentThread().getName() + "释放了许可");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.5、多个锁作为单个实体管理

InterProcessMultiLock 主要功能是将多个锁合并为一个对象来操作,简化了代码量。

@Test
public void InterProcessMultiLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                //相当于 lock.acquire() 和 lock2.acquire()
                multiLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(multiLock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

 

分布式锁使用样例的完整代码如下:

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CuratorLockCase {
    private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);

    private static String connectString = "10.49.196.33:2181";
    private static int sessionTimeout = 40 * 1000;
    private static int connectionTimeout = 60 * 1000;

    /**
     * 可重入锁
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中可重复获取
                    lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(lock);
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 不可重入锁
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中不可重复获取
                    //lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 读写锁(可重入)
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    readLock.acquire();
                    readLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了读锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(readLock);
                    release(readLock);
                    logger.info(Thread.currentThread().getName() + "释放了读锁");
                }
                try {
                    writeLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了写锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(writeLock);
                    logger.info(Thread.currentThread().getName() + "释放了写锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 信号量,用于控制对资源同时访问的进程或线程数
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                Lease lease = null;
                try {
                    //获取一个许可
                    lease = semaphore.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了许可");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //释放一个许可
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "释放了许可");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }


    /**
     * 多个锁作为单个实体管理
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    //相当于 lock.acquire() 和 lock2.acquire()
                    multiLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(multiLock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        return cf;
    }

    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
CuratorLockCase.java

 

标签:入门,Thread,lock,Zookeeper,cf,countDownLatch,new,size,分布式
From: https://www.cnblogs.com/wuyongyin/p/17316087.html

相关文章

  • WPF入门教程系列二十八 ——DataGrid使用示例MVVM模式(5)
    WPF入门教程系列目录WPF入门教程系列二——Application介绍WPF入门教程系列三——Application介绍(续)WPF入门教程系列四——Dispatcher介绍WPF入门教程系列五——Window介绍WPF入门教程系列十一——依赖属性(一)WPF入门教程系列十五——WPF中的数据绑定(一)  添加Cl......
  • 分布式流处理组件-理论篇:Broker
    ......
  • Vue入门实战05-模板语法
    Vue使用一种基于HTML的模板语法,声明式将其组件实例的数据绑定到DOM。所有Vue模板都是语法层合法的HTML,可被符合规范的浏览器和HTML解析器解析。底层机制中,Vue会将模板编译成高度优化的JavaScript代码。结合响应式系统,当应用状态变更时,Vue能够智能地推导出需要重新渲染的......
  • 矩阵乘法与动态 DP 入门
    矩阵乘法及广义矩阵乘法前置知识:矩阵相关基础概念。记\(A(i,j)\)表示矩阵\(A\)的第\(i\)行第\(j\)列,\(n_A\)为\(A\)的行数,\(m_A\)为\(A\)的列数。定义矩阵加法\(A+B\)为(\(n_A=n_B,m_A=m_B\)):\[\\\\\[A+B](i,j)=A(i,j)+B(i,j)\]矩阵加法有交换律,结合......
  • java——微服务——spring cloud——Nacos——Nacos快速入门
            父工程中新增依赖:          ==================================================================================        客户端依赖修改——userservice和orderservice两个修改       ......
  • .NET 微服务入门
    前置条件安装.NETSDK(目前.NET7.0)2023年6月10日安装Docker前置条件自己搞定。检查.NET和Docker是否安装成功#查看.NET是否安装成功dotnet--version#查看Docker是否安装成功docker--version说明:由于我这边安装了最新.NET8预览版所以显示的是最......
  • Python+sklearn决策树算法使用入门
    在学习决策树算法之前,首先介绍几个相关的基本概念。决策树算法原理与sklearn实现简单地说,决策树算法相等于一个多级嵌套的选择结构,通过回答一系列问题来不停地选择树上的路径,最终到达一个表示某个结论或类别的叶子节点,例如有无贷款意向、能够承担的理财风险等级、根据高考时各科成......
  • 2. docker的入门
    1.物理机演进到虚拟化部署1.1物理机部署在早期的项目部署中是非常繁琐和复杂的,通常就是一台物理机跑起来一个项目部署非常慢成本很高资源浪费难以扩展和迁移1.2虚拟部署 1.2.1虚拟机虚拟机的出现可以很好的解决物理机部署存在的问题一台物理机可以部署多个app......
  • SkyWalking分布式链路追踪工具的基本使用
    下载我们需要一个监控中心,还有一个javaagents工具apache-skywalking-apm(显示/存储多个程序的指标数据),APM是ApplicationPerformanceManagement的缩写和skywalking-agent(收集单个程序的指标数据)启动Skywalking和java程序apache-skywalking-apm\bin\startup.bat......
  • 【若归】 【LGR-142-Div.4】洛谷入门赛 #13赛后反思
    比赛链接:【LGR-142-Div.4】洛谷入门赛#13rk288,比前几次差(可能是因为rated?)A十年OI一场空,不开longlong见祖宗#include<bits/stdc++.h>usingnamespacestd;intmain(){ longlongintn; cin>>n; cout<<"8"<<12*(n-2)<<""<<6*(n-......