首页 > 数据库 >利用redis作为消息队列实现异步秒杀业务

利用redis作为消息队列实现异步秒杀业务

时间:2022-10-22 15:45:55浏览次数:64  
标签:异步 队列 redis lua 消息 key --

实现消费券秒杀的优化,在加入限时抢购的优惠券时,自动的将消费券的库存stock信息也加入到redis中(可设为抢购结束后过期)

抢购之前在redis中进行库存是否充足(stock)、用户是否已经抢购(set)的判断

如果条件都满足,则将订单信息加入到消息队列中

另开启一个线程将消息队列中订单信息异步地同步到数据库中,这样就缓解了直接写数据库的压力,新开启的线程可以根据数据库适应的速度进行写操作

异步秒杀业务流程

说明

Lua脚本保证一些操作在Redis执行的原子性。直接由redis中的eva "lua script" [keys] [args]命令调用

在一开始直接将优惠券的信息直接读到redis中,然后利用lua脚本直接在redis中进行用户抢购资格判断(避免了每次抢购都要直接读数据库进行判断)

根据lua脚本的返回值判断用户是否符合资格,如果符合资格则加入到消息队列(或阻塞队列)中

三种redis实现消息队列的思路

基于list实现消息队列

redis中的list数据结构是一个双向链表,可以利用LPUSH和RPOP、RPUSH和LPOP组合实现消息队列

队列中没有消息再pop会直接返回null,可以使用BLPOP、BRPOP来达到阻塞等待pop的效果

优点:基于redis存储不受限于jvm内存上限、基于redis持久化机制数据安全性有保证、可满足消息有序性

缺点:无法避免消息丢失(一个消费者获取消息后出现异常)、只适用于单消费者模式(只能pop一次)

基于PubSub实现消息队列

常用的命令

subscribe channel [channel]  #消费者订阅频道
publish [channel] [msg]  #生产者向频道发送消息
psubscribe pattern[pattern]  #订阅和pattern格式相同的频道  通配符?、*、[]、\

优点:支持多生产者、多消费者模式

缺点:不支持持久化、无法避免消息丢失(无人订阅时消息一发送就丢失)、消息堆积有上限超出时消息丢失

基于Stream消费组实现消息队列

redis5.0引入的一种数据类型(可持久化),可以实现一个功能非常完善的消息队列

单消费者模式

优点:消息可回溯、一个消息可被多个消费者获取、可阻塞读取

缺点:任然存在消息漏读的风险

消费者组模式

xgroup create [key] [groupname] [id] [mkstream]  
#key代表队列名称、id为消息索引$为最后一个消息,0为第一个消息、mkstream自动创建队列(可选)

xgroup destroy [key] [groupname]
#删除指定的消费者组

xgroup createconsumer [key] [groupname] [consumername]
#给key消息队列的groupname组中添加消费者consumername

xgroup delconsumer [key] [groupname] [consumername]
#将key消息队列的groupname组中消费者consumername删除

是redis中最为完善的消息队列:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有漏读风险、消息确认机制,每个消息至少被消费

基于Stream消息队列异步秒杀的代码实现

新增优惠券信息时将库存信息写入redis中{“voucher:stock:voucherId” : “stockValue”}

创建一个Stream类型的消息队列,名为“stream.orders”。redis创建命令:XGROUP CREATE stream.orders g1 0 MKSTREAM

修改秒杀脚本,用户在redis中获取到抢购资格后直接在redis中将订单信息加入到消息队列中

lua脚本seckill.lua

判断用户是否有购买资格,如果有则将订单信息加入到消息队列中

-- 1、参数列表
-- 1.1、优惠券id
local voucherId = ARGV[1]
-- 1.2、用户id
local userId = ARGV[2]
-- 1.3、订单id
local orderId = ARGV[3]
-- 2、数据key
-- 1.1、库存key
local stockKey = "seckill:stock:" .. voucherId
-- 1.2、订单key
local orderKey = "seckill:order:" .. voucherId
-- 3、脚本业务
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.1、判断库存是否充足,如果不足返回1
    return 1
end
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.2、判断用户是否下过单,如果下过单返回2
    return 2
end
-- 4、扣减库存
redis.call('incrby', stockKey, -1)
-- 5、将userId存入订单的set集合,返回0
redis.call('sadd',orderKey, userId)
-- 6、发送消息到消息队列中  xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

return 0

静态加载lua脚本

//静态加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
     //resource/seckill.lua
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); 
    //lua脚本执行后的返回类型
    SECKILL_SCRIPT.setResultType(Long.class);
}

秒杀的主业务流程

SeckillServiceImpl.seckillVoucher2()

