概念
消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。
默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:
我们需要将队列和消息都标 记为持久化。
代码实现
在声明队列的时候通过参数的方式将其队列声明为可持久化。
在发送消息时传入:MessageProperties.MINIMAL_PERSISTENT_BASIC
参数标识为持久化消息
/**
* 生成一个队列
* 1、队列名称
* 2、队列里面的消息是否持久化 默认情况下 消息存储在内存中。
* 3、该队列是否只供一个消费者进行消费,是否进行消息的共享,true--可以多个消费者消费。反之不可(默认)
* 4、表示是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除,true----自动删除,反之不删除
* 5、其他参数;例如,延迟消息,死期消息等
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
# 发送消息时,通过参数MessageProperties.PERSISTENT_TEXT_PLAIN标识为持久化消息
//发消息
String message = " hello world";
/**
* 发送一个消息
* 1、发送到那个交换机
* 2、路由的key值是哪个,本次是队列的名称
* 3、其他参数信息 MessageProperties.PERSISTENT_TEXT_PLAIN :表示消息实现持久化
* 4、发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要 更强有力的持久化策略,就需要用到发布确认模式。
发布确认
发布确认是一个保证RabbitMQ 可靠性的一个机制
保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。
生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。
如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。
一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。
保证消息不丢失:
- 设置要求队列必须持久化
channel.queueDeclare(QUEUE_NAME,isLasting,false,false,null);
- 设置要求队列中的消息必须持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,strMessage.getBytes());
- 发布确认
channel.confirmSelect();
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布 确认,都需要在 channel 上调用该方法
在mq的发布确认当中有三种策略
- 单个确认发布
- 批量确认发布
- 异步确认发布
单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。
/**
* 发布确认模式----单个发布确认
*/
public class ConfirmMessageProducre {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 开启发布确认
channel.confirmSelect();
String queueName = UUID.randomUUID().toString();
// 构建队列,表示构建一个queueName名字的队列,不持久化,排他队列,自动删除,不携带参数
channel.queueDeclare(queueName,false,false,true,null);
// 开始时间
Long beginTime = System.currentTimeMillis();
// 发送消息
for (int i = 0 ; i < 1000 ; i ++){
String strString = String.format("%s", i);
channel.basicPublish("",queueName,false,null,strString.getBytes(StandardCharsets.UTF_8));
// 等待确认: 表示确认存储到磁盘
boolean isSuccess = channel.waitForConfirms();
if (isSuccess){
System.out.println("发送消息成功,消息为:"+strString);
}
}
// 结束时间
Long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时102707ms
}
}
批量确认发布
先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。
发布1000条消息,每发送100条确认一次
/**
* 批量确认
*/
public class BatchProducre {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 开启发布确认
channel.confirmSelect();
String queueName = UUID.randomUUID().toString();
// 构建队列,表示构建一个queueName名字的队列,不持久化,排他队列,自动删除,不携带参数
channel.queueDeclare(queueName,false,true,true,null);
// 开始时间
Long beginTime = System.currentTimeMillis();
// 发送消息
for (int i = 0 ; i < 1000 ; i ++){
String strString = String.format("%s", i);
channel.basicPublish("",queueName,false,null,strString.getBytes(StandardCharsets.UTF_8));
// 每次发布100次消息进行依次确认
if (i%100==0){
boolean isSuccess = channel.waitForConfirms();
if (isSuccess){
System.out.println("发送消息成功");
}
}
}
// 结束时间
Long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时 1235 ms
}
}
异步发布确认
生产者发送消息与 接收确认这两个步骤不是同步的,是异步的,生产者只管发送,同时使用监听(addConfirmListener)返回的确认,对成功确认、失败确认两种情况分别进行处理。非常高效且安全
开启确认模式
- 声明确认成功的callback
- 声明确认失败的callback
- 开启确认监听 addConfirmListener() ,设置callback
- 信道发送消息,不需要额外设置接收waitForConfirm什么的
思路:
- 在发消息的时候记录下要发送消息的总和
- 在确认消息成功的回调函数上,通过 总和 - 成功 = 未发生的消息
- 在确认消息未成功的回调函数上,打印一下未确认的数据
两个回调函数,成功和失败的回调函数如下:
// 准备消息的监听器-----监听哪些消息成功了,哪些消息失败了。
// 消息确认成功的回调函数。
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
// 如果是批量,那么就直接删除
if (multiple){
//2.删除已经确认的消息 剩下的就是未确认的消息
// 通过 deliveryTag
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear(); // 删除
}else {
// 不是批量,那么就单个删除
outstandingConfirms.remove(deliveryTag); // 一般情况下都是单个,因为批量有可能导致消息丢失
}
System.out.println("确认的消息--->编号"+deliveryTag);
};
//消息确认失败的回调函数。
//deliveryTag 消息的编号 multiple:是否是批量确认
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
//3.打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息--->编号"+deliveryTag+"内容是:"+message);
};
完整代码实现:
/**
* 异步发布
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 声明队列名称
String queueName = UUID.randomUUID().toString();
// 声明队列--> 不持久化,排他队列,关闭后自动删除,不携带其他参数
channel.queueDeclare(queueName,false,true,true,null);
// 开启确认发布
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表 适用于高并发的情况下
* 1、轻松的将序号于消息进行关联
* 2、轻松的批量删除内容数据 只要给到序号
* 3、支持高并发
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
// 准备消息的监听器-----监听哪些消息成功了,哪些消息失败了。
// 消息确认成功的回调函数。
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
// 如果是批量发布
if (multiple){
//2.删除已经确认的消息 剩下的就是未确认的消息
// 通过 deliveryTag
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear(); // 删除
}else {
// 单个删除
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息--->编号"+deliveryTag);
};
//消息确认失败的回调函数。
//deliveryTag 消息的编号 multiple:是否是批量确认
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
//3.打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息--->编号"+deliveryTag+"内容是:"+message);
};
// 1.监听哪些消息成功了, 2监听哪些消息失败了
channel.addConfirmListener(ackCallback,nackCallback);
long beginTime = System.currentTimeMillis();
for (int i = 0 ; i < 1000 ; i ++){
String message = "消息"+i;
channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
//1.此处记录下 所有要发送的消息 ----- 消息的总和,getNextPublishSeqNo,在确认模式下返回下一个消息的序列号
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"单独确认消息,耗时"+(endTime-beginTime)+"ms"); // 用时 209 毫秒
}
}
以上 3 种发布确认速度对比
单独发布消息
- 同步等待确认,简单,但吞吐量非常有限。
批量发布消息
- 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题。
异步处理:
- 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些