首页 > 其他分享 >【RocketMQ如何保证消息不丢失】

【RocketMQ如何保证消息不丢失】

时间:2024-11-08 11:47:12浏览次数:3  
标签:topic String 重试 Broker 发送 ConsumeConcurrentlyStatus 保证 丢失 RocketMQ

主要由生产者、Broker、消费者三方共同保证

1 生产者

常用发送消息分为同步发送异步发送两种(还有一种单向发送, 自行了解哈)

同步发送

消息发送会同步阻塞等待Broker返回结果。Broker确认收到消息后才会返回sendResult, 这个过程中发生异常就需要生产者重新发送。(代码片段如下)

String topic = "topic";
String tags = "tag";
String keys = "key";
String msgBody = "msgJsonStr";
Message msg = new Message(topic, tags, keys, msgBody.getBytes());
try{
	SendResult sendResult = producer.send(mag);
	//正常的业务逻辑
}cach(Exception e){
	//可以重试,也可以先存起来后续处理
}
异步发送

异步发送需要生产者重写SendCallback的onSuccess和onException方法, 用于给Broker进行回调(代码片段如下)

String topic = "topic";
String tags = "tag";
String keys = "key";
String msgBody = "msgJsonStr";
Message msg = new Message(topic, tags, keys, msgBody.getBytes());
SendResult sendResult = producer.send(mag, new SendCallback(){
	@Override
	public void onSuccess(SendResult sendResult){
		//正常的业务逻辑
	}
	@Override
	public void onException(Throwable e){
		//可以重试,也可以先存起来后续处理
	}
});

当然也可以让RocketMQ自己去重试

producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试3次
producer.setRetryTimesWhenSendAsyncFailed(3); //异步发送失败重试3次

2 Broker

默认情况, Broker在接收消息会先存到内存, 内存存储成功就直接返回给用户, 然后再异步刷盘, 这个过程中宕机就会丢数据, 所以可以将保存机制改成同步刷盘

flushDiskType=SYNC_FLUSH ##默认是ASYNC_FLUSH

通常RockerMQ都是以集群方式进行部署, Broker采用一主多从进行部署, 通过主从同步方式进行数据复制, 且一般都会进行Broker备份(RockerMQ支持Broker配置多实例), 多个Broker之间进行冗余备份, 保证数据的可靠, 默认情况, Broker接收到消息, 写入master成功就可以返回响应生产者, 接着消息会异步复制到slave节点, 这个中master挂了就会丢数据, 所以可以将同步机制改为同步复制方式 (master将数据同步到slave节点后再返回给生产者结果)

brokerRole=SYNC_MASTER ##默认是ASYNC_MASTER

3 消费者

设置手动提交, 消费完手动返回ack给Bocker, 在业务逻辑处理完后返回 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS, 注意要做好幂等避免重复消费

