首页 > 数据库 >redisson分布式锁的应用——秒杀、超卖 简单例子(分布式锁相关)

redisson分布式锁的应用——秒杀、超卖 简单例子(分布式锁相关)

时间:2023-02-07 15:13:36浏览次数:50  
标签:加锁 return bookId redis userId redisson key 超卖 分布式

1、常见的分布式事务锁

1、数据库级别的锁

  • 乐观锁,给予加入版本号实现
  • 悲观锁,基于数据库的for update实现

2、Redis,基于SETNX、EXPIRE实现
3、Zookeeper,基于InterProcessMutex实现
4、Redisson的lock、tryLock(背后原理也是Redis)

2、redis搭建模式

单机: 只有一台,挂了就无法工作。主从:备份关系,数据会同步到从库,可以读写分离。哨兵:master挂了,哨兵就进行选举,选出新的master,作用是监控主从,主从切换。集群:高可用,分散请求,目的是将数据分片存储,节省内内存。
image
image
image
分布式事务: 按照传统的系统架构、下单、扣库存等,这一系列的操作都是在一个应用一个数据库中完成的,也就是要保证了事务的ACID特性。如果在分布式应用中就会涉及到跨应用、跨库。这样就涉及到了分布式事务,就要考虑怎么保证这一列操作要么都成功要么都失败。保证数据的一致性

3、redis分布式锁的原理

互斥性:保证同一时间只有一个客户端可以拿到锁
安全性:只有加锁的服务才有解锁权限,也就是不能让客户端A加的锁,客户端B、C都可以解锁
避免死锁:保证加锁与解锁操作是原子操作,这个其实属于是实现分布式锁的问题,假设a用redis实现分布式锁,假设加锁操作,操作步骤分为两步:1,设置key set(key,value) 2,给key设置过期时间。
Redis实现分布式锁的核心就是
加锁

SET key value NX EX timeOut

参数说明:
NX:只有这个key不存才的时候才会进行操作,即 if not exists;
EX:设置key的过期时间为秒,具体时间由第5个参数决定
timeOut:设置过期时间保证不会出现死锁【避免宕机死锁】

代码实现:

public Boolean lock(String key,String value,Long timeOut){
     String var1 = jedis.set(key,value,"NX","EX",timeOut); //加锁,设置超时时间 原子性操作
     if(LOCK_SUCCESS.equals(var1)){
         return true;
     }
     return false;
 }

总的来说,执行上面的set()方法就只会导致两种结果:

当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。已有锁存在,不做任何操作。
注:从2.6.12版本后, 就可以使用set来获取锁、Lua 脚本来释放锁。setnx是以前刚开始的实现方式,set命令nx、xx等参数,,就是为了实现 setnx 的功能。

解锁:

代码实现:

public Boolean redisUnLock(String key, String value) {
    String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else  return 0 end";

    Object var2 = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value));
    if (UNLOCK_SUCCESS == var2) {
        return true;
    }
    return false;
}

这段lua代码的意思:首先获取锁对应的value值,检查是否与输入的value相等,如果相等则删除锁(解锁)。

4、Redisson 分布式锁原理(重要)

Redisson是一个在Redis的基础上实现的Java驻内存数据网格。
加锁流程
image
redisson的lock()、tryLock()方法 底层 其实是发送一段lua脚本到一台服务器:

if (redis.call('exists' KEYS[1]) == 0) then  +  --  exists 判断key是否存在
       redis.call('hset' KEYS[1] ARGV[2] 1);  +   --如果不存在,hset存哈希表
       redis.call('pexpire' KEYS[1] ARGV[1]);  + --设置过期时间
       return nil;  +                            -- 返回null 就是加锁成功
          end;  +
          if (redis.call('hexists' KEYS[1] ARGV[2]) == 1) then  + -- 如果key存在,查看哈希表中是否存在(当前线程)
              redis.call('hincrby' KEYS[1] ARGV[2] 1);  + -- 给哈希中的key加1,代表重入1次,以此类推
              redis.call('pexpire' KEYS[1] ARGV[1]);  + -- 重设过期时间
              return nil;  +
          end;  +
          return redis.call('pttl' KEYS[1]); --如果前面的if都没进去,说明ARGV[2]的值不同,也就是不是同一线程的锁,这时候直接返回该锁的过期时间

参数说明:

