首页 > 数据库 >如何使用mysql实现分布式锁

如何使用mysql实现分布式锁

时间:2023-12-02 15:23:50浏览次数:47  
标签:distributedLock 如何 boolean result mysql import lockKey 分布式

如何使用mysql实现可重入的分布式锁


目录

  • 什么是分布式锁?
  • 如何实现分布式锁?
  • 定义分布式表结构
  • 定义锁统一接口
  • 使用mysql来实现分布式锁
    ① 生成线程标记ID
    ② 加锁
    ③ 解锁
    ④ 重置锁
  • 写在最后

1. 什么是分布式锁?

百度百科:分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

ㅤ如引用所述,分布式锁是一种用于在分布式系统中对资源进行同步访问的机制。在分布式系统中,多个节点同时访问某个共享资源时,需要确保资源的一致性和正确性。分布式锁可以通过协调多个节点之间的操作,保证在同一时间内只有一个节点能够访问该资源,从而避免竞态条件和数据不一致的问题。

2. 如何实现分布式锁?

基于数据库的分布式锁:使用数据库的事务机制来实现分布式锁,通过在数据库中创建一个唯一约束或者加锁表来保证同一时间只有一个节点能够获得锁。
基于共享存储的分布式锁:使用共享存储如Redis、ZooKeeper等来实现分布式锁,通过在共享存储中创建占位节点或者临时节点来表示锁的状态。
基于乐观锁的分布式锁:通过使用版本号或者时间戳等方式,在修改资源时判断是否被其他节点修改,从而实现资源的同步访问。
ㅤ分布式锁的实现需要考虑各种复杂条件,如锁的可重入性、死锁的处理、锁的过期时间等。因此,使用分布式锁时需要谨慎设计和合理选择实现方式,以保证系统的性能和可靠性。

3. 定义分布式表结构

create table if not exists t_distributed_lock
(
    id              int auto_increment primary key,
    lock_key        varchar(255) default '' not null comment '锁key',
    thread_ident    varchar(255) default '' not null comment '线程标识',
    timeout         mediumtext              not null comment '超时时间',
    reentrant_count int          default 0  not null comment '可重入次数',
    version         int          default 1  not null comment '版本号(用于乐观锁)'
)
    comment '分布式锁';

4. 定义锁统一接口

public interface ILock<T> {

    /**
     * 加锁(支持可重入)
     *
     * @param lockKey                   锁key
     * @param lockTimeoutMillisecond    锁超时时间(1.锁不是无限存在的 2.为可能存在的死锁做兜底)
     * @param getLockTimeoutMillisecond 获取锁的超时时间(获取锁的时间应该也是有限的)
     */
    boolean lock(String lockKey, long lockTimeoutMillisecond, long getLockTimeoutMillisecond);

    /**
     * 解锁
     *
     * @param lockKey 锁key
     */
    boolean unLock(String lockKey);

    /**
     * 重置锁
     *
     * @param t 锁对象
     */
    boolean resetLock(T t);
}

ㅤ定义关于锁的统一接口,对参数类型进行泛化。

5. 使用mysql来实现分布式锁

5.1 生成线程标记ID
import cn.hutool.core.lang.UUID;
import cn.wxroot.learn.distributedLock.mysql.entity.DistributedLock;
import cn.wxroot.learn.distributedLock.mysql.mapper.DistributedLockMapper;
import cn.wxroot.learn.distributedLock.ILock;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.Objects;

@Service
@Slf4j
public class MysqlLock extends ServiceImpl<DistributedLockMapper, DistributedLock> implements ILock<DistributedLock> {

    ThreadLocal<String> threadIdent = new ThreadLocal<>();

    /**
     * 获取线程标记ID
     *
     * @return 线程标记ID
     */
    private String getThreadIdentId() {

        if (Objects.isNull(threadIdent.get())) {

            threadIdent.set(UUID.randomUUID().toString());
        }

        return threadIdent.get();
    }
    // 加锁、解锁、重置锁
}

ㅤ创建mysql方式的锁实现类。方法getThreadIdentId()来生成线程的唯一标识。

