首页 > 其他分享 >消息幂等(去重)解决方案

消息幂等(去重)解决方案

时间:2023-11-13 16:24:58浏览次数:21  
标签:事务 消费 解决方案 投递 插入 消息 RocketMQ

一、场景

       程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

       基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

二、解决方案

       2.1、排它锁 for update

               把select 改成 select for update语句,用排他锁锁定。

select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

               缺点:

                        1、仅适用于InnoDB,且必须在事务块(BEGIN/COMMIT)中才能生效。在进行事务操作时,通过“for update”语句,MySQL会对查询结果集中每行数据都添加排他锁,其他线程对该记录的更新与删除操作都会阻塞。排他锁包含行锁、表锁。

                        2、引入了事务包裹而导致整个消息消费可能变长,并发度下降。

       2.2、乐观锁 

               更新订单状态采取乐观锁,更新失败则消息重新消费之类。

       2.3、消息表

                基于关系型数据库插入消息表,即消息表+本地事务保证幂等。 数据库中增加一个消息消费记录表(幂等表),把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。   

               1、开启事务

               2、插入消息表(处理好主键冲突问题)

               3、更新订单表(原消费逻辑)

               4、提交事务

            【说明】:

                1、这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。

                2、如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。

             【缺点】:

                  1、消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。

                  2、数据库的数据必须是在一个库,跨库无法解决。

             【注意】:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。

         2.4、基于消息幂等表的非事务方案

                 

                    

          以上是去事务化后的消息幂等方案的流程,此方案是无事务的,是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)。
        【风险】:问题出在并发场景,在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。

         【处理风险】:插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。

 

                        

       2.5、更灵活的消息表存储媒介

                我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

                1、性能上损耗更低;

                2、超时时间可以直接利用Redis本身的ttl实现。

                当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

三、建议

        1、消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了;

        2、消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试;

        3、一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等);

        4、在第3部做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好第1步的回滚,使得下次重试消费成功。

 

          

          

 

标签:事务,消费,解决方案,投递,插入,消息,RocketMQ
From: https://www.cnblogs.com/xiaobaicai12138/p/17829362.html

相关文章

  • RocketMQ【消息丢失】
    一、流程图         二、消息丢失场景      2.1、场景1中生产者将消息发送给RocketMQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。   2.2、场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失:     ......
  • 3分钟白话RocketMQ系列—— 如何消费消息
    =白话3分钟,快速了解RocketMQ如何消费消息。看完如果不了解,欢迎来打我。我们知道RocketMQ主要分为消息生产、存储(消息堆积)、消费三大块领域。前面已经介绍了生产消息、存储消息两大块内容,那接下来,我们白话一下RocketMQ是如何消费消息的,揭秘消息消费全过程。注意,如果白话中不小......
  • 火山引擎 DataLeap 计算治理自动化解决方案实践和思考
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群【导读】本文旨在探讨火山引擎DataLeap在处理计算治理过程中所面临的问题及其解决方案,并展示这些解决方案带来的实际收益。主要内容包括:探讨面临的痛点和挑战提供自动化的解决方案分析实践效果......
  • 3分钟白话RocketMQ系列—— 如何存储消息
    白话3分钟,快速了解RocketMQ如何存储消息。看完如果不了解,欢迎来打我。我们知道RocketMQ主要分为消息生产、存储(消息堆积)、消费三大块领域。那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ4.9.4......
  • 火山引擎 DataLeap 计算治理自动化解决方案实践和思考
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 【导读】本文旨在探讨火山引擎DataLeap在处理计算治理过程中所面临的问题及其解决方案,并展示这些解决方案带来的实际收益。主要内容包括:探讨面临的痛点和挑战提供自动化的解决方案......
  • 视频监控/视频融合平台EasyCVR助力构筑智慧电力解决方案
    安防视频监控平台EasyCVR是一个具有强大拓展性、灵活的视频能力和轻便部署的平台。它支持多种主流标准协议,包括国标GB28181、RTSP/Onvif、RTMP等,还可以支持厂家的私有协议和SDK接入,例如海康Ehome、海大宇等设备的SDK。该平台不仅拥有传统安防视频监控的功能,还具备接入AI智能分析的......
  • 量化交易APP开发解决方案
    一、项目背景量化软件的需求日益的增加,开发一款量化交易app软件显得特别的钟涛,该软件的能够快速帮助用户获得市场的重要信息,通过分析得到相关的商机,制定相关的交易策略,提高交易的利润。二、功能设计1.数据获取:量化数据的获取,通过市场的接口,大数据的分析软件对接,得到市场的变化规......
  • 内网穿透解决方案之frp
    该方法需要有一台外网可以访问的固定ip机器,一般可以是云服务器windows、linux、macOS都支持从GitHub上下载frp其中包含客户端与服务端执行文件和配置文件服务端frpsfrps.toml客户端frpcfrpc.toml在云服务器上配置与运行服务端(以linux为例)以最简单的方式,服务端只......
  • opencv多目标跟踪算法报错与解决方案
    背景:在正确安装opencv-contrib-python4.8.1.78后,在使用opencv进行多目标跟踪时,出现如下错误:错误及解决方法1、AttributeError:module'cv2.cv2'hasnoattribute'MultiTracker_create'解决方法:将trackers=cv2.MultiTracker_create()改成trackers=cv2.legacy.Mu......
  • 基于AI智能分析网关的智慧视频监控一站式解决方案
    1、功能概述TSINGEE智能分析网关+EasyCVR智慧视频监控系统基于云-边-端一体化协同架构,可兼容多协议、多类型的设备接入,实现视频数据采集、海量视频汇聚与处理、按需调阅、全网分发、告警消息推送、数据级联共享、AI智能分析接入等视频能力服务,可广泛应用于安防监控、工地、工厂、......