首页 > 其他分享 >RocketMQ应用-消费幂等性问题解决

RocketMQ应用-消费幂等性问题解决

时间:2024-01-30 13:11:28浏览次数:22  
标签:消费 应用 重复 redis msgExt key 解决 logger RocketMQ

重复消费产生原因

  1. 生产者多次投递-投递时服务端接收后客户端网络原因确认失败,重新投递
  2. 消费者扩容重试-消费者扩容导致正在消费的消息没有正常应答,服务端重新推送

重复消费解决方案

  1. 给消息增加唯一key,消费时校验key是否已经消费过
  2. 消费者控制消息的幂等性(多次同样的操作结果一致)

幂等性保证方案

  1. 业务上判断重复消费是否会导致幂等性,如查询操作并不会影响操作结果,不需要处理
  2. 数据库提供重复表,每次操作前往重复表插入数据,插入成功后再消费
  3. 使用redis分布式锁的特性,消费前先将key存储在redis中,存储成功则进行消费

数据库重复表

去重表
创建数据库表,提供字段存储消息的key,并对该字段添加唯一索引;本文提供示例:

CREATE TABLE `mq_repeat_check`  (
  `mq_key` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  ## 将key设置为主键,确保唯一性
  PRIMARY KEY (`mq_key`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

业务代码添加幂等校验

@Component
@RocketMQMessageListener(
    topic = "mqRepeatTopic",
    consumerGroup = "mqRepeatConsumerGroup1",
    messageModel = MessageModel.BROADCASTING
)
public class MqRepeatTopicListener1 implements RocketMQListener<MessageExt> {

    private static final Logger logger = LoggerFactory.getLogger(MqRepeatTopicListener1.class);

    @Autowired
    private MqRepeatCheckService mqRepeatCheckService;

    @Override
    public void onMessage(MessageExt msgExt) {
        // 执行本地事务逻辑,返回事务状态
        try {
            String bodyStr = new String(msgExt.getBody());
            logger.info("【重复消费验证1】消息消费开始:{}", bodyStr);

            /*
             * 数据库重复表验证方式
             */
            Integer num = mqRepeatCheckService.addMqMsg(msgExt.getKeys());
            if (num < 1) {
                logger.info("【重复消费验证1】消息已经消费");
                return;
            }

            logger.info("【重复消费验证1】消息消费结束");
        } catch (Exception e) {
            logger.error("消息消费失败!", e);
            throw e;
        }
    }
}

redis分布式锁

利用redis中set命令的NX特性,将消息key存储在redis中,存储成功则进行消费
示例代码:

@Component
@RocketMQMessageListener(
    topic = "mqRepeatTopic",
    consumerGroup = "mqRepeatConsumerGroup2",
    messageModel = MessageModel.BROADCASTING
)
public class MqRepeatTopicListener2 implements RocketMQListener<MessageExt> {

    private static final Logger logger = LoggerFactory.getLogger(MqRepeatTopicListener2.class);

    @Autowired
    private RedisUtils redisUtils;

    @Override
    public void onMessage(MessageExt msgExt) {
        // 执行本地事务逻辑,返回事务状态
        try {
            String bodyStr = new String(msgExt.getBody());
            logger.info("【重复消费验证2】消息消费开始:{}", bodyStr);

            /*
             * redis分布式锁验证方式
             */
            boolean locked = redisUtils.setnx(msgExt.getKeys(),60*60*24);
            if (!locked) {
                logger.info("【重复消费验证1】消息已经消费");
                return;
            }

            logger.info("【重复消费验证2】消息消费结束");
        } catch (Exception e) {
            logger.error("消息消费失败!", e);
            throw e;
        }
    }
}

标签:消费,应用,重复,redis,msgExt,key,解决,logger,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17996886

相关文章

  • 一个word的样式应用到另一个word
    参考:如何将一个word文档的样式应用到另一个word文档?_百度知道(baidu.com)......
  • burpsuite抓取修改http和https流量(proxy模块的简单应用)
    一、操作环境目标机:DVWA网站操作机:BurpSuite Prov2.1;FireFox浏览器二、操作步骤1.设置BP代理服务端口代理--选项--监听器(选项卡) 为什么不用8080?因为Tomcat默认端口和BP的默认监听端口一致,同时打开会导致端口冲突。2.设置Fir......
  • 软件测试|探索多态在程序设计中的奇妙应用与巧妙技巧!
    测试管理班是专门面向测试与质量管理人员的一门课程,通过提升从业人员的团队管理、项目管理、绩效管理、沟通管理等方面的能力,使测试管理人员可以更好的带领团队、项目以及公司获得更快的成长。提供1v1私教指导,BAT级别的测试管理大咖量身打造职业规划。多态是面向对象编程中......
  • 一次因PageHelper引起的多线程复用问题的排查和解决 | 京东物流技术团队
    A、ProblemDescription1\.PageHelper方法使用了静态的ThreadLocal参数,在startPage()调用紧跟MyBatis查询方法后,才会自动清除ThreadLocal存储的对象。2\.当一个线程先执行了A方法的PageHelper.startPage(intpageNum,intpageSize)后,在未执行到SQL语句前,因为代码抛异常而提前结束......
  • SQL变量数据加工在Java规则引擎中的应用案例分析
    SQL变量加工SQL加工背景,在决策配置过程中,一些复杂的逻辑或模型可通过自定义SQL脚本编写创建数据变量,通过SQL脚本可以便捷的从数据库中取数,并且自定义SQL支持传参,可满足更复杂多变的数据加工处理。注意,SQL变量加工和算子编排加工的方式不同,SQL变量加工依赖于对应数据源的服务器的性......
  • 一次因PageHelper引起的多线程复用问题的排查和解决 | 京东物流技术团队
    A、ProblemDescription1.PageHelper方法使用了静态的ThreadLocal参数,在startPage()调用紧跟MyBatis查询方法后,才会自动清除ThreadLocal存储的对象。2.当一个线程先执行了A方法的PageHelper.startPage(intpageNum,intpageSize)后,在未执行到SQL语句前,因为代码抛异常而提前结束......
  • 在K8S中,什么是有状态应用和无状态应用?
    在Kubernetes(简称K8S)中,有状态应用(StatefulApplication)和无状态应用(StatelessApplication)是根据应用程序对数据处理方式以及实例间的关系来区分的两种类型:无状态应用(StatelessApplication):数据处理:无状态应用不保存任何与客户端会话相关的持久化数据,每个请求的处理独立......
  • 最全的项目部署+持续集成解决方案:Jenkins + git + docker
    最全的项目部署+持续集成解决方案:Jenkins+git+docker:https://blog.csdn.net/m0_45806184/article/details/126408527?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-126408527-blog-128137274.235^v43^control&spm=1001.21......
  • win11家庭版没有本地用户和组怎么办 win11家庭版没有本地用户和组解决方法
    https://www.xtzjcz.com/pc/247809.htmlwin11家庭版没有本地用户和组怎么办呢,当用户发现自己的win11的系统是家庭版本的时候并且没有本地用户和组的开启功能的话要怎么操作才能拥有呢,其实这个功能在win11上是有的只不过一般家庭版不让用,下面就是关于win11家庭版没有本地用户和组......
  • 论文推荐:大语言模型在金融领域的应用调查
    这篇论文总结了现有LLM在金融领域的应用现状,推荐和金融相关或者有兴趣的朋友都看看论文分为2大部分:1、作者概述了使用llm的现有方法包括使用零样本或少样本的预训练模型,对特定于领域的数据进行微调,还有从头开始训练定制llm,并给出了关键模型的总结与评价。2、根据给定的用例、......