public Result seckillVoucher2(Long voucherId){
    UserDTO user = UserHolder.getUser();
    long orderId = redisIdWorker.nextId("order");

    //1、执行lua脚本
    Long returnValue = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),
        user.getId().toString(),
        String.valueOf(orderId)
    );
    //2、判断结果是否为0
    //3、不为0,没有资格购买
    int result = returnValue.intValue();
    if(result != 0){
        return Result.fail(result == 1 ? "库存不足" : "仅限抢购一单");
    }

    //4、如果有购买资格的话在lua脚本中就将订单加入到消息队列到中了

    return Result.ok(orderId);
}

在项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,异步的将消息队列中未处理的订单信息写入到数据库

创建一个专门处理处理消息队列的内部类SeckillServiceImpl.VoucherOrderHandler.java

private static final String STREAM_ORDER_MQ = "stream.orders";
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct  //在项目启动时,开启一个线程任务
private void init(){
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

//专门处理将订单信息写入数据库的线程
private class VoucherOrderHandler implements Runnable{
    @Override
    public void run() {
        while(true){
            try {
                //1、获取消息队列中的信息
                //xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(1),
                    StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.lastConsumed())
                );

                //2判断获取消息是否成功
                if(records == null || records.isEmpty()){
                    //3、如果失败,说明没有消息,继续循环
                    continue;
                }
                //String 为消息的id, <Object, Object>存的是消息队列中的键值对
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> map = record.getValue();

                //4、如果获取成功进行下单
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                handleCreateOrder(voucherOrder);

                //5、在消息队列中对此条消息进行确认
                stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());

            } catch (Exception e) {
                log.error("订单处理错误", e);
                //当出现异常时,从pendingList获取未处理的消息继续处理
                handlePendingList();
            }
        }
    }

    //发生异常时进行处理
    private void handlePendingList(){
        while(true){
            try {
                //1、获取pending-list中的异常消息
                //xreadgroup group g1 c1 count 1 block 2000 streams stream.orders 0
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.from("0"))
                );

                //2判断是否有异常消息
                if(records == null || records.isEmpty()){
                    //3、如果没有跳出循环,执行正常消息
                    break;
                }
                //String 为消息的id, <Object, Object>存的是消息队列中的键值对
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> map = record.getValue();

                //4、如果获取成功进行下单
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                handleCreateOrder(voucherOrder);

                //5、在消息队列中对此条消息进行确认
                stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());

            } catch (Exception e) {
                log.error("订单处理错误", e);
                //如果又出现异常,进入下一轮循环
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    private void handleCreateOrder(VoucherOrder voucherOrder) {
        voucherOrderService.save(voucherOrder);
    }
}

标签:异步,队列,redis,lua,消息,key,--
From: https://www.cnblogs.com/Gw-CodingWorld/p/16816205.html

相关文章

  • 刷题 LeetCode 栈和队列2
    代码随想录LeetCode20. 有效的括号carl栈思路左括号入栈,右括号出栈,如果出栈时栈为空或不匹配,或者最终栈不为空则false细节LeetCode1047. 删除字符串中的所有......
  • docker安装redis(win10已安装Docker Desktop)
    参考文档:RunRedisStackonDocker|Redis如图所示:  终端中运行以下命令:dockerrun-d--nameredis-stack-p6379:6379-p8001:8001redis/redis-stack:late......
  • ModStart: 宝塔配置 MySQL 队列调度
    宝塔配置MySQL队列调度执行以下操作前提前进入网站根目录,如​​cd/www/wwwroot/xxx.com​​执行​​artisan​​ 命令前请参照开发教程→开发使用问题→如何运行​......
  • 消息队列的解释【杭州多测师】【杭州多测师_王sir】
    一、消息队列概述消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有A......
  • 回顾缓存穿透、缓存雪崩、缓存击穿及封装Redis工具类
    缓存穿透问题的解决思路缓存穿透:缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。常见的解决方案有两种:缓存......
  • 单调队列优化dp(1)(P2034 选择数字)
    参考算法学习笔记(66):单调队列-知乎(zhihu.com)题目描述给定一行n个非负整数a[1]..a[n]。现在你可以选择其中若干个数,但不能有超过k个连续的数字被选择。你的任务是......
  • Redis两节点高可用设计方案​功能验证
    该文档主要是针对RedisGraph高可用方案设计的功能测试,来说明方案是可实施是可行的。具体方案设计看前面的高可用方案设计文章功能测试准备条件master、slave两台服务器172.......
  • redis缓存时间范围数据用法
    场景:A系统需要根据业务系统名(比如业务系统就叫KKK)以及时间范围如2022-10-2210:01到2022-10-2210:31请求B系统,B系统会返回10:01到10:31这30个分钟的数据;这个数据需要缓存......
  • Redis笔记8
    (ObjectivelyDown)stateonlyifatleast<quorum>sentinelsagree. #(客观下降)仅状态至少法定人数同意。 NotethatwhateveristheODOWNquorum,aSentinelw......
  • Redis笔记8
    (ObjectivelyDown)stateonlyifatleast<quorum>sentinelsagree. #(客观下降)仅状态至少法定人数同意。 NotethatwhateveristheODOWNquorum,aSentinelw......