KEYS[1]:即加锁的key,RLock lock = redisson.getLock("myLock"); 中的myLock
ARGV[1]:即 TimeOut 锁key的默认生存时间,默认30秒
ARGV[2]:代表的是加锁的客户端的ID,类似于这样的:99ead457-bd16-4ec0-81b6-9b7c73546469:1
其中lock()默认是30秒的生存时间。

锁互斥
假如客户端A已经拿到了 myLock,现在 有一客户端(未知) 想进入:

1、第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。
2、第二个if判断,判断一下,myLock锁key的hash数据结构中, 如果是客户端A重新请求,证明当前是同一个客户端同一个线程重新进入,所以可从入标志+1,重新刷新生存时间(可重入); 否则进入下一个if。
3、第三个if判断,客户端B 会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。

此时客户端B会进入一个while循环,不停的尝试加锁。
watch dog 看门狗自动延期机制

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
默认值:30000
监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。(如果设置了leaseTimeout那就会自动失效了呀~)

看门狗的时间可以自定义设置:

config.setLockWatchdogTimeout(30000);

看门狗有什么用呢?

假如客户端A在超时时间内还没执行完毕怎么办呢? redisson于是提供了这个看门狗,如果还没执行完毕,监听到这个客户端A的线程还持有锁,就去续期,默认是 LockWatchdogTimeout/ 3 即 10 秒监听一次,如果还持有,就不断的延长锁的有效期(重新给锁设置过期时间,30s)

可以在lock的参数里面指定:

lock.lock(); //如果不设置,默认的生存时间是30s,启动看门狗 
lock.lock(10, TimeUnit.SECONDS);//10秒以后自动解锁,不启动看门狗,锁到期不续

如果是使用了可重入锁( leaseTimeout):

lock.tryLock(); //如果不设置,默认的生存时间是30s,启动看门狗 
lock.tryLock(100, 10, TimeUnit.SECONDS);//尝试加锁最多等待100秒,上锁以后10秒自动解锁,不启动看门狗

这里的第二个参数leaseTimeout 设置为 10 就会覆盖 看门狗的设置(看门狗无效),在10秒后锁就自动失效,不会去续期;如果是 -1 ,就表示 使用看门狗的默认值。

释放锁机制

lock.unlock(),就可以释放分布式锁。就是每次都对myLock数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key。

  • 为了安全,会先校验是否持有锁再释放,防止业务执行还没执行完,锁到期了。(此时没占用锁,再unlock就会报错)
  • 主线程异常退出、或者假死
finally {
            if (rLock.isLocked()) {
                if (rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
        }

可能存在的问题
如果是 主从、哨兵模式,当客户端A 把 myLock这个锁 key 的value写入了 master,此时会异步复制给slave实例。
万一在这个主从复制的过程中 master 宕机了,主备切换,slave 变成了master。
那么这个时候 slave还没来得及加锁,此时 客户端A的myLock的 值是没有的,客户端B在请求时,myLock却成功为自己加了锁。这时候分布式锁就失效了,就会导致数据有问题。
所以说Redis分布式说最大的缺点就是宕机导致多个客户端加锁,导致脏数据,不过这种几率还是很小的。

5、实际应用(重要),模拟秒杀任务

引入依赖

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.7.3</version>
        </dependency>

yml文件redis配置

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6124

配置redisson

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
//        config.useSingleServer();//单机
//        config.useMasterSlaveServers();//集群
//        config.useSentinelServers();//哨兵
//        config.useClusterServers();//集群
//        config.setLockWatchdogTimeout(30000);
//使用的Redis主从模式
        config.useMasterSlaveServers()
                .setPassword("redis")
                .setMasterAddress("redis://82.71.16.139:6379")
                .addSlaveAddress("redis://82.71.16.139:6380","redis://82.71.16.139:6381");
        return Redisson.create(config);
    }

新建两个实体

/**
 * @author 公众号:HelloCoder,每天分享Java技术和面试题
 * @date 2020/10/16
 * @Description
 */

@Builder
@Data
@TableName("t_book")
@AllArgsConstructor
@NoArgsConstructor
public class Book {
    @TableId(value = "book_id", type = IdType.AUTO)
    private long bookId;
    private String name;
    private int count;
}
@Builder
@Data
@TableName("t_book_order")
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    @TableId(value = "id", type = IdType.AUTO)
    private int id;
    private String orderId;
    private long bookId;
    private int status;
    private long userId;
    private int count;
    private String billTime;
}
@RestController
@Slf4j
@RequestMapping("Order/")
public class OrderController {

