首页 > 数据库 >Redis加Lua脚本实现分布式锁

Redis加Lua脚本实现分布式锁

时间:2024-02-27 19:35:39浏览次数:35  
标签:return KEYS Redis redis ARGV Lua call public 分布式

先讲一下为什么使用分布式锁:

在传统的单体应用中,我们可以使用Java并发处理相关的API(如ReentrantLock或synchronized)来实现对共享资源的互斥控制,确保在高并发情况下同一时间只有一个线程能够执行特定方法。然而,随着业务的发展,单体应用逐渐演化为分布式系统,多线程、多进程分布在不同机器上,这导致了原有的单机部署下的并发控制策略失效。为了解决这一问题,我们需要引入一种跨JVM的互斥机制来管理共享资源的访问,这就是分布式锁所要解决的核心问题。

Lua介绍

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

为什么要用Lua呢

Redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。

在以下场景中:

  1. 当 事务1执行删除操作时,查询到的锁值确实相等。
  2. 在 事务1执行删除操作之前,锁的过期时间刚好到达,导致 Redis 自动释放了该锁。
  3. 事务2获取了这个已被释放的锁。
  4. 当 事务1执行删除操作时,会意外地删除掉 事务2持有的锁。

上面的删除情况也无法保证原子性,只能通过lua脚本实现

如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。

Lua脚本命令

在Redis中需要通过eval命令执行lua脚本

EVAL script numkeys key [key ...] arg [arg ...]

script:lua脚本字符串,这段Lua脚本不需要(也不应该)定义函数。
numkeys:lua脚本中KEYS数组的大小
key [key ...]:KEYS数组中的元素
arg [arg ...]:ARGV数组中的元素

案列1:动态传参

EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 8 10 30 40 50 60 70 
# 输出:8 10 60 70

EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 输出:0

EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 输出:1

案列2:执行redis类库方法

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20

可重入性

可重入性是指一个线程在持有锁的情况下,可以多次获取同一个锁而不会发生死锁或阻塞的特性。在可重入锁中,线程可以重复获取已经持有的锁,每次获取都会增加一个计数器,直到计数器归零时才会真正释放锁。
下面是一个示例代码来说明可重入性:

public synchronized void a() {
 b();
}
public synchronized void b() {
 // pass
}

假设线程X在方法a中获取了锁后,继续执行方法b。如果这是一个不可重入的锁,线程X在执行b方法时将会被阻塞,因为它已经持有了该锁并且无法再次获取。这种情况下,线程X必须等待自己释放锁后才能再次争抢该锁。
而对于可重入性的情况,当线程X持有了该锁后,在遇到加锁方法时会直接将加锁次数加1,并继续执行方法逻辑。当退出加锁方法时,加锁次数再减1。只有当加锁次数归零时,该线程才会真正释放该锁。
因此,可重入性的最大特点就是计数器的存在,用于统计加锁的次数。在分布式环境中实现可重入分布式锁时也需要考虑如何正确统计和管理加锁次数。

加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。

if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
	return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]

如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。

解锁脚本

-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then 
    return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    return 1; 
end;

如果锁不存在直接返回null,如果锁存在就对数量进行减一,如果减到等于0 就直接删除此锁

自动续期

有可能代码没执行完毕,锁就到期了。基于上面这种情况需要对锁进行续期。使用定时器加lua脚本进行对锁续期

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then 
    redis.call('expire', KEYS[1], ARGV[2]); 
    return 1; 
else 
    return 0; 
end

Java代码实现


考虑到分布式锁可能使用多种方式实现,比如Redis、mysql、zookeeper,所以暂时做成一个工厂类,按需使用。

以下是完整代码:

public class DistributedRedisLock implements Lock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private long expire = 30;

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = uuid + ":" + Thread.currentThread().getId();
    }

    @Override
    public void lock() {
        this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            return this.tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 加锁方法
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time != -1){
            this.expire = unit.toSeconds(time);
        }
        String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
                "   redis.call('expire', KEYS[1], ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){
            Thread.sleep(50);
        }
        // 加锁成功,返回之前,开启定时器自动续期
        this.renewExpire();
        return true;
    }

    /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
                "then " +
                "   return redis.call('del', KEYS[1]) " +
                "else " +
                "   return 0 " +
                "end";
        Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
        if (flag == null){
            throw new IllegalMonitorStateException("this lock doesn't belong to you!");
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    private void renewExpire(){
        String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
                "then " +
                "   return redis.call('expire', KEYS[1], ARGV[2]) " +
                "else " +
                "   return 0 " +
                "end";
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
                    renewExpire();
                }
            }
        }, this.expire * 1000 / 3);
    }
}

DistributedLockClient

@Component
public class DistributedLockClient {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private String uuid;

    public DistributedLockClient() {
        this.uuid = UUID.randomUUID().toString();
    }

    public DistributedRedisLock getRedisLock(String lockName){
        return new DistributedRedisLock(redisTemplate, lockName, uuid);
    }
}

使用及测试:

在业务代码中使用:

public void deduct() {
    DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");
    redisLock.lock();

    try {
        // 1. 查询库存信息
        String stock = redisTemplate.opsForValue().get("stock").toString();

        // 2. 判断库存是否充足
        if (stock != null && stock.length() != 0) {
            Integer st = Integer.valueOf(stock);
            if (st > 0) {
                // 3.扣减库存
                redisTemplate.opsForValue().set("stock", String.valueOf(--st));
            }
        }
    } finally {
        redisLock.unlock();
    }
}

