首页 > 数据库 >redis分布式锁、介绍、具体实现,调用、原理、使用场景

redis分布式锁、介绍、具体实现,调用、原理、使用场景

时间:2023-05-06 15:08:38浏览次数:30  
标签:加锁 return String redis value 调用 key 分布式


一、作用

redis分布式锁:可以分为两点:1.分布式 2.加锁

主要作用是,在多副本部署服务的情况下(或者高并发时),相同时间点内,对业务代码进行加锁,业务代码只能被一个线程执行
用了分布式锁,相当于强制将多副本(或者单副本高并发时)并行改成串行执行,其他副本直接返回或者阻塞等待(排队执行)

由于是多副本部署服务, JVM锁某些情况下不能用,诸如synchronized或ReentrantLock只能是锁定当前副本, 分布式锁就能解决锁定全部副本服务

缺点:并行改成串行后,对高并发不友好,处理能力降低



二、使用场景

1.DB操作扣减/增加商品库存数量,DB操作扣减/增加财务金额;按顺序记录变动前,变动后,变动值
使用mysql时,如果想知道扣减/增加商品库存数量,mysql不能通过一句sql知道数量变动前, 变动后,变动值,所以可使用加锁后,先查询,在更新的方式。
点赞Sql Server可以使用update处理

2.接口防重; 创建订单、支付订单,可以使用用户id进行加锁,防止重复提交; 同一时刻用户只能创建一笔订单或支付一次

3.防止机器高频刷接口;可以使用颁发给前端的token进行加锁

三、redis分布式锁的实现原理

1.单线程执行命令
不考虑redis集群时, 单体redis服务是单线程执行命令的(get、set、delete等等命令), 命令会排队执行,并不存在多个命令同时执行.
redis服务其实也是多线程,但在执行命令时候是单线程的,所以我们经常说它是单线程。
redis在6.0的版本中引入了多线程, 多线程处理了网络I/O,用来提高性能, 但是执行命令还是保留单线程,这个经常是面试重点.

redis提供底层setnx命令;setnx是一个原子性操作;进行加锁
若key不存在时,才会set值且填充过期时间,返回 1 。
若key已存在时,不做任何动作,返回 0。

redis提供底层del命令;进行释放锁
执行成功返回 1 ; 否则返回 0。



四、使用StringRedisTemplate操作redis

如何使用stringRedisTemplate参考:
javascript:void(0)

引入org.springframework.data.redis.core
使用redis的字符串对象opsForValue进行封装加锁、释放锁

实现加锁,对应redis的setnx命令, 具体实现参考下文
stringRedisTemplate.opsForValue().setIfAbsent(K key, V value, long timeout, TimeUnit unit)

为保证释放的锁是自己加的锁,使用lua脚本,保证原子性的, 具体实现参考下文
stringRedisTemplate.execute();

借助lua脚本释放锁时,只有获取到的value是客户端自己的value, 才会去删除锁
所以设置锁的value时,尽可能需要全局唯一的value
KEYS[1] 表示redis中的key, ARGV[1] 表示redis中的value; 具体实现参考下文



五、分布式锁的过期时间

过期时间主要是为了防止释放锁异常,导致死锁;设置了过期时间redis可以自动删除锁。保证后续可以加锁。

过期时间具体值是多长时间,可有开发人员来衡量。下文中代码没默认了10分钟;

如果加锁后,10分种内程序都没运行完,由于又有过期时间,所以会被自动释放,可能导致分布式锁没有唯一性。
可以在加锁之后开启一个子线程进行异步周期性地续时。当释放锁时,再中断结束这个续时线程。这个过程下文代码中并未实现。

过期时间默认10分钟,业务代码还没执行完,这时候就应该优化业务代码,而不是分布锁。什么业务代码可以一次性执行10分钟都没完成??



六、锁的重试机制 retryLock

1.等待时间;设置5秒就可,由开发人员来衡量设置

如果时间设置的太长,用户就会等待太久才能得到响应结果
如果时间设置的太短,太短程序退出,就没有了重试的意义

2.重试间隔时间;

如果时间设置的太长或太短,都会造成重试成功概率减小;
主要依据业务代码的执行时间,如果被锁的业务代码大概500毫秒能执行完,重试间隔时间就可以设置或小于500毫秒,比如400毫秒



七、java代码实现

默认java项目已经安装和配置redis服务,也已经引用StringRedisTemplate

1.定义接口

package com.xxx.redis;

public interface RedisLockService {

    /**
     * 重试获取锁。
     *
     * 第一次获取锁失败后,在重试时间retryTimeout时间内,会挂起线程睡眠一定时间,不断重试,
     * 如果重试成功,则直接返回成功;
     * 如果重试失败,直到超时时间结束,返回失败
     *
     * @param key   锁的key
     * @param value  锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
     * @param retryTimeout   重试超时时间,时间内,不断重试
     * @return 锁获取成功,返回true;否则,返回false
     */
    boolean retryLock(String key, String value, int retryTimeout);