    @Autowired
    BookOrderService bookOrderService;


    @RequestMapping("/seckill")
    public RetResult seckill(@RequestParam(value = "bookId") Long bookId, @RequestParam(value = "userId", required = false) Long userId) {
        if (userId == null) {
            //模拟userId,随机生成,这里应该有前端传入
            userId = (long) (Math.random() * 1000);
        }
        String result = bookOrderService.seckill(bookId, userId);
        return RetResponse.makeOKRsp(result);
    }
}

这里模拟了两种情况:

一种是不加锁,第二种是加redis锁

@Slf4j
@Service
public class BookOrderService {

    @Autowired
    BookMapper bookMapper;

    @Autowired
    OrderMapper orderMapper;

    @Autowired
    RedissonClient redissonClient;
    
    public String seckill(Long bookId, Long userId) {
          return notLockDemo(bookId, userId);
//        return lockDemo(bookId, userId);
    }
    
    
    String lockDemo(Long bookId, Long userId) {
        final String lockKey = bookId + ":" + "seckill" + ":RedissonLock";
        RLock rLock = redissonClient.getLock(lockKey);

        try {
            // 尝试加锁,最多等待20秒,上锁以后10秒自动解锁
            Boolean flag = rLock.tryLock(20, 10, TimeUnit.SECONDS);

            if (flag) {
                //1、判断这个用户id 是否已经秒杀过
                List<Order> list = orderMapper.selectList(new QueryWrapper<Order>().lambda().eq(Order::getUserId, userId).eq(Order::getStatus, 1).eq(Order::getBookId, bookId));
                if (list.size() >= 1) {
                    log.info("你已经抢过了");
                    return "你已经抢过了,一人只能抢一次";
                }

                //2、查库存
                Book book = bookMapper.selectOne(new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
                if (book != null && book.getCount() > 0) {
                    //生成订单
                    String orderId = UUID.randomUUID().toString();
                    Order newOrder = Order.builder().
                            orderId(orderId).
                            status(1).
                            bookId(bookId).
                            userId(userId).
                            count(1).
                            billTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).build();

                    orderMapper.insert(newOrder);

                    //更新库存
                    Book newBook = Book.builder().count(book.getCount() - 1).build();
                    bookMapper.update(newBook, new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
                    log.info("userId:{} 秒杀成功", userId);
                    return "秒杀成功" + "";
                } else {
                    log.info("秒杀失败,被抢完了");
                }
            } else {
                log.info("请勿重复点击,userid:{} ", userId);
                return "你已经抢过了";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (rLock.isLocked()) {
                if (rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
        }
        return "很遗憾,没货了...";
    }


    String notLockDemo(Long bookId, Long userId) {
        //1、判断这个用户id 是否已经秒杀过
        List<Order> list = orderMapper.selectList(new QueryWrapper<Order>().lambda().eq(Order::getUserId, userId).eq(Order::getStatus, 1).eq(Order::getBookId, bookId));
        if (list.size() >= 1) {
            log.info("你已经抢过了");
            return "你已经抢过了,一人只能抢一次";
        }

        //2、查库存
        Book book = bookMapper.selectOne(new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
        if (book != null && book.getCount() > 0) {
            //生成订单
            String orderId = UUID.randomUUID().toString();
            Order newOrder = Order.builder().
                    orderId(orderId).
                    status(1).
                    bookId(bookId).
                    userId(userId).
                    count(1).
                    billTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).build();
            orderMapper.insert(newOrder);
            //更新库存
            Book newBook = Book.builder().count(book.getCount() - 1).build();
            bookMapper.update(newBook, new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
            log.info("userId:{} 秒杀成功", userId);
            return "秒杀成功" + "";
        } else {
            log.info("秒杀失败,被抢完了");
            return "很遗憾,没货了...";
        }
    }
}

新建两个表

DROP TABLE IF EXISTS `t_book` ;
CREATE TABLE `t_book` (
  `book_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `name` varchar(400) DEFAULT NULL COMMENT '名称',
  `count` int DEFAULT 0 COMMENT '数量',
  PRIMARY KEY (`book_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

DROP TABLE IF EXISTS `t_book_order` ;
CREATE TABLE `t_book_order` (
	`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` varchar(100) NOT NULL  COMMENT '订单号',
  `book_id` bigint(20) NOT NULL  COMMENT '商品id',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
	`status` int DEFAULT 1 COMMENT '状态',
  `count` int DEFAULT 0 COMMENT '购买数量',
  `bill_time`  datetime DEFAULT NULL COMMENT '下单时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';


INSERT INTO `seckill`.`t_book`(`book_id`, `name`, `count`) VALUES (1, '《HaC的自传》', 5);

测试
1、配置Nginx
配置Nginx,分流进入两个服务。

修改nginx.conf

	upstream mysite {
        server 127.0.0.1:8090 weight=1;
        server 127.0.0.1:8091 weight=1;
    }
    server {
        listen       80;

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
		
		location / {
		 proxy_pass http://mysite;
        }
    }

说明:当访问localhost:80 端口会分流到8090和8091端口

启动服务,启动两个端口的服务,模拟分布式部署。
(1)不加锁的情况下
使用jmeter 模拟并发。不加锁的情况模拟10个请求在1s发出 共2次,方便查看:
查看一下日志:

8090这台服务器:
image
8091这台服务器:
image
同一时间进入请求。
查询一下订单:
image
库存为0之后,但是初始化只有 5 本书,最后竟然出现了18个订单,显然是有问题的。
这就是不加锁的结果。
(2)加锁的情况下
8090服务器:
image
8091服务器:
image
看一下数据库:
image
刚好生成 5 个订单,没有超卖的现象。

标签:加锁,return,bookId,redis,userId,redisson,key,超卖,分布式
From: https://www.cnblogs.com/cgy1995/p/17098464.html

相关文章

  • 《分布式技术原理与算法解析》学习笔记Day03
    分布式互斥方法什么是分布式互斥?对于同一个共享资源,当一个程序正在使用的时候,不希望被其他程序打扰,这种排他性的资源访问方式,叫做分布式互斥,被互斥访问的共享资源被称作......
  • <<从Paxos到Zookeeper-分布式一致性原理与实践>>
    <<从Paxos到Zookeeper-分布式一致性原理与实践>>1分布式特点分布性对等性并发性缺乏全局时钟故障总会发生2分布式问题通信异常网络分区三态单体应用中一次......
  • Seata分布式事务
    使用Seata版本:1.6.1(2023/2/6最新版)该版本存在很多坑,相较于1.0版本,配置上存在很多差别,如果你的版本不同,请不要参考本文。1.6.1配置存在许多问题,比较难找,如果你使用1.6.1可......
  • 分布式、集群式、负载均衡的区别和联系
    分布式、集群式、负载均衡的介绍:分布式:一个系统拆成多个子系统,部署在不同服务器。每个服务器只做一小块。功能拆分。集群式:每个服务器提供的服务一样,靠数量多取胜。负......
  • 《分布式技术原理与算法解析》学习笔记Day02
    分布式系统发展历程分布式的发展过程经历了三个阶段:单机模式(单兵模式)数据并行或者数据分布式(游击队模式)任务并行或者任务分布式(集团军模式)什么是单机模式,它的优缺点......
  • ElasticSearch分布式搜索引擎——从入门到精通
    ES分布式搜索引擎注意:在没有创建库的时候搜索,ES会创建一个库并自动创建该字段并且设置为String类型也就是text什么是elasticsearch?一个开源的分布式搜索引擎,可以用......
  • 05安装一个Hadoop分布式集群
    安装一个Hadoop分布式集群最小化的Hadoop已经可以满足学习过程中大部分需求,但是为了研究Hadoop集群运行机制,部署一个类生产的环境还是有必要的。因为集群机器比较少,笔者没......
  • 《分布式技术原理与算法解析》学习笔记Day01
    开篇词|四纵四横,带你透彻理解分布式技术谁更好掌握了分布式技术,谁就更容易在新一轮技术浪潮中获得主动。很多有多年工作经验的人,在分布式上面,也可能会有下面的问题:各......
  • Seata分布式事务
    一、理论基础1.1分布式服务的事务问题在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务......
  • Idea 源码启动 Apache IoTDB 1.0 分布式
    背景ApacheIoTDB1.0版本实现了新的分布式架构。今天介绍一下如何通过源码启动IoTDB1.0。IoTDB分布式里有两个主要概念,ConfigNode和DataNode,下文介绍启动1C1D。环......