首页 > 数据库 >Redis实现秒杀功能 lua脚本判断库存、判断一人一单、添加到stream队列、异步处理订单

Redis实现秒杀功能 lua脚本判断库存、判断一人一单、添加到stream队列、异步处理订单

时间:2023-01-06 14:00:10浏览次数:45  
标签:stream -- Redis redis lua key spring id

需求:

  1. 新增秒杀商品 - 将秒杀商品的id和秒杀数量添加到秒杀表中 数据库操作
  2. 将秒杀信息保存到Redis中
  3. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否有下单资格
  4. 如果抢购成功,将商品id,订单id,用户id封装后添加到队列
  5. 开启线程任务,不断从队列中获取信息,实现异步下单

Redis秒杀

代码实现

添加依赖

<!--redis依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- 连接池依赖 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!-- 工具类 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.5.16</version>
        </dependency>

redis相关配置

#Redis 数据库索引(默认为0)
spring.redis.database=0
#Redis 服务器地址
spring.redis.host=localhost
#Redis 服务器连接端口
spring.redis.port=6379
#Redis 服务器连接密码(默认为空)
spring.redis.password=123456
#spring.redis.timeout=2000
#连接池最大连接数(使用负值表示没有限制)默认8
spring.redis.lettuce.pool.max-active=8
#连接池最大阻塞等待时间(使用负值表示灭有限制)默认-1
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
#连接池中最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

注: 如果是本机测试需要为redis设置密码,并且需改redis配置 protected-mode no
否则可能出异常 Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed

在resources目录下创建lua脚本,文件名seckill.lua
脚本内容如下

-- 1.参数列表
-- 1.1.商品id
local productId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. productId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. productId

-- 3.脚本业务
-- 3.1.判断库存是否充足
if(tonumber(redis.call('get',stockKey)) <= 0) then
    --3.2.库存不足,返回1
    return 1
end
-- 3.3.判断用户是否下过单
if(redis.call('sismember',orderKey,userId) == 1) then
    -- 3.4.存在,说明用户重复下单,返回2
    return 2
end
--3.5.扣减库存
redis.call('incrby',stockKey,-1)
--3.6.下单
redis.call('sadd',orderKey,userId)
-- 3.7.发送消息到队列中, XADD key ID filed value [field value ...]
redis.call('xadd','stream.orders','*','productId',productId, 'userId',userId,'orderId',orderId)
return 0

秒杀代码实现类

@Component
public class Seckill {

    private static final long BEGIN_TIMESTAMP=1640995200l;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 生产唯一订单编号
     * 订单号:由64bit组成,前32bit是时间戳,后32bit是自增数每日从1开始自增
     * key:自定义前缀+yyyy:MM:dd
     * */
    public long nextId(String keyPrefix){
        LocalDateTime now=LocalDateTime.now();
        long nowSecond=now.toEpochSecond(ZoneOffset.UTC);
        long timestamp=nowSecond-BEGIN_TIMESTAMP;

        String date=now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count=stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);