    /**
     * 获取锁。
     *
     * @param key   锁的key
     * @param value 锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
     * @return 锁获取成功,返回true;否则,返回false
     */
    boolean lock(String key, String value);

    /**
     * 释放锁
     *
     * @param key   锁的key
     * @param value 锁的值; 需要一个唯一值, 可以用UUID来产生, 唯一性可确保加锁和释放琐是同一操作人
     */
    boolean unlock(String key, String value);
}



2.实现接口

package com.xxx.redis;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class RedisLockServiceImpl implements RedisLockService {

    /**
     * 600000毫秒 = 10分钟; 单位毫秒
     *
     * redis过期时间,过期后自动被删除; 此参数也可使用方法参数来传递进来
     */
    final int expireTime = 1000 * 60 * 10;

    /**
     * 200毫秒;单位毫秒
     *
     * 重试锁,重试间隔时间; 此参数也可使用方法参数来传递进来
     */
    final int retryIntervalTime = 200;

    /**
     * lua脚本,释放锁, lua脚本命令执行具有原子性
     *
     * 保证只会释放客户端自己的锁
     * 说明
     * 1. if redis.call('get', KEYS[1]) == ARGV[1]  获取到的value是客户端自己的value, 才会去删除锁;
     * 2. 基于上面逻辑,设置锁的value时,尽可能需要全局唯一的value
     * 3. KEYS[1] 表示redis中的key
     * 4. ARGV[1] 表示redis中的value
     *
     */
    final String SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] " +
            "then " +
            "       return redis.call('del', KEYS[1]) " +
            "else " +
            "       return 0 " +
            "end";

    @Autowired
    private  StringRedisTemplate redisTemplate;

    @Override
    public boolean retryLock(String key, String requestId, int retryTimeout) {
        if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
            return false;
        }

        long endTime = System.currentTimeMillis() + retryTimeout;
        while (endTime >= System.currentTimeMillis()) {
            boolean lock = this.lock(key, requestId);
            if (lock) {
                return true;
            }
            try {
                //重试锁,睡眠间隔后,再次获取锁,直到成功或超时失败
                Thread.sleep(retryIntervalTime);
            } catch (InterruptedException e) {
                return false;
            }
        }

        return false;
    }

    @Override
    public boolean lock(String key, String requestId) {
        if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
            return false;
        }
        // 若key不存在时,才会set值且填充过期时间,返回 1 。
        // 若key已存在时,不做任何动作,返回 0。
        return redisTemplate.opsForValue().setIfAbsent(this.getLockKey(key), requestId, expireTime, TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean unlock(String key, String requestId) {
        if(StringUtils.isBlank(key) || StringUtils.isBlank(requestId)){
            return false;
        }

        String finalLockKey = this.getLockKey(key);
        Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
                connection.eval(SCRIPT.getBytes(), ReturnType.INTEGER, 1, finalLockKey.getBytes(), requestId.getBytes()));
        if(result.equals(1)){
            return true;
        }

        return false;
    }

    /**
     * 获取加锁Key
     *
     * 自行定义锁的前置key
     *
     * @param key
     * @return
     */
    private String getLockKey(String key){
        String buffer = "LOCK_EKY:" + key;
        return buffer.toUpperCase();
    }
}



3.调用

使用try finally主要想保证在异常时,finally也会释放锁unlock

package com.xxx.controller;

import cn.hutool.core.util.IdUtil;
import com.xxx.redis.RedisLockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/test")
public class TestController {

    @Autowired
    private RedisLockService redisLockService;

    /**
     * 下单接口; 简单分布锁
     * @return
     */
    @PostMapping("/createOrder")
    public String createOrder() {
        // redis的key; 示例:用户id = 1214648798765413; 只要保证每次加锁的key唯一就行,可以动态生成key
        String key = "CREATE_ORDER:1214648798765413";
        // redis的value; UUID即可; 也可以使用 用户id = 1214648798765413, 只要保证每次加锁的value唯一就行
        String requestId = IdUtil.simpleUUID();
        // 加锁成功或失败
        boolean lock = false;
        try {
            //redis分布式加锁
            lock = redisLockService.lock(key, requestId);
            if (lock) {
                //TODO 填充加锁的业务代码

                //举例业务代码,模拟业务代码执行时间
                Thread.sleep(4000L);


                return "业务处理完成";
            } else {
                //获取锁失败,表示别的线程已经占用了锁,正在执行上面业务代码
                //TODO 处理资源已经被占用
                return "业务被锁,请稍后重试";
            }
        } catch (Exception e) {
            //TODO 处理异常
            return "业务报错了";
        } finally {
            //只要获取锁成功,业务代码如出现异常,finally中强制释放锁
            if(lock) redisLockService.unlock(key, requestId);
        }
    }

