一.死信队列
1.Config配置类
package com.yufou.studyrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: YuFou
* @Description: ttl队列
* @Date: Created in 16:18 2023/7/9
*/
@Configuration
public class TtlQueueConfig {
//普通交换机
public static final String X_EXCHANGE = "X";
//死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String QUEUE_C = "QC";
//死信队列
public static final String DEAD_LETTER_QUEUE_D = "QD";
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("qa")
public Queue qa(){
return QueueBuilder
.durable(QUEUE_A)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.ttl(10000) //(创建Queue时设置ttl延迟时间)
.build();
}
@Bean("qb")
public Queue qb(){
return QueueBuilder
.durable(QUEUE_B)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.ttl(40000)
.build();
}
@Bean("qc")
public Queue qc(){
return QueueBuilder
.durable(QUEUE_C)
.deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.build();
}
@Bean("qd")
public Queue qd(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
@Bean
public Binding queueABingX(@Qualifier("qa") Queue qa,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(qa).to(xExchange).with("XA");
}
@Bean
public Binding queueBBingX(@Qualifier("qb") Queue qb,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(qb).to(xExchange).with("XB");
}
@Bean
public Binding queueCBingX(@Qualifier("qc") Queue qc,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(qc).to(xExchange).with("XC");
}
@Bean
public Binding queueDBingY(@Qualifier("qd") Queue qd,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(qd).to(yExchange).with("YD");
}
}
2.Controller发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable("message") String msg) {
log.info("当前时间:{},发送一条消息给两个TTL队列,消息是:{}",new Date(),msg);
rabbitTemplate.convertAndSend("X","XA","消息来自延迟10秒TTL队列:"+msg);
rabbitTemplate.convertAndSend("X","XB","消息来自延迟40秒TTL队列:"+msg);
}
@GetMapping("/sendMsg/{message}/{ttl}")
public void sendMessageTTL(@PathVariable("message") String msg,
@PathVariable("ttl") String ttl) {
log.info("当前时间:{},发送一条延迟{}秒的消息给TTL队列,消息是:{}",new Date(),Integer.valueOf(ttl),msg);
rabbitTemplate.convertAndSend("X","XC",msg,message -> {
message.getMessageProperties().setExpiration(String.valueOf(Integer.valueOf(ttl)*1000));
return message;
});
}
3.Consumer消费消息
@Component
@Slf4j
public class DeadLetterConsumer {
@RabbitListener(queues = "QD")
public void receiveDDLMessage(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间为:{},接收到死信队列的消息:{}",new Date(),msg);
}
}
二.基于插件的延迟队列
1.Config配置类
package com.yufou.studyrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: YuFou
* @Description: 基于插件的延迟队列
* @Date: Created in 18:20 2023/7/9
*/
@Configuration
public class DelayQueueConfig {
//交换机
public static final String DELAYED_EXCHANGE = "delayed.exchange";
//队列
public static final String DELAYED_QUEUE = "delayed.queue";
//routingkey
public static final String DELAYED_ROUTINGKEY = "delayed.routingKey";
// CustomExchange 自定义交换机
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
// Map<String, Object> arguments = new HashMap<>();
// arguments.put("x-delayed-type", "direct");
CustomExchange customExchange = new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, null);
customExchange.getArguments().put("x-delayed-type", "direct");
return customExchange;
}
@Bean("delayedQueue")
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE)
.build();
}
@Bean
public Binding delayedQueueBindDelayedExchange(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTINGKEY).noargs();
}
}
2.Controller发送消息
@GetMapping("/sendDelayMsg/{msg}/{delayTime}")
public void sendDelayMessage(@PathVariable("msg") String msg,
@PathVariable("delayTime") Integer delayTime){
log.info("当前时间:{},发送一条延迟{}秒的消息给delay队列,消息是:{}",new Date(),delayTime,msg);
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE,
DelayQueueConfig.DELAYED_ROUTINGKEY,
msg, message -> {
message.getMessageProperties().setDelay(delayTime);
return message;
});
}
3.Consumer消费消息
@Slf4j
@Component
public class DelayedConsumer {
@RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE)
public void receiveDelayQueueMsg(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间为:{},接收到延迟队列的消息:{}", new Date(), msg);
}
}
标签:return,String,rabbitmq,Bean,msg,message,public
From: https://www.cnblogs.com/yufou/p/17539361.html