三 利用RabbitMQ高级特性,完善项目的可靠性
3.1 如何保证消息的可靠性
3.1.1 发送方
需要使用RabbitMQ发送端确认机制
,确认消息成功发送到RabbitMQ并被处理
需要使用RabbitMQ消息返回机制
,若没发现目标队列,中间件会通知发送方
3.1.2 消费方
需要使用RabbitMQ消费端确认机制
,确认消息没有发生处理异常
需要使用RabbitMQ消费端限流机制
,限制消息推送速度,保障接收端服务稳定
3.1.3 RabbitMQ自身
大量堆积的消息会给RabbitMQ产生很大的压力,需要使用RabbitMQ消息过期时间
,防止消息大量积压
过期后会直接被丢弃,无法对系统运行异常发出警报,需要使用RabbitMQ死信队列
,收集过期消息,以供分析
3.1.4 总结
需要引入RabbitMQ新特性,来确保消息可靠性:
发送端确认机制
消费端确认机制
消息返回机制
消费端限流机制
消息过期机制
死信队列
3.2 发送端确认机制
3.2.1 什么是发送端确认机制
消息发送后,若中间件收到消息,会给发送端一个应答
生产者接收应答,用来确认这条消息是否正常发送到中间件
3.2.2 三种确认机制
单条同步确认机制的实现方法(最优
)
配置channel,开启确认模式: channel.confirmSelect(),每发送一条消息,调用channel.waitForConfirms()方法,等待确认
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect();
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
if (channel.waitForConfirms()){
log.info("message success");
}else {
log.info("message failed");
}
}
多条同步确认机制的实现方法(不推荐)
配置channel,开启确认模式: channel.confirmSelect()
发送多条消息后,调用channel.waitForConfirms()方法,等待确认
//发送多条消息有一条确认,不推荐使用
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
if (channel.waitForConfirms()){
log.info("message success");
}else {
log.info("message failed");
}
}
异步确认机制的实现方法(不推荐)
配置channel,开启确认模式: channel.confirmSelect()
在channel上添加监听: addConfirmListener,发送消息后,会回调此方法,通知是否发送成功
异步确认有可能是单条,也有可能是多条,取决于MQ
//异步确认产生并发,问题,而且接受的应答还不在一个线程
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.confirmSelect();
ConfirmListener confirmListener = new ConfirmListener() {
@Override
/**
* deliveryTag 发送端消息序号
* multiple true多条消息 false单条消息
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("成功==deliveryTag:{},multiple:{}",deliveryTag,multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("失败==deliveryTag:{},multiple:{}",deliveryTag,multiple);
}
};
channel.addConfirmListener(confirmListener);
for (int i = 0; i < 10; i++) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
Thread.sleep(100000);
}
3.3 消息返回机制
3.3.1 消息真被路由了吗?
消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
消息丢弃后,订单处理流程停止,业务异常
需要使用RabbitMQ消息返回机制
,确认消息被正确路由
3.3.2 消息返回机制的原理
消息发送后,中间件会对消息进行路由
若没有发现目标队列,中间件会通知发送方
Return Listener 会被调用
3.3.4 消息返回的开启方法
在RabbitMQ基础配置中有一个关键配置项: Mandatory
Mandatory若为false, RabbitMQ将直接丢弃无法路由的消息
Mandatory若为true, RabbitMQ才会处理无法路由的消息
代码实现:
餐厅微服务给订单微服务发消息,没有正确的陆游会回调俩接口都可以:ReturnListener
, ReturnCallback
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// channel.addReturnListener(new ReturnListener() {
// @Override
// public void handleReturn(int replyCode,
// String replyText,
// String exchange,
// String routingKey,
// AMQP.BasicProperties properties,
// byte[] body) throws IOException {
// log.info("消息路由失败==replyCode:{} replyCode:{} exchange:{} routingKey:{} properties:{} body:{} ",
// replyCode,replyText,exchange,routingKey,properties,new String(body)
// );
// }
// });
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
log.info("消息路由失败==replyCode:{}",returnMessage);
}
});
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant",
"key.order",
true,
null,
messageToSend.getBytes());
Thread.sleep(1000);
}
3.4 消费方确认机制
3.4.1 消费端处理异常怎么办?
默认情况下,消费端接收消息时,消息会被自动确认(ACK
)
消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
需要使用RabbitMQ消费端确认机制
,确认消息被正确处理
3.4.2 消费端ACK类型
自动ACK:消费端收到消息后,会自动签收消息
手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息
3.4.3 手动ACK类型
单条手动ACK: multiple=false
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
多条手动ACK: multiple=true
//每五条签收
if (message.getEnvelope().getDeliveryTag() % 5 ==0){
channel.basicAck(message.getEnvelope().getDeliveryTag(),true);
}
推荐使用单条ACK
3.4.4 重回队列
若设置了重回队列,消息被NACK
之后,会返回队列末尾,等待进一步被处理
一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常
//重回队列
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
3.5 消费端限流机制
3.5.1 消费端处理的过来吗?
业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃
需要使用RabbitMQ消费端限流机制
,限制消息推送速度,保障接收端服务稳定
3.5.2 RabbitMQ-QoS
针对以上问题, RabbitMQ开发了QoS (服务质量保证)功能
QoS功能保证了在一定数目的消息未被确认前,不消费新的消息
QoS功能的前提是不使用自动确认
3.5.3 QoS原理
QoS原理是当消费端有一定数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息
RabbitMQ使用QoS机制实现了消费端限流
3.5.4 消费端限流机制参数设置
prefetchCount:针对一个消费端最多推送多少未确认消息
global: true:针对整个消费端限流false: 针对当前channel
prefetchSize : 0(单个消息大小限制,一般为0)
prefetchSize与global两项, RabbitMQ暂时未实现
没使用QoS会把消息全部推送到消费端,消费端消息全处在unacked状态,然后消费端一条一条的处理。
优点:并不一定是怕把消费者给挤爆,主要原因是,堆积到一个消费者上面,导致新的消费者也抢不过来,因为旧的消息早就推送了。
代码:
每次只推送2个ACK应答,这样其他的消费者就可以拿到了
channel.queueBind(
"queue.restaurant",
"exchange.order.restaurant",
"key.restaurant");
channel.basicQos(2);
channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
3.6 消息过期机制
3.6.1 队列爆满怎么办?
默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力
3.6.2 RabbitMQ的过期时间(TTL)
RabbitMQ的过期时间称为TTL (Time to Live),生存时间
RabbitMQ的过期时间分为消息TTL和队列TTL
消息TTL设置了单条消息的过期时间:代码
channel.addConfirmListener(confirmListener);
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
log.info("message send");
队列TTL设置了队列中所有消息的过期时间:代码
HashMap<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args
);
注意:args.put("x-expire", 15000);不要设置这个,这样队列就被删除了。
3.6.3 如何找到适合自己的TTL?
TTL的设置主要考虑技术架构与业务
TTL应该明显长于服务的平均重启时间
建议TTL长于业务高峰期时间
注意:不建议直接使用建议和死信一起使用
3.7 死信队列
3.7.1 如何转移过期消息?
消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报
需要使用RabbitMQ死信队列
,收集过期消息,以供分析
3.7.2 什么是死信队列
死信队列:队列被配置了DLX属性(Dead-Letter-Exchange)
当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机
死信被死信交换机路由后,一般进入一个固定队列
3.7.3 死信队列设置方法
设置转发、接收死信的交换机和队列:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey:#
在需要设置死信的队列加入参数:
x-dead-letter-exchange = dlx.exchange
代码:
channel.exchangeDeclare(
"dlx.exchange",
BuiltinExchangeType.TOPIC,
true,
false,
null
);
channel.queueDeclare(
"dlx.queue",
true,
false,
false,
null
);
channel.queueBind(
"dlx.queue",
"dlx.exchange",
"#"
);
//设置死信队列
HashMap<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args
);
3.7.4 怎样变成死信
- 消息被拒绝(reject/nack)并且requeue=false(不让他重回队列)
//不重回队列
channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);
- 消息过期(TTL到期)
args.put("x-message-ttl", 15000);
- 队列达到最大长度
args.put("x-max-length", 5);
3.8 小结
3.8.1 善用RabbitMQ高级特性
对于RabbitMQ的高级特性,要善加利用
接收端确认、死信队列是非常常用的特性
3.8.2 慎用RabbitMQ高级特性
不要无限追求高级,用上所有RabbitMQ的高级特性
重回队列、发送端确认是不常用的特性,谨慎使用
3.8.2 善用RabbitMQ管控台
管控台是RabbitMQ调试的利器
RabbitMQ高级特性多数都涉及交换机、队列的属性配置,可以在管控台确认配置是否生效
RabbitMQ高级特性很多都可以在管控台进行试验
3.8.3 本章小结
为了确保消息发送,使用了发送端确认机制
为了确保消息正确路由,使用了消息返回机制
为了保证消息正常梳理,使用了消费端确认机制
为了保证消费端稳定,使用消费端限流机制
为了中间件问题,使用过期时间机制
为了处理异常消息,使用死信机制
标签:队列,确认,笔记,学习,死信,消息,RabbitMQ,channel From: https://www.cnblogs.com/mrwyk/p/16905548.html