首页 > 其他分享 >如何保证消息不被重复消费?

如何保证消息不被重复消费?

时间:2024-05-24 19:26:08浏览次数:40  
标签:消费 重复 Kafka 保证 消息 offset 数据

面试题

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

面试官心理分析

其实这是很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。

面试题剖析

回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。

举个栗子。

有这么个场景。数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 Kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 Zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,Kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 Kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

注意:新版的 Kafka 已经将 offset 的存储从 Zookeeper 转移至 Kafka brokers,并使用内部位移主题 __consumer_offsets 进行存储。

编辑

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

编辑

当然,如何保证 MQ 的消费是幂等性的,在实际应用中需要结合具体的业务来看。

关注「小新杂谈社」公众号,回复 AI,即可获取本项目离线 PDF 文档,学习更加方便!

标签:消费,重复,Kafka,保证,消息,offset,数据
From: https://blog.csdn.net/u014349086/article/details/139154885

相关文章

  • 简单的事情重复做重复的事情认真做
    简单的事情重复做重复的事情认真做 工作是枯燥的,每天都是重复着同样的事,说着同样的话,如何才能让工作也起到保鲜作用呢?其实生活也是同样的,循环反复着经过,所以才产生了你的喜怒哀乐.才会有烦恼和兴奋的交替,那么在这个循环连中,怎么让他朝着你向往的方向发展而保持着新鲜感呢??~~探索!!......
  • 采集数据产品描述有超链接///设置免运费后,达到免送标准,其他运费不显示///给产品详情页
    //产品描述有超链接,去掉functionremove_product_hyperlinks($content){if(is_product()){//确保只在产品页面上应用$content=preg_replace('/<ahref=".*?">(.*?)<\/a>/','$1',$content);}return$content;}add_......
  • 重工业机台数据汇集,如何保证数据准确的同时,又保持高效率?
    在重工业中,如钢铁、矿产和能源行业,机台发挥着重要作用。它们用于加工和制造各种金属材料,如铁矿石、铜矿石和煤矿等。重工业机台在运行过程中会产生多种类型的数据,这些数据对于监测机台状态、优化生产过程以及进行故障诊断等都具有重要意义。以下是一些重工业机台可能产生的数据:......
  • 银行总部文件自动下发,如何保证不影响专线网络使用?
    银行在我国金融体系中占据重要地位,是我国市场经济的重要组成部分。我国商业银行随着自身不断发展,规模日益扩大,形成了“总行-分行-支行-营业网点”的典型层级管理模式。在日常中,银行总部存在文件下发的场景:银行总部会定期向分支行发送财务报表和审计报告,以确保分支行的财务状况符......
  • Xtrabackup 不备份 binlog 怎么保证一致性?
    公司大佬出的考核题中有个有意思的问题:已知:MySQL的内部两阶段提交,是为了解决binlog和redolog的一致性(在crashrecovery的过程中,如果发现某个事务的redolog已经完成prepare阶段,但未完成commit,那么会验证该事务是否在binlog中,如存在,则进行提交,否则进行回滚)。又......
  • 力扣 3.无重复字符的最长字串
    题目描述:给定一个字符串 s ,请你找出其中不含有重复字符的 最长子串的长度。示例 1:输入:s="abcabcbb"输出:3解释:因为无重复字符的最长子串是"abc",所以其长度为3。示例2:输入:s="bbbbb"输出:1解释:因为无重复字符的最长子串是"b",所以其长度为1。......
  • 文件夹加密如何实现?如何保证加密后文件不会被他人查看?
    文件夹加密技术:确保数据安全的关键步骤在我们的数字生活中,个人和企业的敏感信息经常需要通过电子方式存储和传输。为了防止这些信息落入错误的手中,文件夹加密成为了一种重要的数据保护手段。文件夹加密不仅可以防止未授权访问,还能在信息传播过程中保障数据的完整性和机密性。但如......
  • 力扣-1209. 删除字符串中的所有相邻重复项 II
    1.题目题目地址(1209.删除字符串中的所有相邻重复项II-力扣(LeetCode))https://leetcode.cn/problems/remove-all-adjacent-duplicates-in-string-ii/题目描述给你一个字符串 s,「k倍重复项删除操作」将会从s 中选择 k 个相邻且相等的字母,并删除它们,使被删去的字符串的......
  • 递归地获取当前目录下所有文件的后缀名(不重复)
    好的,这里是修改后的批处理脚本,它将递归地获取当前目录下所有文件的后缀名,并将不重复的后缀名输出到当前目录下的a.txt文件中,然后结束:@echooffsetlocalenabledelayedexpansion::初始化一个空的集合用来存储后缀名set"suffixList="::递归遍历当前目录及其子目录下的所......
  • 互斥锁,IPC机制,队列,生产者消费者模型
    Ⅰ互斥锁【一】什么是互斥锁互斥锁其实就是一种锁。为当前进程或线程添加额外的限制限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。互斥锁的基本原......