首页 > 其他分享 >Zookeeper实现分布式锁(Curator API)

Zookeeper实现分布式锁(Curator API)

时间:2023-03-14 20:36:15浏览次数:60  
标签:ourPath false String Zookeeper Curator API null true 节点

Curator API提供了基于Zookeeper的分布式锁的实现

通过查看InterProcessMutexLockInternals源码,确定分布式锁的锁定和释放流程

互斥锁设计的核心思想:同一时间,仅一个进程/线程可以占有

  1. 临时节点:利用临时节点,会话中断,就会删除的特点,避免死锁
  2. 节点的顺序性:利用同一路径下,不能存在相同节点,节点创建存在顺序,先创建的节点的序号更小,序号最小的节点占有锁
  3. Watch机制:监听当前占用锁的路径,如果锁对应的路径被修改,就唤醒所有等待的节点

获取锁

// InterProcessMutex

	public void acquire() throws Exception
    {
        // 获取锁
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
	
	private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        //线程已经占用锁,增加重入次数
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        // 尝试获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        //设置
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
// LockInternals

    // 该Watcher对象,一旦收到通知,就会唤醒所有阻塞的线程
	private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            client.postSafeNotify(LockInternals.this);
        }
    };

	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                // 在path下创建一个路径,作为锁路径
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 循环获取锁,成功获取锁,返回true;否则,返回false
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // 会话超时导致的锁释放,重试获取锁
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    // 超过最大重试次数,抛出异常
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            // 返回锁路径
            return ourPath;
        }

        return null;
    }

	private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			
            // 需要在和Zookeeper保持连接,且未获取到锁,就不断循环获取
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                // 获取锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                // 成功获取锁,退出循环,返回true
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    // 获取需要监听的节点的绝对路径
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                           //监听对应的路径 
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            // 设置了超时时间
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                // 已经超时,将doDelete标志位设置为true,后续删除节点
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
								
                                // 未超时,同步阻塞当前线程millisToWait时间
                                wait(millisToWait);
                            }
                            else
                            {
                                // 未设置超时时间,直接同步阻塞当前线程
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            // 设置doDelete=true,删除路径
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

	// 获取锁
    // maxLeases默认为1,表示资源数
	public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        // 在有序的列表中,找到当前节点的索引,索引从0开始,如果<0,表示没找到当前节点
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
		
        // 当前路径所属索引为0时,才能成功获取锁
        boolean         getsTheLock = ourIndex < maxLeases;
        // 需要监听当前节点的前一个节点(这里因为maxLeases为1)
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

释放锁

释放锁的过程较为简单。

参考

标签:ourPath,false,String,Zookeeper,Curator,API,null,true,节点
From: https://www.cnblogs.com/DehLiu/p/17216236.html

相关文章

  • luogu P7599 [APIO2021] 雨林跳跃
    题面传送门我成功了,我不再是以前那个我了!我们发现部分分里面有个单点跳到单点,尝试考虑这个部分分。一个点有两个点可以跳,贪心地想,如果我先跳了比较矮的那个,那么再一步能......
  • nacos报错 Caused by: com.alibaba.nacos.api.exception.NacosException: java.io.IOE
    麻麻劈,根据这个报错一顿ulimit -n 修改打开文件数,鸡儿报错一直在。 最终修改 vi/etc/sysctl.conf增加三项:fs.inotify.max_queued_events=32768fs.inotify.ma......
  • ShowDoc在线API文档
    官方文档: https://www.showdoc.com.cn/helpdocker部署说明:#原版官方镜像安装命令(中国大陆用户不建议直接使用原版镜像,可以用后面的加速镜像)dockerpullstar7th......
  • 集成API
    问题现在,如果要把这个简单的TODO应用变成一个用户能使用的Web应用,我们需要解决几个问题:用户的TODO数据应该从后台读取;对TODO的增删改必须同步到服务器后端;用户在View上必须......
  • #创作者激励# [FFH]napi_generator(一)——NAPI框架生成工具介绍
    【本文正在参加2023年第一期优质创作者激励计划】napi_generator(一)——NAPI框架生成工具介绍个人简介:深圳技术大学FSR实验室大三学生,正于九联科技实习,共同学习研究Open......
  • FastAdmin的API接口生成器插件,使用validate验证时报错等问题。
    1、当使用生成接口生成全局模型时,生成的validate文件的namespace错误应为 namespaceapp\common\validate;实际为namespaceapp\api\validate;解决方法:1、找到ap......
  • OpenAi API接口访问不通
     使用nodejs接api接口,已经开了那啥,但还是报连接失败    ......
  • 百度地图web服务API
    一、获取AK官网地址:https://api.map.baidu.com/lbsapi/cloud/index.htm点击地图API产品,选择web服务API该套API免费对外开放,使用前请先申请密钥(key)点击创建应用提......
  • docker安装zookeeper
    1.docker安装zookeeper1.1.下载zookeeper最新版镜像点击查看代码dockersearchzookeeperdockerpullzookeeperdockerimages//查看下载的......
  • 专为学习提供的标准API
    http://api.shop.eduwork.cn/index.html B站视频地址:https://www.bilibili.com/video/BV1E8411g77X/?spm_id_from=333.337.search-card.all.click&vd_source=7db8c19d0f......