1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、编写连接rabbitmq的配置
3、开启rabbitmq功能
4、rabbitmq使用json序列化机制并设置回调
package com.gulimall.ware.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* json序列化格式
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
*/
@PostConstruct //在MyRabbitConfig对象创建完成后执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
}
});
//设置消息发送到队列的回调(只有发送失败才会回调)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息【没有】投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
}
});
}
}
5、编写创建队列、交换机、绑定的配置类
package com.gulimall.ware.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 创建队列、交换机、绑定
*/
@Configuration
public class MyRabbitMqConfig {
/**
* 监听消息【这个方法的编写是为了重启ware服务后能发现没有下面的交换机、队列、绑定,他就会自己创建】
*/
@RabbitListener(queues = "stock.release.stock.queue")
public void listener(Message message){
//不做任何处理
}
/**
* 交换机
* @return
*/
@Bean
public Exchange stockEventExchange(){
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("stock-event-exchange",true, false);
}
/**
* 普通队列
*/
@Bean
public Queue stockReleaseStockQueue(){
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
return new Queue("stock.release.stock.queue", true, false, false);
}
/**
* 死信队列
* @return
*/
@Bean
public Queue orderDelayQueue(){
/**
* 配置参数
* x-dead-letter-exchange: stock-event-exchange
* x-dead-letter-routing-key: order.release
* x-message-ttl: 60000
*/
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");//死信路由到order-event-exchange交换机
arguments.put("x-dead-letter-routing-key", "stock.release");//死信路由键
arguments.put("x-message-ttl", 120000);//消息过期时间(2min)
/**
* String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*/
Queue queue = new Queue("stock.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 绑定
*/
@Bean
public Binding stockReleaseBinding(){
/**
* String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
*/
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);
}
/**
* 绑定
*/
@Bean
public Binding stockDelayBinding(){
/**
* String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
*/
return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);
}
}
标签:String,exchange,springframework,---,交换机,org,import,119,stock
From: https://www.cnblogs.com/morehair/p/17149955.html