首页 > 其他分享 >RabbitMQ避免重复消费

RabbitMQ避免重复消费

时间:2023-12-04 11:36:00浏览次数:24  
标签:消费 重复 RabbitMQ 处理 避免 消息 messageId message

在Java中,可以使用消息队列来实现消息的异步处理,其中常用的消息队列有 RabbitMQ、ActiveMQ、Kafka 等。

什么是幂等性?

幂等性是指无论操作执行多少次,都是得到相同的结果,而不会产生其他副作用。

在rabbitMQ中

什么是消息重复消费?

同一条消息在MQ中被消费多次

出现重复消费的原因:

生产者发送一条消息到rabbitMQ,但rabbitMQ尚未收到消费者的确认,会认为消息消费未被消费而重新发送。

网络不稳定、消费者故障、网络分区、消息重复传递策略、消费者超时设置不当

为什么需要避免重复消费?

业务错误:我本来写的业务逻辑就是只要执行一次

数据重复:数据插入重复,破坏数据唯一性

资源浪费:占用系统资源,降低系统性能

如何避免消息重复消费?

消息去重

通过记录已经消费过的消息,在消息到达时检查它是否已经在记录中存在,从而避免重复处理。

  if (!processedMessages.contains(message)) {
                    processMessage(message);
                    processedMessages.add(message);
                }
消息幂等性
分布式锁(消息幂等性)

使用UUID生成唯一Id ,作为messageId

使用了唯一的消息ID来确保同一条消息只会被处理一次。

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .messageId(UUID.randomUUID().toString()) // 唯一标识
                    .build();
        if (!isMessageProcessed(messageId)) {
            processMessage(message);
            saveProcessedMessage(messageId);
        }

消费者先查询该消息是否已经被处理过,如果没有被处理过,则调用processMessage()方法处理该消息,并使用 saveProcessedMessage()方法保存已经处理过的消息。

            //手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false

在处理完消息后,还需要调用channel.basicAck(envelope.getDeliveryTag(), false)方法确认消息已经被消费。这是因为RabbitMQ是一个消息的投递机制,只有在消费者确认了消息已经被处理后,才会从消息队列中删除该消息。

使用redis实现避免重复消费

生产者

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(1) // 指定消息是否需要持久化 1-需要 2-不需要
                    .messageId(UUID.randomUUID().toString()) // 唯一标识
                    .build();

消费者

 String result = jedis.set(messageId, "0", "NX", "EX", 10);

 if (result != null && result.equalsIgnoreCase("OK")){
                    System.out.println("接收到消息:"+ new String(body,"UTF-8"));

                    //消费成功 set messageId - 1
                    jedis.set(messageId,"1");
                    channel.basicAck(envelope.getDeliveryTag(),false);

                }else {
                    //如果1中的setnx失败,获取key对应的value,如果是1,设置ack 如果是0 return

                    String s = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)){
                        //消费完了
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }

spring-boot

如果存在,设置value为1;如果value是1,ack

事务性消费
消费状态追踪

标签:消费,重复,RabbitMQ,处理,避免,消息,messageId,message
From: https://www.cnblogs.com/yiluok/p/17874542.html

相关文章

  • hibernate使用原生sql查询Hibernate原生SQL多表查询字段名重复问题以及解决方法
    解决方案通过将别名.*换成{别名.*}hibernate会自动为我们生成别名,具体修改如下图: ......
  • RabbitMQ 消费者可靠性——失败重试机制
     效果:消费者抛异常后,会本地重试,如果本地重试次数达到最大重试次数之后,直接给队列返回reject,队列收到后就会丢弃该消息,也就是策略的第一种但就这样把删了不太好,所以有了失败消息处理策略  第二种ImmediateRequeueMessageRecoverer:消费者抛异常后,会本地重试,如果本地重试......
  • Python 的 tqdm 如果在内部使用print打印 会重复打印进度条 怎么避免这个问题?
    要避免在使用Python的tqdm库时在内部使用print打印时重复打印进度条,您可以通过使用tqdm库提供的特殊函数tqdm.write()来实现。tqdm.write()函数会将输出写入到标准输出,而不会干扰进度条的显示。下面是一个示例:fromtqdmimporttqdmimporttime#创建一个范围为10的进度条fori......
  • RabbitMQ 生产者可靠性——生产者重连
     我们配置的这个失败后的重连机制仅仅是发送者连接MQ失败的连接失败重试,如果消息发送抛出异常时不会重试,因为它只是连接失败的重试,不是消息发送的重试 spring:rabbitmq:host:192.168.88.130port:5672virtual-host:/hmallusername:hmallpassword......
  • RabbitMQ 消息转换器
     代码示例:1.引入依赖<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>2.在启动类中创建Beanpackagecom.itheima;importorg.springframework.amqp.rabbit.core.Rabbi......
  • RabbitMQ Java代码声明队列和交换机(方法一)
      交换机和队列的声明一般写在消费者模块里 代码示例:packagecom.itheima.config_RabbitMQ;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration......
  • RabbitMQ Topic交换机
     代码示例:1.新建两个队列 2.创建交换机,名字叫hmall.topic,类型选择topic 3.hmall.topic交换机绑定第一步的两个队列,绑定过程中填写RoutingKey  4.编写消费者代码监听这两个队列@RabbitListener(queues="topic.queue1")publicvoidlistenQueue05(Str......
  • RabbitMQ Direct交换机
     代码示例:1.交换机绑定了两个队列,并给它们设置了RoutingKey2. publisher发送者给Direct交换机发消息时,第二个参数指定RoutingKey:@GetMapping("/mq03")publicvoidmq03(){StringexchangeName="hmall.direct";Stringmsg="hello,红色";//三个参数:......
  • RabbitMQ 发送消息到交换机
    发送消息到交换机的代码:@GetMapping("/mq02")//发送消息给交换机publicvoidmq02(){StringexchangeName="hmall.fanout";Stringmsg="hello,每个人";//三个参数:交换机名称、RoutingKey(暂时为空)、要发送的消息rabbitTemplate.convertAndSend(exchangeName,......
  • RabbitMQ Fanout交换机
     容易搞混的点:1.假如publisher给Fanout交换机发送了一条消息,那么Fanout交换机会给每一个绑定到它身上的队列都发送这条消息,也就是说有多少个队列跟它绑定了,这条消息就有几份,每个队列都收到一份。2.假如一个队列绑定了多个消费者,那么该队列在给消费者投递消息时就是轮询,一......