https://blog.csdn.net/qq_35387940/article/details/100514134
RabbitMQ 概简介 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。 消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,知道接收者取回它。 Producer:消息生产者,负责生产和发送消息到Broker; Broker:消息处理中心,负责消息存储、确认、重试等; Consumer:消息消费中心,负责从Broker中获取消息并处理。
- 消息队列-特性
异步性:将耗时的同步任务通过发送消息的方式进行异步处理,减少等待时间。 松耦合:不同系统、服务之间可以通过消息队列进行通信,不用关心彼此的实现细节,数据格式一致。 分布式:为了防止消息堵塞,可以对消费者集群进行横向扩展,避免单点故障,同样队列本身也可以。 可靠性:将接收到的消息落盘,就算服务器重启或者发生故障,恢复之后也能重新加载
Message
消息,消息是不具名的,它由消息头和消息体组成 消息头,包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序
Exchange
交换器,将生产者消息路由给服务器中的队列 类型有direct(默认),fanout, topic, 和headers,具有不同转发策略
Queue
消息队列,保存消息直到发送给消费者
Binding
绑定,用于消息队列和交换器之间的关联
Connection
网络连接,比如一个TCP连接
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。
vhost 是 AMQP 概念的基础,必须在连接时指定
RabbitMQ 默认的 vhost 是 /
Broker
消息队列服务器实体
RabbitMQ 安装 [docker]
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
直接启动 没有资源会去自动下载
docker update rabbitmq --restart=always 自动启动
http://192.168.56.10:15672/#/
账号 密码 guest
https://blog.csdn.net/J080624/article/details/80943312
RabbitMQ提供了四种Exchange模式
RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 header模式在实际使用中较少,这里只讨论前三种模式.
fanout 模式#
fanout 模式就是广播模式~,消息来了,会发给所有的队列~
测试广播模式: 先在交换机创建 fanout模式的交换机,命名为 my.fanout.exchange,然后再到队列创建多个队列,再到交换机绑定队列,fanout可以不设置路由key,因为这个是广播模式的,最后发消息测试。
Direct 模式#
Direct 模式就是指定队列模式, 消息来了,只发给指定的 Queue, 其他Queue 都收不到。
Topic 模式#
主题模式,注意这里的主题模式,和 ActivityMQ 里的不一样。 ActivityMQ 里的主题,更像是广播模式。 那么这里的主题模式是什么意思呢? 如图所示消息来源有: 美国新闻,美国天气,欧洲新闻,欧洲天气。 如果你想看 美国主题: 那么就会收到 美国新闻,美国天气。 如果你想看 新闻主题: 那么就会收到 美国新闻,欧洲新闻。 如果你想看 天气主题: 那么就会收到 美国天气,欧洲天气。 如果你想看 欧洲主题: 那么就会收到 欧洲新闻,欧洲天气。
这样就可以灵活搭配~
创建交换机
创建队列
绑定在一起
发消息
看消息
Nack message requeue true 继续放
Nack message requeue false该方式每次获取一条消息,获取后从队列中移除
所有队列都能收到
Spring引入RabbitMQ
1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.properties 配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
账号密码默认不用配置了
3.开启功能
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication
给配置文件中
引入amqp场景;RabbitAutoConfiguration就会自动生效
给容器中自动配置了
RabbitTempLate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
测试RabbitMQ
@SpringBootTest
class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void creatExchange() {
//创建交换机
/**String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
}
@Test
void creatQueue() {
/**
*创建队列
* String name, 队列名字
* boolean durable, 是否持久化
* boolean exclusive, 是否排ta
* boolean autoDelete, 是否自动删除
* @Nullable Map<String, Object> arguments
*/
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
}
@Test
public void crateBinding() {
/**
* 绑定
* String destination, 目的地
* Binding.DestinationType destinationType, 目的地类型-两种不同类型
* String exchange, 交换机
* String routingKey, 路由键
* @Nullable Map<String, Object> arguments 自定义参数
*
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);
}
/**
* 发送消息
*/
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity);
log.info("消息发送完成:{}",reasonEntity);
}
}
配置发送消息转换 json
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
//接收消息
@RabbitHandler //标注在方法上(重载区分不同的消息)
//发送的是Message 是一个消息详细信息 头+体 并且可以指定类型实体类
public void reciveMessagew(Message message, OrderReturnReasonEntity content) {
//Body:'{"id":1,"name":"reason","sort":2,"status":1,"createTime":1603286841263}
byte[] body = message.getBody();//得到消息体内容
MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
//message.getClass()) 获取类型
System.out.println("接受消息" + message + content + message.getClass());
}
@RabbitListener(queues = {"hello-java-queue"}) //类+方法上 监听哪些队列
@RabbitHandler //标注在方法上(重载区分不同的消息)
RabbitMQ消息确认机制-可靠抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。
publisher confirmCallback 确认模式 publisher returnCallback 未投递到 queue 退回模式 consumer ack机制
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
@GetMapping("/sendMq")
public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num) {
for (int i = 0; i < num; i++) {
if (i % 2 == 0) {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason" + i);
//CorrelationData 指定一个UUid
rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java",
orderEntity,new CorrelationData(UUID.randomUUID().toString()));
}
}
return "ok";
}
消息抵达服务器 Broker 服务器收到消息
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true (废弃)
spring.rabbitmq.publisher-confirm-type=correlated
/**
* 定制RabbitTemplate
*/
@PostConstruct //MyRabbitConfig构造器创建完成 执行调用这个方法
public void initRabbitTemplate(){
//设置一个确认回调机制
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/*
* @param correlationData 消息唯一关联id 消息唯一id
* @param b 消息是否成功收到
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
});
}
消息正确抵达队列 消息回调
#开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" +
"==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
});
消费端确认 1、消费者获取到消息,成功处理,可以回复Ack给Broker
basic.ack 用于肯定确认;broker将移除此消息 basic.nack 用于否定确认;可以指定broker是否丢弃此消息,可以批量 basic.reject 用于否定确认;同上,但不能批量 2、默认,消息被消费者收到,就会从broker的queue中移除 3、queue无消费者,消息依然会被存储,直到消费者消费 4、消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式。
消息处理成功,ack(),接受下一个消息,此消息broker就会移除。 消息处理失败,nack()/reject(), 重新发送给其他人处理,或者容错处理后ack。 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。
- 修改 application.properties 配置文件:
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 测试
@RabbitHandler
public void revieveMessage(Message message,
OrderReturnReasonEntity content, Channel channel) throws IOException {
System.out.println("接收到消息..."+content);
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content);
// Thread.sleep(3000);
// Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag===>" + deliveryTag);
try{
// 签收货物,非批量模式
channel.basicAck(deliveryTag, false);
}catch (Exception e){
// 网络中断(突然)
}
}
标签:reasonEntity,队列,spring,rabbitmq,消息,RabbitMQ,message
From: https://blog.51cto.com/u_15993308/6409111