DefaultMQPushConsumer consume = new DefaultMQPushConsumer("groupName");
consume.setNamesrvAddr("nameserver:9898");
....//各种配置赋值
consume.registerMessageListener(new MessageListenerConcurrently(){
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs...){
		for(MessageExt msg : msgs){
			String msgId = msg.getMsgId();
			if(isDeal(msgId)){
				//重复消费
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
			try{
				//业务逻辑
				....
				//业务处理完手动提交
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}cache(Exception ex){
				//业务处理失败, 要重试
				return ConsumeConcurrentlyStatus.CONSUME_LATER;
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	}
});
consume.start();

标签:topic,String,重试,Broker,发送,ConsumeConcurrentlyStatus,保证,丢失,RocketMQ
From: https://blog.csdn.net/weixin_44541808/article/details/143570330

相关文章

  • Kafka 如何保证消息不丢失?【消息手动 ACK】
    前言:Kafka作为一个MQ它肯定会有消息丢失的场景,那我们如何做到让Kafka的消息不丢失呢?本篇我们来剖析一下Kafka如何做到消息不丢失。Kafka系列文章传送门Kafka简介及核心概念讲解SpringBoot整合Kafka详解Kafka@KafkaListener注解的详解及使用Kafka客户......
  • iedkcs32.dll文件丢失如何是好?详解找回IE浏览器加密DLL文件的方案
    在使用InternetExplorer(IE)浏览器时,有时可能会遇到iedkcs32.dll文件丢失的问题。这个文件是IE浏览器的一个重要组成部分,负责处理加密和解密任务,确保浏览器的安全通信。一旦iedkcs32.dll文件丢失,IE浏览器可能无法正常工作,甚至可能无法启动。那么,当iedkcs32.dll文件丢失时,我们应......
  • RabbitMQ如何保证发送的消息可靠(RabbitMQ的Confirm模式和2.Return模式)
    RabbitMQ如何保证发送的消息可靠(RabbitMQ的Confirm模式和2.Return模式)1、RabbitMQ消息Confirm模式(保证从生产者到交换机的消息可靠)1.1、Confirm模式简介1.2、具体代码实现1.2.1、application.yml开启确认模式1.2.2、生产者方式1:实现RabbitTemplate.ConfirmCallback生产......
  • KbdHebL3.dll文件丢失危机?解决KbdHebL3.dll缺失问题:希伯来语键盘布局组件恢复全攻略
    在使用计算机时,有时可能会遇到KbdHebL3.dll文件丢失的问题。KbdHebL3.dll是一个与希伯来语键盘布局相关的动态链接库(DLL)文件,它负责处理希伯来语键盘布局和输入法转换的代码。一旦该文件丢失,将会导致希伯来语键盘布局无法正常使用。以下是一份详细的恢复全攻略,帮助用户解决KbdHe......
  • 万象网管2004报错提示:taxinject.dll文件丢失,如何修复并启动Server.exe
    在使用万象网管2004这款网吧管理软件时,部分用户可能会遇到启动Server.exe时系统提示“taxinject.dll文件丢失”的错误。这一错误通常会导致Server.exe无法正常启动,从而影响网吧管理系统的正常运行。本文将详细介绍如何修复这一错误,并成功启动Server.exe。一、了解taxinject.d......
  • 上古卷轴msvcp120.dll文件丢失怎么办?上古卷轴游戏msvcp120.dll缺失问题的高效解决方案
    对于热爱《上古卷轴》系列游戏的玩家来说,遇到游戏因msvcp120.dll文件丢失而无法运行的情况无疑是一个令人沮丧的问题。这个动态链接库文件(DLL)是MicrosoftVisualC++RedistributablePackage中的一个关键组件,对于游戏的正常运行至关重要。一旦这个文件缺失,游戏可能无法加载必......
  • rsmhook64.dll文件丢失不再烦恼,这些方法帮你轻松解决
    rsmhook64.dll文件的缺失可能会导致某些应用程序无法正常运行。这里提供几种可能的解决方法,帮助您修复这个问题:1.重新安装相关软件如果rsmhook64.dll文件是某个特定程序的一部分,尝试卸载并重新安装该程序。这通常可以解决由于文件损坏或丢失引起的大多数问题。2.使用......
  • 《古剑奇谭网络版》qt5widgets.dll文件丢失全方位解决方法大全
    《古剑奇谭网络版》是一款大型多人在线角色扮演游戏。如果在运行该游戏时遇到qt5widgets.dll文件丢失的错误提示,这通常意味着你的系统缺少了Qt框架中的一个必要组件。Qt是一个跨平台的应用程序开发框架,广泛用于图形用户界面(GUI)的创建。要解决qt5widgets.dll文件丢失的问题,你可......
  • 基于kylin-v10的RocketMQ双主双从搭建
    环境:两台服务器(虚拟机)1、下载RocktMQrocketmq-all-5.3.0-bin-release.zip2、上传服务器后,解压安装包unziprocketmq-all-5.3.0-bin-release.zip3、进入到bin目录,修改jvm参数(结合服务器的配置去修改)cd /opt/rocketmq-all-5.3.0-bin-release/bin4、修改runbroker.shvi......
  • Kafka 消息丢失如何处理?
    今天给大家分享一个在面试中经常遇到的问题:Kafka消息丢失该如何处理?这个问题啊,看似简单,其实里面藏着很多“套路”。来,咱们先讲一个面试的“真实”案例。面试官问:“Kafka消息丢失如何处理?”小明一听,反问:“你是怎么发现消息丢失了?”面试官顿时一愣,沉默了片刻后,可能......