测试可重入性:

红锁算法

在Redis集群状态下可能出现的问题如下:

  1. 客户端A从主节点(master)获取到了锁。
  2. 在主节点将锁同步到从节点(slave)之前,主节点发生宕机。
  3. 从节点被晋升为主节点。
  4. 客户端B获取了同一个资源,但是客户端A已经在另一个锁上获取了锁。
    在这种情况下,由于主节点宕机导致从节点晋升为新的主节点,可能会出现客户端B误认为资源未被锁定而获取了另一个锁的情况。这可能导致数据不一致性或竞争条件的发生。
    为了避免这种问题

安全失效

解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock

实现步骤:

  1. 客户端向N个Redis节点发送请求获取锁。
  2. 每个Redis节点生成一个独立的随机值作为锁值,并设置相同的过期时间。
  3. 客户端等待大部分节点(如大多数节点的一半以上)返回获取成功的响应。
  4. 如果大部分节点返回获取成功,则认定为成功获取了分布式锁;否则认定为未获取到分布式锁。

标签:return,KEYS,Redis,redis,ARGV,Lua,call,public,分布式
From: https://www.cnblogs.com/dupengpeng/p/18037650

相关文章

  • redis自学(6)SkipList
    SkipListSkipList(跳表)首先是链表,但与传统链表相比有几点差异:  元素按照升序排列存储  节点可能包含多个指针,指针跨度不同(最多允许32级指针,跨度成倍数递增)    SkipList的特点:  跳跃表是一个双向链表,每个节点都包含score和ele值  节点按照score值排序,sc......
  • Redis集群在线迁移
    一、redis集群迁移的几种方式离线迁移:通过rdb或者aof文件的方式,实现离线迁移缺点:版本限制,不同版本启动时,可能会出现覆盖数据文件的情况(也可能是集群模式的限制);需要停机,会造成数据不一致问题。使用shell脚本,单库对单库,性能极低,生产环境不能使用。主从同步:成为从节点slaveofip......
  • Redis部署-主从复制
    原理分master和slave;master以写为主,slave只支持读,master的数据发生变化时自动同步到slave。作用读写分离:主机提供写,从机提供读,可提高性能,分散负载压力容灾恢复:主机与从机数据一致,其中一台机器宕机后,另一台机器能正常使用数据备份:主机与从机分散在不同的机器,数据一致,起到了数......
  • 【进阶篇】使用 Redis 实现分布式缓存的全过程思考(一)
    目录前言一、关于缓存二、基本数据结构三、缓存注解3.1自定义注解3.2定义切点(拦截器)3.3AOP实现3.4使用示例四、数据一致性4.1缓存更新策略4.2缓存读写过程五、高可用5.1缓存穿透5.2缓存击穿5.3缓存雪崩5.4Redis集群六、文章小结前言写在前面,让我们从3个问题开始今天的文章:......
  • 项目开发中 Redis 缓存和数据库一致性问题及解决方案
    引入Redis缓存提高性能如果公司的项目业务处于起步阶段,流量非常小,那无论是读请求还是写请求,直接操作数据库即可,这时架构模型是这样的:但随着业务量的增长,你的项目业务请求量越来越大,这时如果每次都从数据库中读数据,那肯定会有性能问题。这个阶段通常的做法是,引入缓存来提高读性......
  • 13.分布式事件总线DotNetCore.CAP的简单使用
    DotNetCore.CAP框架提供了一个简单易用的API和多种消息传输协议支持(包括Redis、RabbitMQ等),可以让用户轻松地实现消息队列、事件发布/订阅、分布式事务等功能。它还具备自动重试、异常处理、数据序列化等高级特性,可以保证消息的可靠性和一致性。使用DotNetCore.CAP框架,你可以:1.......
  • 使用python批量删除redis key
     比如我的业务。刚上线默认为超级管理员新增权限--请导出id用于清缓存svc格式请注意分页需要导出全部selectCONCAT('@rbac/ent/aclgr/',e.id)as需要清理缓存的rediskeyfroment_rbac_groupewherenotexists(selectp.`groupid`froment_rbac_group_permissionp......
  • Redis扩展功能
    Redis事务一次操作执行多条命令,本质是一组命令的集合。一个事务中的所有命令都会序列化,按顺序地串行化执行而不会被其它命令插入、不许加塞。由于redis只能在执行前检查一组命令的语法错误,在命令执行时出现异常没法全体回滚,所以是弱一致性。multi+exec组合正常执行执行前队......
  • 机器学习策略篇:详解单一数字评估指标(Single number evaluation metric)
    单一数字评估指标无论是调整超参数,或者是尝试不同的学习算法,或者在搭建机器学习系统时尝试不同手段,会发现,如果有一个单实数评估指标,进展会快得多,它可以快速告诉,新尝试的手段比之前的手段好还是差。所以当团队开始进行机器学习项目时,经常推荐他们为问题设置一个单实数评估指标。......
  • Unity xLua开发环境搭建与基础进阶
    Unity是一款非常流行的游戏开发引擎,而xLua是一个为Unity开发者提供的Lua框架,可以让开发者使用Lua语言来进行游戏开发。在本文中,我们将介绍如何搭建UnityxLua开发环境,并进行基础进阶的学习。 对啦!这里有个游戏开发交流小组里面聚集了一帮热爱学习游戏的零基础小白,也有一些正......