原理:利用消息过期后消息进入死信,然后消费者订阅死信队列进行消费达到延迟的功能
生产者-->交换机01-->过期队列-->消息过期后-->死信交换机-->死行队列-->消费者定义配置
@Configuration public class TTLQueueConfig { //region 声明普通类型的交换机和队列 public static final String PT_EXCHANGENAME="zd.putong.exchange"; public static final String PT_QUEUEA="zd.putong.queueA"; public static final String PU_ROUTINGKEY="queueA"; // 声明普通交换机 @Bean("ptExchange") public DirectExchange ptExchange(){ return new DirectExchange(PT_EXCHANGENAME); } // 声明队列 设置该队列ttl 并将其绑定到 死信交换机上 @Bean("queueA") public Queue queueA(){ Map<String, Object> args = new HashMap<>(3); //绑定到死信交换机上 args.put("x-dead-letter-exchange",SX_EXCHANGENAME); //绑定死信路由key args.put("x-dead-letter-routing-key",SX_ROUTINGKEY); // 设置消息过期时间 args.put("x-message-ttl",60000); return QueueBuilder.durable(PT_QUEUEA).withArguments(args).build(); }; // 绑定队列 @Bean public Binding queueABinding(@Qualifier("queueA") Queue queueA, @Qualifier("ptExchange")DirectExchange directExchange){ return BindingBuilder.bind(queueA).to(directExchange).with(PU_ROUTINGKEY); }; //endregion //region 声明死信交换机和队列 public static final String SX_EXCHANGENAME="zd.sixing.exchange"; public static final String SX_QUEUEA="zd.sixing.queueA"; public static final String SX_ROUTINGKEY="zd.sixing.queueA"; @Bean("sxExchange") public DirectExchange sxExchange(){ return new DirectExchange(SX_EXCHANGENAME); } @Bean("sxQueueA") public Queue queuesxA(){ return new Queue(SX_QUEUEA,false); }; @Bean public Binding sxQueueABinding(@Qualifier("sxQueueA") Queue queueA, @Qualifier("sxExchange")DirectExchange directExchange){ return BindingBuilder.bind(queueA).to(directExchange).with(SX_ROUTINGKEY); }; //endregion }
模拟生产者发送消息:
@RequestMapping("/sendMsg") public String sendMsg(){ JSONObject data = new JSONObject(); data.put("id","0001"); data.put("msg","我是产生的消息"+System.currentTimeMillis()); data.put("time",DateUtil.format(new Date(), "yyyy/MM/dd HH:mm:ss")); rabbitTemplate.convertAndSend(TTLQueueConfig.PT_EXCHANGENAME,TTLQueueConfig.PU_ROUTINGKEY,data); System.out.println("产生消息:"+DateUtil.format(new Date(), "yyyy/MM/dd HH:mm:ss")); return "发送成功"; }
模拟消费者接受消息:
@RunWith(SpringRunner.class) @SpringBootTest() class RabbitmqApplicationTests { @Resource private RabbitTemplate rabbitTemplate; @Test void contextLoads() throws InterruptedException { while (true) { Thread.sleep(1000); Object msg = rabbitTemplate.receiveAndConvert(TTLQueueConfig.SX_QUEUEA); if (msg!=null){ System.out.println("收到消息:"+ DateUtil.format(new Date(), "yyyy/MM/dd HH:mm:ss")+msg); } } } }
结果:
//产生消息:2023/05/30 10:51:22 //收到消息:2023/05/30 10:52:23{"msg":"我是产生的消息1685415082519","id":"0001","time":"2023/05/30 10:51:22"}标签:SX,队列,实现,死信,queueA,new,public,延迟 From: https://www.cnblogs.com/javacx/p/17442697.html