这是RabbitMQ消息从生产者到消费者的流程。
从图中可以看出消息可能在以下几个地方丢失
- 生产者处丢失:消息没有正确到达RabbitMQ的交换机。 解决策略:confirm机制
- RabbitMQ本身将消息丢失:因为一些原因导致RabbitMQ重启,导致内存中的消息丢失。 解决策略:消息持久化
- 消费者处丢失:消费者拿到消息之后还没有消费完就宕机了,导致消息丢失。 解决策略:手动ack
今天先说一个confirm机制和手动ack,持久化下次再说
消息会经过三次传输 - 从生产者到交换机
- 从交换机到队列
- 从队列到消费者
RabbitMQ也提供了对应的机制来保证这三次消息传输的可靠性,如下图
- 开启confirm机制后为每条消息分配一个唯一id,当生产者发送消息后RabbitMQ回调一个confirm方法,返回消息的id和是否成功发送的标识以及失败原因,来让我们消息是否成功发送,如果失败进行相应的处理
- 当消息从交换机没有正确到达队列时会回调一个return方法,返回消息、应答编码、失败原因、交换机以及路由键。回调后我们进行相应的处理。如果成功到达队列则不会回调。
- 默认是自动ack的,当消息到达消费者rabbitMQ就把消息删除了,这样是最高效的,但如果消费者因为某些原因没有正确消费消息,那么这条消息就丢失了。手动ack是指消费者正确处理完消息之后返回给rabbitMQ一个ack状态,告诉RabbitMQ可以把这条消息删除了,这样就保证了消息不会在消费者处丢失。
话不多说,上代码
创建工程,配置文件添加rabbitmq的配置
server:
port: 8082
spring:
rabbitmq:
username: guest
password: guest
port: 5672
host: localhost
publisher-confirm-type: correlated #设置发送者确认confirm机制
publisher-returns: true #消息未从交换机正确到达队列时发送回调
listener:
simple:
acknowledge-mode: manual #指定消息确认模式为手动确认
RabbitMQ配置类
@Component
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "message_confirm_exchange";
public static final String QUEUE_NAME = "message_confirm_queue";
private static final String ROUTING_KEY = "user.#";
@Bean
private TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
private Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
private Binding bindingTopic(){
return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置mandatory为true才能让消息没有到达队列时回调return方法,如果不设置的话默认为false,当消息没有到达队列就直接丢弃了
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
注册confirm和return回调
@Component
@Slf4j
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate;
//依赖注入完成后执行此方法
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 生产者确认confirm机制。消息无论是否到达交换机都会回调这个方法
* 20年12月之前的CorrelationData对象中只有一个id属性,我们需要自己绑定id和具体消息的关系来根据id找到消息
* 但20年12月之后的CorrelationData对象中多了一个returnedMessage属性,生产者在设置CorrelationData对象的消息id时可以把同时把发送的消息设置进去,这样在confirm回调中可以直接获取消息了,不用自己做绑定
* @param correlationData
* @param ack
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
//将字节数组转化为消息对象
MessageDTO messageDTO = ByteObjectConvert.byte2Object(correlationData.getReturnedMessage().getBody());
if (ack){
log.info("confirm,发送成功-----消息:{},id:{}",messageDTO,correlationData.getId());
}else{
log.info("confirm,发送失败-----消息:{},id:{},失败原因:{}",messageDTO,correlationData.getId(),s);
//发送失败,将失败消息存入数据库。入库后可以使用定时任务去扫描发送失败的消息进行重发
//获取要发送的交换机,路由键。在生产者发送消息时我自己定义了一个map用于存储消息要发送到的交换机和路由键,用于在confirm失败时获取然后进行相应处理
String exchangeAndRoutingKey = DataContainer.getExchangeAndRoutingKey(correlationData.getId());
String[] split = exchangeAndRoutingKey.split(",");
String exchange = split[0];
String routingKey = split[1];
log.info("\n 交换机:{},路由键:{}",exchange,routingKey);
//入库后删除map中的数据
DataContainer.del(correlationData.getId());
}
}
/**
* 如果消息未从交换机正确到达队列会回调这个方法,正确到达不会回调这个方法
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
byte[] body = message.getBody();
//将消息的字节数组转为对象
MessageDTO o = ByteObjectConvert.byte2Object(body);
//存入数据库
log.info("returns-----:{}\n replyCode:{},replyText:{},exchange:{},routingKey:{}",o,replyCode,replyText,exchange,routingKey);
//入库后删除map中的数据
String msgID = message.getMessageProperties().getCorrelationId();
DataContainer.del(msgID);
}
}
定义一个用于存储消息要发送到的交换机和路由键的map
/*
* @Description TODO (保存发送消息的交换机和路由键,为了在confirm为false时获取然后入库)
* 创建人: 程长新
* 创建时间:2023/9/14 15:08
**/
@Slf4j
@NoArgsConstructor
public class DataContainer {
private static ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>();
public static void saveExchangeAndRoutingKey(String key, String value){
String put = map.put(key, value);
}
public static void del(String key){
String remove = map.remove(key);
}
public static String getExchangeAndRoutingKey(String key){
return map.get(key);
}
}
随便定义一个发送消息的实体
/**
* 因为要作为消息进行发送,所以这个实体必须是可序列化的。所以实现Serializable接口
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {
private int id;
private String msg;
}
自己定义一个byte数组与Object转换的工具类
/**
* byte数组与对象转换的工具类
*/
public class ByteObjectConvert {
public static byte[] object2Bytes(MessageDTO messageDTO) {
byte[] messageDTOBytes;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = null;
try {
objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(messageDTO);
messageDTOBytes = byteArrayOutputStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
try {
byteArrayOutputStream.close();
objectOutputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return messageDTOBytes;
}
public static MessageDTO byte2Object(byte [] body) {
// byte[] body = correlationData.getReturnedMessage().getBody();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
ObjectInputStream objectInputStream = null;
MessageDTO messageDTO = null;
try {
objectInputStream = new ObjectInputStream(byteArrayInputStream);
messageDTO = (MessageDTO) objectInputStream.readObject();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
try {
byteArrayInputStream.close();
objectInputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return messageDTO;
}
}
生产者发送消息
@Slf4j
@RestController
public class Producer {
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
// @ResponseBody
public String sendMessage(){
for (int i = 0; i < 3; i++) {
String correlationId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(correlationId);
String s = "message" + i;
MessageDTO messageDTO = new MessageDTO(i, s);
log.info("消息{}的id{}",s,correlationData.getId());
//将要发送的消息转为字节数组用于构建Message对象
byte[] messageDTOBytes = ByteObjectConvert.object2Bytes(messageDTO);
//构建Message对象时同时将唯一id correlationId设置进去
Message build = MessageBuilder.withBody(messageDTOBytes).setCorrelationId(correlationId).build();
//将发送的消息设置到CorrelationData对象中,可以直接在confirm回调方法中获取Message消息
correlationData.setReturnedMessage(build);
if (i == 1){
//为了保证消息confirm为false时还能拿到发送到的交换机和路由键,将消息id作为键,交换机和路由键作为值存入map
DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),"abc" + "," + "user.add");
// rabbitTemplate.convertAndSend("abc","user.add",messageDTOBytes,correlationData);//如果设置了MessageProperties,那么发送的消息就要构造成Message对象,否则设置的MessageProperties不起作用,因为如果不是Message对象,底层发送的时候还是会将对象构造为Message对象并创建一个空的MessageProperties对象,所以自己设置的会不起作用
rabbitTemplate.convertAndSend("abc","user.add",build,correlationData);//错误的交换机名称,用于模拟消息没有正确到达交换机的情况
}else if (i == 2){
DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "abc");
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "abc", build, correlationData);//写一个匹配不到的路由键,用于模拟消息没有从交换机到达队列的情况
}else {
DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "user.add");
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "user.add", build, correlationData);
}
log.info("生产者发送消息:{}",build);
}
return "发送成功";
}
}
消费者监听队列处理消息--开启手动ack
@Component
@Slf4j
public class Consumer01 {
@RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
public void receiveMessage01(MessageDTO msg, Message message, Channel channel){
// public void receiveMessage01(Message message, Channel channel){
// MessageDTO msg = ByteObjectConvert.byte2Object(message.getBody());
try {
/*if ("message1".equals(msg)){
int i = 1/0;
}*/
log.info("消费者01成功接收到消息:{}",msg);
//rabbitmq只有收到的消费者发送的ack消息,才会将消息删除,否则不会删除
//第一个参数表示该消息的index,第二个参数表示是否批量确认,批量的话小于这个tag的都会ack掉
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
log.error("消费者01处理消息:{},发生异常:{}",msg,e.getMessage());
try {
//如果消息没有被拒绝过就将消息重新入队,如果被拒绝过一次一直接丢弃
if (message.getMessageProperties().getRedelivered()){
//basicReject与basicNack一样,只不过basicNack可以批量处理,basicReject只能处理单个
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费者01丢弃消息:{}",msg);
}else {
//参数2为批量标识,如果设为true的话小于这个index的消息都将被确认
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
log.info("消费者01拒绝消息:{}",msg);
}
} catch (IOException ex) {
log.error("拒绝消息时发生异常:{}",ex.getMessage());
e.printStackTrace();
}
e.printStackTrace();
}
}
}
标签:可靠性,String,confirm,RabbitMQ,交换机,保证,消息,correlationData,public
From: https://www.cnblogs.com/ccx-lly/p/17704094.html