5.1 加锁
    @Override
    public boolean lock(String lockKey, long lockTimeoutMillisecond, long getLockTimeoutMillisecond) {

        boolean result = false;

        long startTime = System.currentTimeMillis();

        String threadIdentId = this.getThreadIdentId();

        while (true) {

            if (startTime + getLockTimeoutMillisecond < System.currentTimeMillis()) {
                log.info("获取锁超时!");
                break;
            }

            QueryWrapper<DistributedLock> distributedLockQueryWrapper = new QueryWrapper<>();
            distributedLockQueryWrapper.lambda().eq(true, DistributedLock::getLockKey, lockKey);
            DistributedLock distributedLock = this.getOne(distributedLockQueryWrapper);

            if (Objects.nonNull(distributedLock)) {

                if (StringUtils.isNotEmpty(distributedLock.getThreadIdent())) {

                    if (threadIdentId.equals(distributedLock.getThreadIdent())) {

                        log.info("重入锁+1");
                        distributedLock.setReentrantCount(distributedLock.getReentrantCount() + 1);
                        distributedLock.setTimeout(System.currentTimeMillis() + lockTimeoutMillisecond);
                        this.updateById(distributedLock);
                        result = true;
                        break;
                    } else {

                        // 其他线程锁定了该lockKey,需要等待
                        if (distributedLock.getTimeout() < System.currentTimeMillis()) {

                            this.resetLock(distributedLock);
                        } else {

                            try {
                                log.info("休眠");
                                Thread.sleep(100);
                            } catch (InterruptedException ignored) {

                            }
                        }
                    }
                } else {

                    log.info("占用锁");
                    distributedLock.setThreadIdent(threadIdentId);
                    distributedLock.setReentrantCount(1);
                    distributedLock.setTimeout(System.currentTimeMillis() + lockTimeoutMillisecond);
                    this.updateById(distributedLock);
                    result = true;
                    break;
                }

            } else {

                log.info("创建新记录");
                DistributedLock addDistributedLock = DistributedLock.builder()
                        .lockKey(lockKey)
                        .threadIdent(threadIdentId)
                        .timeout(System.currentTimeMillis() + lockTimeoutMillisecond)
                        .reentrantCount(1)
                        .version(1)
                        .build();

                this.save(addDistributedLock);
                result = true;
                break;
            }
        }

        return result;
    }

ㅤ实现加锁的方法boolean lock(···)应考虑以下几点:
ㅤ1. 使用锁的时间应该有限的,即不能永久占有锁。
ㅤ2. 获取锁的过程应该是有限的,即指定时间内获取不到锁应该返回。

5.1 解锁
    @Override
    public boolean unLock(String lockKey) {

        boolean result = false;

        String threadIdentId = this.getThreadIdentId();

        log.info("解锁");

        QueryWrapper<DistributedLock> distributedLockQueryWrapper = new QueryWrapper<>();
        distributedLockQueryWrapper.lambda().eq(true, DistributedLock::getLockKey, lockKey);
        DistributedLock distributedLock = this.getOne(distributedLockQueryWrapper);

        if (Objects.nonNull(distributedLock)) {

            if (distributedLock.getThreadIdent().equals(threadIdentId)) {

                if (distributedLock.getReentrantCount() > 1) {

                    distributedLock.setReentrantCount(distributedLock.getReentrantCount() - 1);
                    this.updateById(distributedLock);
                    result = true;
                } else {
                    result = this.resetLock(distributedLock);
                }
            }
        }

        return result;
    }

ㅤ基于可重入锁的特性,在进行解锁操作时,应该有所判断。

5.1 重置锁
    @Override
    public boolean resetLock(DistributedLock distributedLock) {

        log.info("重置锁");
        distributedLock.setThreadIdent("");
        distributedLock.setReentrantCount(0);
        this.updateById(distributedLock);
        return true;
    }


转载请注明出处

标签:distributedLock,如何,boolean,result,mysql,import,lockKey,分布式
From: https://www.cnblogs.com/YangPeng/p/17842144.html

