首页 > 其他分享 >Zookeeper入门实战(5)-分布式锁

Zookeeper入门实战(5)-分布式锁

时间:2023-06-18 16:38:18浏览次数:37  
标签:入门 Thread lock Zookeeper cf countDownLatch new size 分布式

在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper + Curator 来实现分布式锁,本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。

1、引入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.4.0</version>
</dependency>

2、使用样例

2.1、可重入锁

@Test
public void interProcessMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中可重复获取
                lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(lock);
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.2、不可重入锁

@Test
public void interProcessSemaphoreMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一线程中不可重复获取
                //lock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(lock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.3、读写锁(可重入)

@Test
public void interProcessReadWriteLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
    InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
    InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                readLock.acquire();
                readLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了读锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //获取了几次就要释放几次
                release(readLock);
                release(readLock);
                logger.info(Thread.currentThread().getName() + "释放了读锁");
            }
            try {
                writeLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了写锁");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(writeLock);
                logger.info(Thread.currentThread().getName() + "释放了写锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.4、信号量

信号量用于控制对资源同时访问的进程或线程数。

@Test
public void interProcessSemaphoreV2() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            Lease lease = null;
            try {
                //获取一个许可
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "获得了许可");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放一个许可
                semaphore.returnLease(lease);
                logger.info(Thread.currentThread().getName() + "释放了许可");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.5、多个锁作为单个实体管理

InterProcessMultiLock 主要功能是将多个锁合并为一个对象来操作,简化了代码量。

@Test
public void InterProcessMultiLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                //相当于 lock.acquire() 和 lock2.acquire()
                multiLock.acquire();
                logger.info(Thread.currentThread().getName() + "获得了锁");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(multiLock);
                logger.info(Thread.currentThread().getName() + "释放了锁");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

 

分布式锁使用样例的完整代码如下:

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CuratorLockCase {
    private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);

    private static String connectString = "10.49.196.33:2181";
    private static int sessionTimeout = 40 * 1000;
    private static int connectionTimeout = 60 * 1000;

    /**
     * 可重入锁
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中可重复获取
                    lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(lock);
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 不可重入锁
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一线程中不可重复获取
                    //lock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 读写锁(可重入)
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    readLock.acquire();
                    readLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了读锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //获取了几次就要释放几次
                    release(readLock);
                    release(readLock);
                    logger.info(Thread.currentThread().getName() + "释放了读锁");
                }
                try {
                    writeLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了写锁");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(writeLock);
                    logger.info(Thread.currentThread().getName() + "释放了写锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 信号量,用于控制对资源同时访问的进程或线程数
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                Lease lease = null;
                try {
                    //获取一个许可
                    lease = semaphore.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了许可");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //释放一个许可
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "释放了许可");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }


    /**
     * 多个锁作为单个实体管理
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    //相当于 lock.acquire() 和 lock2.acquire()
                    multiLock.acquire();
                    logger.info(Thread.currentThread().getName() + "获得了锁");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(multiLock);
                    logger.info(Thread.currentThread().getName() + "释放了锁");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        return cf;
    }

    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

CuratorLockCase.java

 



标签:入门,Thread,lock,Zookeeper,cf,countDownLatch,new,size,分布式
From: https://blog.51cto.com/wuyongyin/6508735

相关文章

  • excel新手入门
    没办法,实习需要excel。来叨一些快捷键和容易错的东西只粘贴数值ctrl+alt+v切换窗口:alt+tab用excel“爬”数据(操作太sao)3000行左右,数据干净都能做人肉复制,批量粘贴到记事本,然后用word处理换行符,然后在整理成一行一段的格式,然后放到excel里做分列和一些具体的删减替换(可以只......
  • 聊聊Zookeeper的Session会话超时重连
    概述简单地说,ZooKeeper的连接与会话就是客户端通过实例化ZooKeeper对象来实现客户端与服务器创建并保持TCP连接的过程。本质上,Session就是一个TCP长连接。会话Session会话的作用:ZKServer执行任何请求之前,都需要Client与Server先建立Session;Client提交给Server的......
  • 分布式、负载均衡、缓存、数据库中间件【杭州多测师_王sir】
      ......
  • WPF入门教程系列二十八 ——DataGrid使用示例MVVM模式(6)
    WPF入门教程系列目录WPF入门教程系列二——Application介绍WPF入门教程系列三——Application介绍(续)WPF入门教程系列四——Dispatcher介绍WPF入门教程系列五——Window介绍WPF入门教程系列十一——依赖属性(一)WPF入门教程系列十五——WPF中的数据绑定(一)   八......
  • SpringBatch从入门到实战(一):简介和环境搭建
    一:简介SpringBatch是一个轻量级的批处理框架,适合处理大批量的数据(如百万级别)。功能就是从一个地方读数据写到另一个地方去。一般都是系统之间不能直接访问同一个数据库,需要通过文件来交换数据。二:从文件中读然后写到数据库这代码谁都会写,那么为什么还要使用框架?try(BufferedReader......
  • [ML从入门到入门] 初识人工神经网络、感知机算法以及反向传播算法
    前言人工神经网络(Artificialneuralnetworks,ANNs)被广泛认为诞生于20世纪四五十年代,其核心理论可以追溯到19世纪初 Adrien-MarieLegendre发明的最小二乘法,而在今天,经过了半个世纪互联网和计算机技术的迅猛发展,这片耕耘良久的沃土重新掀起了机器学习的研究热潮。本文主要......
  • 相机入门攻略
    本文仅介绍佳能佳能EOS相机ElectroOpticalSystem电子光学系统D结尾(如90D)数码单反相机全画幅和旗舰APS-C画幅:如5DD前面只能是1,5,6,76DMarkII高端APS-C画幅:90D中端:850D200DII低端:1500DM开头(如M50)旧款微单相机R开头(如RS)新款微单相机......
  • ASP.NET Core MVC 从入门到精通之Identity入门
    随着技术的发展,ASP.NETCoreMVC也推出了好长时间,经过不断的版本更新迭代,已经越来越完善,本系列文章主要讲解ASP.NETCoreMVC开发B/S系统过程中所涉及到的相关内容,适用于初学者,在校毕业生,或其他想从事ASP.NETCoreMVC系统开发的人员。经过前几篇文章的讲解,初步了解ASP.NETCore......
  • MongoDB入门操作
    数据库操作查看所有数据库--->showdbs通过use关键字切换数据库--->usetestdb删除数据库--->db.dropDatabase()新增数据db.COLLECTION_NAME.insert(document)注意事项:在MongoDB中,存储的文档结构是一种类似于json的结构,称之为bson(全称为:BinaryJSON)如:{id:2,userna......
  • MongoDB入门介绍
    MongoDB简介MongoDB是一个开源、高性能、支持海量数据存储的文档型数据库是NoSQL数据库产品中的一种,是最像关系型数据库(MySQL)的非关系型数据库数据特征数据存储量较大,甚至是海量对数据读写的响应速度要求较高某些数据安全性要求不高,可以接受一定范围内的误差MongoDB存储......