        return timestamp << 32 | count;
    }

    private final String SECKILL_STOCK_KEY="seckill:stock:";//库存key
    private final String SECKILL_ORDER_KEY="seckill:order:";//订单key
    private final String SCEKILL_STREAM_KEY="stream.orders";//队列名称
    private final String SECKILL_GROUP_NAME="g1"; //消费者组名称
    /**
    * 上架秒杀商品
    * 将渺少商品id和数量添加到redis
    * @param productId 商品id
    * @param stock 秒杀数量
    * */
    public void addSeckill(String productId,int stock){
        // 1.TODO将秒杀信息添加到秒杀表中
        // 2.将秒杀商品和秒杀数量保存到redis
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+productId,String.valueOf(stock));
        // 3.创建消费者组 XGROUP CREATE key groupName ID [MKSTREAM]
        // key:队列名称  groupName:消费者组名称 ID:起始ID表示,$代表队列中最后一个消息 0则代码队列中第一个消息  MKSTREAM:队列不存在时自动创建队列
        String str= stringRedisTemplate.opsForStream().createGroup(SCEKILL_STREAM_KEY,SECKILL_GROUP_NAME);
        System.out.println("商品上架完成:"+str);
    }

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static{
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    /**
     * 秒杀抢单
     * 执行Lua脚本判断库存是否充足,判断是否重复下单,减库存,将下单用户id保存到redis
     *
     * */
    public long seckillOrder(String productId,String userId){
        //1.获取订单id
        long orderId=nextId("order");
        //2.执行lua脚本
        Long result=stringRedisTemplate.execute(SECKILL_SCRIPT,
                Collections.emptyList(),
                productId,
                userId,
                String.valueOf(orderId));
        int r=result.intValue();
        //2.判断结果是否为0
        //2.1.不为0 代表没有购买资格
        if(r!=0){
            if(r==1){
                System.out.println("库存不足");
                return 0;
            }
            if(r==2){
                System.out.println("不能重复下单");
                return 0;
            }
        }
        //3.返回订单id
        return orderId;
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

    /**
     * @PostConstruct 该注解在类初始化完成之后执行
     * */
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new OrderHandler());
    }
    private class OrderHandler implements Runnable{
        @Override
        public void run() {
            while (true){
                try{
                    System.out.println("into read ...");
                    // 1.获取消息队列中的订单消息
                    // XREADGROUP GROUP groupname consumer [COUNT count] [BLOCK millisenconds] [NOACK] STREAMS key [key ...] ID [ID ...]
                    // groupname:组名称 consumer:消费者名称 key:队列名称
                    List<MapRecord<String,Object,Object>> list=stringRedisTemplate.opsForStream().read(
                            //group:组名称,name:消费者名称,可任意名称
                            Consumer.from(SECKILL_GROUP_NAME,"c1"),
                            //count:读取1条消息, block:阻塞时间2秒
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
                            //stream:队列名称 ReadOffset:lastConsumed下一个未被消费的消息
                            StreamOffset.create(SCEKILL_STREAM_KEY, ReadOffset.lastConsumed())
                    );
                    // 2.判断消息获取是否成功
                    if(list == null || list.isEmpty()){
                        // 2.1.获取消息失败,说明没有消息
                        System.out.println("未读到消息");
                        continue;
                    }
                    // 3.解析消息中的订单信息
                    MapRecord<String,Object,Object> mapRecord=list.get(0);
                    Map<Object,Object> values=mapRecord.getValue();
                    Order order= BeanUtil.fillBeanWithMap(values,new Order(),true);
                    // 3.TODO订单处理将订单信息更新到mysql
                    System.out.println("read info:"+order.toString());
                    // 4.ACK确认 SACK key group ID
                    stringRedisTemplate.opsForStream().acknowledge(SCEKILL_STREAM_KEY,"g1",mapRecord.getId());
                    System.out.println("streams ID:"+mapRecord.getId()+" ack ok");
                }catch (Exception e){
                    // 5.处理异常消息
                    System.out.println(e.getMessage());
                    try{
                        // 5.1.读取未被确认的消息即pending-list中的消息
                        List<MapRecord<String,Object,Object>> list=stringRedisTemplate.opsForStream().read(
                                //组名称,消费者名称
                                Consumer.from("g1","c1"),
                                //count:读取1条消息, block:阻塞时间2秒
                                StreamReadOptions.empty().count(1),
                                //stream:队列名称 ReadOffset:lastConsumed下一个未被消费的消息
                                StreamOffset.create(SCEKILL_STREAM_KEY, ReadOffset.from("0"))
                        );
                        // 6.TODO处理异常消息
                        // 7.ACK确认
                        stringRedisTemplate.opsForStream().acknowledge(SCEKILL_STREAM_KEY,"g1",list.get(0).getId());
                    }catch (Exception ee){

                    }
                }
            }
        }
    }
}

标签:stream,--,Redis,redis,lua,key,spring,id
From: https://www.cnblogs.com/big-strong-yu/p/17027941.html

相关文章

  • Windows下安装并设置Redis
    作者: ​​铁锚​​日期:2014年8月10日 ​​Redis​​对于​​Linux​​是官方支持的,安装和使用没有什么好说的,普通使用按照官方指导,5分钟以内就能搞定。详情请参考:​......
  • Redis缓存
    Redis缓存有哪些淘汰策略缓存被写满是不可避免的。即使你精挑细选,确定了缓存容量,还是要面对缓存写满时的替换操作。缓存替换需要解决两个问题:决定淘汰哪些数据,如何处理那......
  • Ubuntu 安装 Redis
    本文档记录使用Ubuntu安装一个生产可用的Redis实例。版本UbuntuUbuntu22.04.1LTSRedis7.0.7如果你正在运行一个非常小的发行版(比如Docker容器......
  • springboot使用redis实现计数限流
    lua脚本resources下创建文件redis/AccessLimit.lua内容为:locallimitSecond=tonumber(ARGV[1])locallimitMaxCount=tonumber(ARGV[2])localnum=tonumber(......
  • Redis——概览
     1、网络模型:IO多路复用 2、操作模块 3、数据类型:String、List(列表)、Hash(哈希)、Set(集合)、SortedSet(有序集合)底层数据结构:哈希表、双向链表、压缩列表、跳表、整......
  • <<Redis 核心技术与实战>> 小记随笔 —— 有一亿个keys要统计,应该用哪种集合?
    聚合统计应用场景统计手机App每天的新增用户数和第二天的留存用户数解决方案由于Set类型可以实现并集、交集、差集等能力。所以设计一个Set存所有的用户Id,并且......
  • <<Redis 核心技术与实战>> 小记随笔 —— GEO
    应用场景用于描述LBS(Location-BasedService)的数据结构,能够存储对象的经纬度信息,并且可以进行指定经纬度点指定距离范围内的查询、排序等能力。类型介绍是一种自定......
  • Redis简单入门
    0什么是Redisredis的结构是key-valuekey是字符串,value有5种类型字符串类型,hash类型(map),set类型(不允许重复),list(linkedlist),有序集合类型(sortedset)1字符串类型命令......
  • 解决Centos8.x报Failed to download metadata for repo ‘appstream‘: Cannot prepar
    今天新购买的服务器安装宝塔面板时出现ERROR: 出现下面错误“错误:Failedtodownloadmetadataforrepo‘AppStream’:Cannotprepareinternalmirrorlist:NoURLsi......
  • 基于Redis通用缓存
    基于Redis通用缓存redis简介:流程:基于SpringAop切面类进行增强,逻辑如下1.数据进入controller层调用serviceservice调用对应dao方法进行查询前应该先从redis中查......