首页 > 数据库 >基于SpringBoot整合Redisson的延迟队列

基于SpringBoot整合Redisson的延迟队列

时间:2023-06-28 16:56:35浏览次数:49  
标签:Redisson SpringBoot 队列 private delayMessage import public 延时

一、需求:

         1.订单下单超过30分钟以后,如果还未支付,则自动转为取消支付状态

    2.订单收货超过七天以后,如果还未评价,则自动转为好评

    3.等类似需求

二、实现步骤:

        1.  引入redisson依赖

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.5</version>
        </dependency>

 

        <!--加载hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.3</version>
        </dependency>

         2. 创建延时队列到期事件处理方法,消费延时队列  

/**
 * @author dong
 */
public interface DelayQueueConsumer {

    /**
     * 执行延迟消息
     *
     * @param delayMessage delayMessage
     */
    void execute(DelayMessage delayMessage);

}

      3. 具体的延时队列消费实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author dong
 */
@Component
@Slf4j
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {

    @Override
    public void execute(DelayMessage delayMessage) {
        //处理自己过期的后的业务
        log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage);
    }
}

    4. 初始化队列

  注:SpringUtil.getBean,若是启动时报空指针,启动类加上@ComponentScan(basePackages={"com.ctw.utils","com.ctw.*"}),扫描自己SpringUtil所在的位置,若是用的hutool则改成@ComponentScan(basePackages={"cn.hutool.extra.spring"})

import com.ctw.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author dong
 */
@Slf4j
@Component
public class RedissonDelayQueueClient implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16);

    public void addDelayMessage(DelayMessage delayMessage) {
        log.info("delayMessage={}", delayMessage);
        if (delayQueueMap.get(delayMessage.getQueueName()) == null) {
            log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }
        delayMessage.setCreateTime("123456");
        RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName());
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer
        RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();

        for (RedisDelayQueueEnum queueEnum : queueEnums) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanId());
            if (delayQueueConsumer == null) {
                throw new RuntimeException("queueName=" + queueEnum.getBeanId() + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }
}

   5. 延迟队列业务实体

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * @author dong
 */
@Data
public class DelayMessage implements Serializable {

    private String queueName;

    private Long delayTime;

    private TimeUnit timeUnit;

    private String msgBody;

    private String createTime;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

6. 延迟队列Enum

  注:如果没有业务不要产生过多的队列业务类型,否则会找不到类而抛出异常

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT","订单支付超时,自动取消订单", "orderAutoCancelDelayQueueConsumer");
//    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /**
     * 延迟队列 Redis Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private String beanId;

}

7. 此时只需要再下单成功的方法里面新增以下逻辑即可

    @Autowired
    private RedissonDelayQueueClient redissonDelayQueueClient;

        //添加到延迟队列
      DelayMessage delayMessage=new DelayMessage();
        delayMessage.setQueueName(RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());
        //时间
        delayMessage.setDelayTime(60l);
        //标识(一般指唯一的单号等)
        delayMessage.setMsgBody("2023001");
        //时间可自行调整(毫秒,秒,分,天等 不填默认秒)
//        delayMessage.setTimeUnit(TimeUnit.SECONDS);
        redissonDelayQueueClient.addDelayMessage(delayMessage);

标签:Redisson,SpringBoot,队列,private,delayMessage,import,public,延时
From: https://www.cnblogs.com/FRIM/p/17511839.html

相关文章

  • Springboot实现邮件发送
    本文以QQ邮箱为例,实现springboot邮件发送邮箱设置 主要获取授权码   按照步骤开启服务 获取授权码导入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency>配置......
  • Java阻塞队列原理
    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:1.当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。2.当队列中填满数据的情况下,生产者端的所有线程都会自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。阻塞队列的......
  • springboot 自定义异常 全局异常处理器
    创建自定义异常类,继承 RuntimeException类1publicclassCustomExceptionextendsRuntimeException{2publicCustomException(Stringmessage){3super(message);4}5}在指定情况下抛出该异常,代码如下: @ServicepublicclassCategoryServiceIm......
  • 谷粒商城项目篇12_分布式高级篇_购物车功能、消息队列RabbitMQ
    目录购物车模块vo的编写编写interceptor绑定user-key线程共享数据购物车商品的增加添加完成重定向避免刷新页面重复提交购物车商品的增删改查消息队列RabbitMQ场景理解概述docker安装RabbitMQ整合SpringBoot消息确认机制一、购物车模块需求描述在线购物车:登录状态添......
  • SpringBoot接入Chat-GPT3
    创建一个APIKeyAPIKey创建网址:https://platform.openai.com/account/api-keys先登录OpenAI账号登陆后创建一个APIKey起一个名字点击创建,生成一个APIKey,记录下这个APIKey后续会用。可以在官网看到剩余容量,账号默认会送$18刀的容量,超过需要购买。在SpringB......
  • Python全栈工程师(23:消息队列RabbitMQ)
    谁能用通俗的语言解释一下什么是RPC框架?深入浅出RPC-浅出篇深入浅出RPC-深入篇1小时写一个分布式系统基础框架(一个java实现帮助理解RPC)RabbitMQ消息队列安装 http://www.rabbitmq.com/install-standalone-mac.html安装pythonrabbitMQmodulepipinstallpikaoreasy_......
  • SpringBoot自定义starter
    1、先来一个简单的案例非常简单的工程结构controllerpackagecom.ly.demo.controller;importcom.ly.demo.service.MyStarterService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;import......
  • SpringBoot项目中功能集成的方式
    原文合集地址如下,有需要的朋友可以关注本文地址合集地址SpringBoot项目中功能集成的方式接口集成基于HTTP协议的集成方式协议和通信HTTP是一种基于客户端-服务器模型的协议。确定使用的HTTP版本(如HTTP/1.1或HTTP/2)以及通信过程中使用的其他协议和规范。在协议和通信阶段,需......
  • 基于MySQL+SpringBoot+IDEA开放的绩效评估系统
    基于MySQL+SpringBoot+IDEA开放的绩效评估系统项目介绍......
  • springboot配置
    复习:配置SSMMAVEN==》依赖(一堆的)web.xml(servlet,filter,listener...)spring-mvc.xmlspring-mybatis.xmlmapper....缺点:忒麻烦(配置文件多)容易出错花费得时间长如果从0开始配置一个helloworld(url)ssm项目一,springboot最重要得2个特点1,自动配置spring自动管理......