首页 > 其他分享 >RocketMQ—RocketMQ消息重复消费问题

RocketMQ—RocketMQ消息重复消费问题

时间:2024-02-19 19:33:05浏览次数:30  
标签:消费 重复 索引 消息 key RocketMQ

RocketMQ—RocketMQ消息重复消费问题

重复消费问题的描述

什么情况下会发生重复消费的问题:

  1. 生产者多次投递消息:如果生产者发送消息时,连接有延迟,MQ还没收到消息,生产者又发送了一次消息;

  2. 消费者方扩容时会重试:

    topic中有四个queue,queue2中有一个消息MSG,如果此时只有一个消费者,MSG就会被被consumer1获取到;

    第1步

    但是consumer1获取到消息时,位点还没来得及前移,又有了一个消费者Consumer2,这样就会产生重平衡;

    第二步

    Consumer1消费0和1队列,Consumer2消费2和3队列,因为上一步的消息还没消费成功,所以会给Consumer2一份消息,这样就会产生重复消费的问题。

重复消费问题的解决方案

一般情况下,我们会在消费者端解决重复消费的问题,消费者端需要进行去重,去重关键点就是要找到消息的唯一标记,所以我们在发送消息时会带有一个key,消费者拿到相同的key就不进行操作了。(一般使用redis存储消费过的key,或者使用mysql存储消费记录,把key设置成唯一索引。)

mysql解决方案

简单建一个去重表,有id,key两个字段,key设置唯一索引。

错误做法

if(key存在){
	直接返回;
}
执行业务逻辑。
插入key记录到MySQL;

这样做是不正确的,因为不能保证并发安全,如果有两个线程同时进来,这样都会查不到这个key,都会执行消费的逻辑。

正确做法

try{
    插入key记录到MySQL;//MySQL唯一索引内部会保证唯一
}catch (SQLException e) {
    if (e instanceof SQLIntegrityConstraintViolationException) {
        // 唯一索引冲突异常
        // 说明消息来过了
        记录日志;
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    其他错误依然会报错,会重新投递到消息队列;
}
处理业务逻辑
如果业务报错 则删除掉这个去重表记录
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

标签:消费,重复,索引,消息,key,RocketMQ
From: https://www.cnblogs.com/nicaicai/p/18021795

相关文章

  • Kafka 消费者
    1.Kafka消费方式pull(拉)模式:consumer采用从broker中主动拉取数据。Kafka采用这种方式。push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是50m/s,Consumer1、Consumer2就来不及处理消息。pull模式不足之处是,如果......
  • sublime 选择有规律的数据,同时快速编辑多行内容 去除重复行或者只保留唯一值
     同时快速编辑多行内容:五种方式:1,鼠标选中多行,按下CtrlShiftL(CommandShiftL)即可同时编辑这些行;2,鼠标选中文本,反复按CTRLD(CommandD)即可继续向下同时选中下一个相同的文本进行同时编辑;3,鼠标选中文本,按下AltF3(Win)或CtrlCommandG(Mac)即可一次性选择全......
  • linux 中awk 根据多列读数据进行去重复
     001、(base)[b20223040323@admin1test2]$lstest.txt(base)[b20223040323@admin1test2]$cattest.txt##测试数据如下;根据第一列和第三列对数据进行去重复ID=gene-RIN1rna-XM_018043206.13615ID=gene-STRIP2rna-XM_018046935.13917ID=gene-ST......
  • RocketMQ—RocketMQ消费重试和死信消息
    RocketMQ—RocketMQ消费重试和死信消息消费重试生产者重试设置重试的代码如下//失败的情况重发3次producer.setRetryTimesWhenSendFailed(3);//消息在1S内没有发送成功,就会重试producer.send(msg,1000);一般情况下,我们不会在生产者方进行重试。消费者重试消费者在消......
  • Redis生成无规律不重复的纯数字券码
    需求描述在开发优惠券系统或票务系统的时候,经常要生成纯数字码,券码要求:12位纯数字,无规律,不重复。下面我提供一种思路,利用redis的List数据类型,Lpop+Rpush维护一个1万个码的队列队列数据结构保持1万个券码数量,可以根据项目实际情况自行调整Array([0]=>478439938353[1]=>......
  • 【Java 并发】【应用】经典的生产者、消费者
    1  前言闲来无事,复习复习并发中常用到的一些协调多线程的工具哈。2 基于Java队列的实现生产者跟消费者之间要协调,他俩会出现碰撞的地方就是存放东西的容器,所以我们可以直接拿一个线程安全的队列来做容器即可,比如我这里用的ArrayBlockingQueue:/***@author:xjx*@d......
  • 大白话-设计RocketMQ延迟消息
    延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单释放库存等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。为什么RocketM......
  • 简单的斐波那契数列通过chan实现生产者消费者模型
    1.实现斐波拉契数列写一个函数返回长度为n的斐波拉契slice数组funcfi(nint)[]int{ ifn<=0{ return[]int{} } fibs:=make([]int,n) fibs[0]=0 ifn>1{ fibs[1]=1 fori:=2;i<n;i++{ fibs[i]=fibs[i-1]+fibs[i-2] } } returnfibs}......
  • 【多线程例题】使用三个线程,分别可以打印A,B,C。要求实现三个线程协同打印,顺序打印出ABC
    顺序打印-进阶版方法一:三个线程竞争同一个锁,通过count判断是否打印三个线程分别打印A,B,C方法一:通过count计数打印(三个线程上同样的锁,打印一个,召唤所有锁,如果不满足条件,则wait等待,锁自动解锁)方法二:/***有三个线程,分别只能打印A,B和C*要求按顺序打印ABC,打印10次*输出示......
  • Linux 中实现去重复后仍然按照原来的顺序输出
     001、一般去重复[root@PC1test01]#lsa.txt[root@PC1test01]#cata.txt##测试文本cdcabb[root@PC1test01]#cata.txt|sort|uniq##去重复后也排序了abcd 002、去重复后保持原来的顺序[root@PC1test01]#lsa......