相关文章

  • 岩土工程仪器振弦采集仪在极端气候下如何稳定运行
    岩土工程仪器振弦采集仪在极端气候下如何稳定运行岩土工程仪器振弦采集仪在极端气候下的稳定运行取决于以下几个因素:温度:振弦采集仪应在指定的工作温度范围内运行。在极端气候下,温度可能会超出仪器的工作温度范围,这会影响仪器的稳定性。因此,在使用仪器之前,需要检查工作温度范围,并确......
  • Spring Boot如何在swagger2中配置header请求头等参数信息
    前言在开发Web应用程序时,处理HTTP请求头信息是非常重要的。一般情况下,我们需要将一些固定的参数,如Token、Session等,附加在请求头中进行传递。Swagger2是一个非常流行的API文档生成工具,但在使用时,我们可能需要在请求头中附加这些参数,本文就是为了解决这个问题而写的。摘要本文将......
  • 在Ubuntu上安装MySQL
    一、在Ubuntu上安装MySQLsudoaptupdatesudoaptinstallmysql-server安装完成后,MySQL服务将会自动启动,验证MySQL服务是否正在运行,输入命令:systemctlstatusmysql.service输出显示表示该服务已经启用且正在运行:MySQL安装了一个名为"mysql_secure_installation"的......
  • 【PostgreSQL 数据库线下沙龙(武汉站)】PieCloudDB Database :云原生分布式虚拟数仓的诞
    2023年6月3日,开源软件联盟PostgreSQL中文社区在武汉举办了技术沙龙活动。本次活动主题围绕未来数据库展开讨论和分享。通过探讨未来数据库的概念和特点,为智能化时代的发展提供更多的支持和服务。同时,通过探讨数据库和AI技术的共生共荣,推动数字经济的发展和创新,开创未来数据库的新......
  • 云时代已至,新一代数据分析平台是如何实现的?
    2023年5月,由Stackoverflow发起的2023年度开发者调查数据显示,PostgreSQL已经超越MySQL位居第一,成为开发人员首选。PostgreSQL在国内的热度也越来越高。6月17日,PostgreSQL数据库技术峰会在成都顺利召开。本次大会以“新机遇、新态势、新发展”为主题,邀请众多行业大咖参与本次......
  • mysql语句
    一:表的增删改查只改表的数据,没有改变表的结构 1:创建数据库和删除数据库createdatabase库名dropdatabase库名 2:创建表和删除表createtable表名droptable表名 3:改数据库的名字和表的名字数据库的改名(还没有想清楚)altertable旧表名rename新表明......
  • TCP粘包/拆包,如何解决
    TCP粘包(TCPPacketStickiness):TCP粘包指的是发送方发送的多个小数据包被接收方一次性接收,形成一个大的数据包。这种情况可能会导致接收方难以正确解析消息的边界,因为多个消息被粘合在一起。TCP是面向流的协议,它不保留消息的边界信息,而是将数据流划分为小的数据块进行传输。TCP拆......
  • # yyds干货盘点 # Python如何通过input输入一个键,然后自动打印对应的值?
    大家好,我是皮皮。一、前言前几天在Python最强王者交流群【冯诚】问了一个Python基础的问题,一起来看看吧。问题描述:大佬们,我有个字典如下:dict={'b':2,'a':4,'c':3}如何通过input输入一个键,然后自动打印对应的值?二、实现过程这里【巭孬......
  • 「拓数派(OpenPie)2022发布会实录 」PieCloudDB Database分布式引擎
    10月24日程序员节,拓数派发布了云原生数据库PieCloudDB社区版与商业版。本文整理自拓数派「OpenPie」2022发布会的演讲,将从PieCloudDBDatabase的计算、存储、事务三个方面来介绍分布式引擎模块。计算首先为大家介绍的方面是计算。PieCloudDB通过重新打造云上的数据库内核,突破......
  • 国标GB28181平台LiteCVR如何快速配置平台国标级联?
    今天我们就来介绍一下LiteCVR如何配置平台级联。具体操作步骤如下:1、平台级联在级联中,LiteCVR既可以作为下级平台,也可以作为上级平台,并支持同时级联多个上级平台。作为上级平台时,下级平台按照设备接入方式,配置LiteCVR的SIP信息即可。作为下级平台时,则要在【配置中心】-【国标级联】......