首页 > 数据库 >基于Redis的Stream结构作为消息队列,实现异步秒杀下单

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

时间:2022-12-02 00:01:40浏览次数:50  
标签:异步 Stream g1 订单 -- Redis list 队列 c1

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders

  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId

  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

VoucherOrderServiceImpl

private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有消息,继续下一次循环
                    continue;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                //处理异常消息
                handlePendingList();
            }
        }
    }

    private void handlePendingList() {
        while (true) {
            try {
                // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create("stream.orders", ReadOffset.from("0"))
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有异常消息,结束循环
                    break;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理pendding订单异常", e);
                try{
                    Thread.sleep(20);
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

思路:

  1. 将lua脚本在项目启动时加载进来

  private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

  2. 获取一个线程,是项目启动时,启动线程任务。 利用了 静态 去创建线程, @PostConstruct 注解为,当前bean实例化后,立即执行。bean是spring控制的,所以项目启动后,线程任务即开始执行。

 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

  3. lua 脚本执行语法解析 : 

Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);

   4. 第一次从消息队列中拿值得命令: 写法要点:参考了redis的相同方法,虽然java调用的被封装好的方法,本质还是redis的命令。

  

  // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            // 指定group 内容 g1组下的c1
                            Consumer.from("g1", "c1"),
                            //队列的属性是一次取一个值,阻塞等待两秒
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            // 哪个队列名字 queueName,取值方式是取还未消费消息
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );

  5.从消息队列的pending中拿值得命令: 写法要点:参考了redis的相同方法,虽然java调用的被封装好的方法,本质还是redis的命令。

   // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1  2000 STREAMS s1 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        // g1组的c1
                        Consumer.from("g1", "c1"),
                        //一次取一个值,不阻塞
                        StreamReadOptions.empty().count(1),
                        //队列名称queueName,取pendingList的第一个值
                        StreamOffset.create(queueName, ReadOffset.from("0"))
                );

 

标签:异步,Stream,g1,订单,--,Redis,list,队列,c1
From: https://www.cnblogs.com/kisshappyboy/p/16943199.html

相关文章

  • Redis消息队列
    什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:消息队列:存储和管理消息,也被称为消息代理(MessageBroker)生产者:发送消息到消息队列......
  • 秒杀优化-异步秒杀思路
    当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤1、查询优惠卷2、判断秒杀库存是否足够3、查询订单4、校验是否是......
  • 总算给女盆友讲明白了,如何使用stream流的filter()操作
    一、引言在上一篇文章中《这么简单,还不会使用java8stream流的map()方法吗?》分享了使用stream的map()方法,不知道小伙伴还有印象吗,先来回顾下要点,map()方法是把一个流中的......
  • SpringCloud (五) - 云服务器Centos7.6,安装JDK,Maven,Mysql,Redis
    1、购买云服务器购买地址:https://cloud.tencent.com/act/pro/2022double11_warmup后面的环境都是基于此环境Centos7.6;2、安装SecureCRT和SecureFX2.1SecureCRT教......
  • 面试题系列:Redis 夺命连环11问
      1.说说Redis基本数据类型有哪些吧 1.字符串:redis没有直接使用C语言传统的字符串表示,而是自己实现的叫做简单动态字符串SDS的抽象类型。C语言的字符串不记录自......
  • Redis 集群
    Redis集群作用容量不够,redis如何进行扩容并发写操作,redis如何分摊都可使用集群解决。介绍Redis集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分......
  • CS144 lab1: stitching substrings into a byte stream
    CS144lab1:stitchingsubstringsintoabytestream​ Lab1中我们主要实现一个叫做StreamReassembler的东西。从Lab1提供的文档中,我们可以大致了解未来我们将要自己实......
  • SpringBoot(七) - Redis 缓存
    1、五大基本数据类型和操作1.1字符串-string命令说明setkeyvalue如果key还没有,那就可以添加,如果key已经存在了,那会覆盖原有key的值getkey如果key还没有,......
  • Debian 11 安装 redis 5.0
    安装命令sudoaptinstallredis-server安装后检查Redis服务状态sudosystemctlstatusredis-serverRedis的主要配置文件位于 /etc/redis/redis.conf重启red......
  • 分布式锁-redission锁重试和WatchDog机制
    抢锁过程中,获得当前线程,通过tryAcquire进行抢锁,该抢锁逻辑和之前逻辑相同1、先判断当前这把锁是否存在,如果不存在,插入一把锁,返回null2、判断当前这把锁是否是属于当前线......