一、场景
程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。
二、解决方案
2.1、排它锁 for update
把select 改成 select for update语句,用排他锁锁定。
select * from t_order where order_no = 'THIS_ORDER_NO' for update //开启事务 if(order.status != null) { return ;//消息重复,直接返回 }
缺点:
1、仅适用于InnoDB,且必须在事务块(BEGIN/COMMIT)中才能生效。在进行事务操作时,通过“for update”语句,MySQL会对查询结果集中每行数据都添加排他锁,其他线程对该记录的更新与删除操作都会阻塞。排他锁包含行锁、表锁。
2、引入了事务包裹而导致整个消息消费可能变长,并发度下降。
2.2、乐观锁
更新订单状态采取乐观锁,更新失败则消息重新消费之类。
2.3、消息表
基于关系型数据库插入消息表,即消息表+本地事务保证幂等。 数据库中增加一个消息消费记录表(幂等表),把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。
1、开启事务
2、插入消息表(处理好主键冲突问题)
3、更新订单表(原消费逻辑)
4、提交事务
【说明】:
1、这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。
2、如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。
【缺点】:
1、消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。
2、数据库的数据必须是在一个库,跨库无法解决。
【注意】:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。
2.4、基于消息幂等表的非事务方案
以上是去事务化后的消息幂等方案的流程,此方案是无事务的,是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)。
【风险】:问题出在并发场景,在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。
【处理风险】:插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序自行实现)。
2.5、更灵活的消息表存储媒介
我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:
1、性能上损耗更低;
2、超时时间可以直接利用Redis本身的ttl实现。
当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。
三、建议
1、消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了;
2、消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试;
3、一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等);
4、在第3部做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好第1步的回滚,使得下次重试消费成功。
标签:事务,消费,解决方案,投递,插入,消息,RocketMQ From: https://www.cnblogs.com/xiaobaicai12138/p/17829362.html