    /**
     * 下单接口; 重试分布锁
     * @return
     */
    @PostMapping("/createOrder2")
    public String createOrder2() {
        // redis的key; 示例:用户id = 1214648798765413; 只要保证每次加锁的key唯一就行
        String key = "CREATE_ORDER:1214648798765413";
        // redis的value; UUID即可; 也可以使用 用户id = 1214648798765413, 只要保证每次加锁的value唯一就行
        String requestId = IdUtil.simpleUUID();
        // 重试等待时间:6秒; 6秒内不断尝试获取锁, 直至获取成功或超时
        int retryTimeout = 6 * 1000;
        // 加锁成功或失败
        boolean lock = false;
        try {
            //redis分布式加锁, 重试锁
            lock = redisLockService.retryLock(key, requestId, retryTimeout);
            if (lock) {
                //TODO 填充加锁的业务代码

                //举例业务代码,模拟业务代码执行时间
                Thread.sleep(4000L);

                return "业务处理完成";
            } else {
                //获取锁失败,表示别的线程已经占用了锁,正在执行上面业务代码
                //TODO 处理资源已经被占用
                return "业务被锁,请稍后重试";
            }
        } catch (Exception e) {
            //TODO 处理异常
            return "业务报错了";
        } finally {
            //只要获取锁成功,业务代码如出现异常,finally中强制释放锁
            if(lock) redisLockService.unlock(key, requestId);
        }
    }
}


标签:加锁,return,String,redis,value,调用,key,分布式
From: https://blog.51cto.com/u_16082658/6249903

相关文章

  • Redis Desktop Manager简单用法
    RedisDesktopManager简单用法一、官网https://redisdesktop.com/二、登录前置条件是:Redis安装正常且服务已经开启RedisDesktopManager运行打开,点击“连接到Redis服务器”1)输入连接名(不固定字符串),例如:rediscon或test或test1等等2)地址端口号(redis服务器地址及端口号):127.0.0.1......
  • ubuntu安装redis
    redis是C语言编写的一款非关系型数据库,使用键值对存储数据,数据缓存在内存中首先下载安装redis,并测试是否安装正确wgethttp://download.redis.io/releases/redis-6.0.8.tar.gztar-zxvfredis-6.0.8.tar.gzcdredis-6.0.8makemaketest如果make失败,则需要升级gcc如果mak......
  • useeffect下调用`window.onresize`不生效的解决办法
    组件化开发,多个子组件多次调用onresize使主页面的onresize无法生效解决办法时使用addEventListener添加onresize函数useeffect(()=>{window.addEventListener('resize',function(){//当浏览器窗口大小发生变化时,触发的functionfn()console.log('1......
  • redis之五种基本数据类型
    0.前言本文主要讲解redis的五种基本数据类型:String、List、Set、SortedSet、Hash。学习如何使用它们,并且了解它们的底层数据结构实现,这样我们才能在适当的应用场景选择最适合的数据类型来解决我们的需求。1.String1.1简单使用String是redis最简单的且最常用的数据类型,可以......
  • hadoop 3.3.5伪分布式集群部署
    hadoop包下载https://archive.apache.org/dist/hadoop/common/安装好jdk并配置环境变量下载hadoop压缩包并放至/data/hadoop目录解压tar-zxvfhadoop-3.3.5.tar.gz1配置1.1在Hadoop安装目录下进入到etc/hadoop目录,修改Hadoop相关配置文件。<property><name>f......
  • 【已解决】Microsoft Visual C++ Redistributable is not installed
    【Error】导入torch,提示报错:MicrosoftVisualC++Redistributableisnotinstalled,thismayleadtotheDLLloadfailure.【Cause】Anaconda没有默认安装在C盘;系统没有安装VC++Redistributable程序。【Resolve】VC++Redistributable.exe双击安装,重启电脑即可。......
  • 使用Node.js调用Sqlite3模块写的大数据查询接口
    使用Node.js调用Sqlite3模块写的大数据查询接口constsqlite3=require('sqlite3');consthttp=require('http');consturl=require('url');constSqliteDb=async(dbFile)=>{constpri={};pri.db=newsqlite3.Database(dbFile);......
  • 分布式一致性协议综述(下)
    本文首发自「慕课网」,想了解更多IT干货内容,程序员圈内热闻,欢迎关注"慕课网"!作者:大能老师|慕课网讲师 前情回顾:分布式一致性协议综述(上),需要回看请点击阅读Raft协议Paxos是论证了一致性协议的可行性,但是论证的过程据说晦涩难懂,缺少必要的实现细节,而且工程实现难度比较高广为人知实......
  • 不同的编程语言中使用管道pipe(或者说链式调用)
    目录终端语言(如bash,zsh)一般有管道符|pythonjavascriptrubymathematicac#c++scala3终端语言(如bash,zsh)一般有管道符|#将`echo`命令的输出传递给`grep`命令echo"Hello,World!"|grep"World"#将`ls`命令的输出传递给`wc`命令,以统计文件和目录的数量ls|wc......
  • 分布式系统唯一ID生成方案汇总
    系统唯一ID是我们在设计一个系统的时候常常会遇见的问题,也常常为这个问题而纠结。生成ID的方法有很多,适应不同的场景、需求以及性能要求。所以有些比较复杂的系统会有多个ID生成的策略。下面就介绍一些常见的ID生成策略。1.数据库自增长序列或字段最常见的方式。利用数据库,全数......