一、需求:
1.订单下单超过30分钟以后,如果还未支付,则自动转为取消支付状态
2.订单收货超过七天以后,如果还未评价,则自动转为好评
3.等类似需求
二、实现步骤:
1. 引入redisson依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.5</version> </dependency>
<!--加载hutool--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.4.3</version> </dependency>
2. 创建延时队列到期事件处理方法,消费延时队列
/** * @author dong */ public interface DelayQueueConsumer { /** * 执行延迟消息 * * @param delayMessage delayMessage */ void execute(DelayMessage delayMessage); }
3. 具体的延时队列消费实现
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * @author dong */ @Component @Slf4j public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer { @Override public void execute(DelayMessage delayMessage) { //处理自己过期的后的业务 log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage); } }
4. 初始化队列
注:SpringUtil.getBean,若是启动时报空指针,启动类加上@ComponentScan(basePackages={"com.ctw.utils","com.ctw.*"}),扫描自己SpringUtil所在的位置,若是用的hutool则改成@ComponentScan(basePackages={"cn.hutool.extra.spring"})
import com.ctw.utils.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author dong */ @Slf4j @Component public class RedissonDelayQueueClient implements InitializingBean { @Resource private RedissonClient redissonClient; private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16); public void addDelayMessage(DelayMessage delayMessage) { log.info("delayMessage={}", delayMessage); if (delayQueueMap.get(delayMessage.getQueueName()) == null) { log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName()); return; } delayMessage.setCreateTime("123456"); RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName()); rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit()); } @Override public void afterPropertiesSet() throws Exception { // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values(); for (RedisDelayQueueEnum queueEnum : queueEnums) { DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanId()); if (delayQueueConsumer == null) { throw new RuntimeException("queueName=" + queueEnum.getBeanId() + ",delayQueueConsumer=null,请检查配置..."); } // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后, // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。 RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode()); RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue); delayQueueMap.put(queueEnum.getCode(), rDelayedQueue); // 订阅新元素的到来,调用的是takeAsync(),异步执行 rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute); } } }
5. 延迟队列业务实体
import com.alibaba.fastjson.JSON; import lombok.Data; import java.io.Serializable; import java.util.concurrent.TimeUnit; /** * @author dong */ @Data public class DelayMessage implements Serializable { private String queueName; private Long delayTime; private TimeUnit timeUnit; private String msgBody; private String createTime; @Override public String toString() { return JSON.toJSONString(this); } }
6. 延迟队列Enum
注:如果没有业务不要产生过多的队列业务类型,否则会找不到类而抛出异常
import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @Getter @NoArgsConstructor @AllArgsConstructor public enum RedisDelayQueueEnum { ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT","订单支付超时,自动取消订单", "orderAutoCancelDelayQueueConsumer"); // ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated"); /** * 延迟队列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延迟队列具体业务实现的 Bean * 可通过 Spring 的上下文获取 */ private String beanId; }
7. 此时只需要再下单成功的方法里面新增以下逻辑即可
@Autowired private RedissonDelayQueueClient redissonDelayQueueClient; //添加到延迟队列 DelayMessage delayMessage=new DelayMessage(); delayMessage.setQueueName(RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode()); //时间 delayMessage.setDelayTime(60l); //标识(一般指唯一的单号等) delayMessage.setMsgBody("2023001"); //时间可自行调整(毫秒,秒,分,天等 不填默认秒) // delayMessage.setTimeUnit(TimeUnit.SECONDS); redissonDelayQueueClient.addDelayMessage(delayMessage);标签:Redisson,SpringBoot,队列,private,delayMessage,import,public,延时 From: https://www.cnblogs.com/FRIM/p/17511839.html