首页 > 其他分享 >Springcloud学习笔记63---RocketMq超时重试,导致重复消费的问题,解决方案

Springcloud学习笔记63---RocketMq超时重试,导致重复消费的问题,解决方案

时间:2024-04-11 10:44:38浏览次数:17  
标签:重试 Springcloud --- org apache import 超时 consumer rocketmq

1. 重复消费的背景

当Consumer处理时间过长,在超时时间内没有返回给Broker消费状态,那么Broker也会自动重试。

设定一个超时时间,达到超时时间的那个消费当作消费失败处理。

Java客户端中的DefaultPushConsumer中的构造方法中的consumeTimeout字段(默认15分钟)。

package william.rmq.consumer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;

import javax.annotation.PostConstruct;
import java.util.List;

/**
* @Auther: ZhangShenao
* @Date: 2018/9/7 11:06
* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
   @Value("${spring.rocketmq.namesrvAddr}")
   private String namesrvAddr;

   private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");


   @PostConstruct
   public void start() {
       try {
           consumer.setNamesrvAddr(namesrvAddr);

           //从消息队列头部开始消费
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

           //设置集群消费模式
           consumer.setMessageModel(MessageModel.CLUSTERING);

           //设置消费超时时间(分钟)
           consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);

           //订阅主题
           consumer.subscribe("DefaultCluster", "*");

           //注册消息监听器
           consumer.registerMessageListener(this);

           //启动消费端
           consumer.start();

           log.info("Message Consumer Start...");
           System.err.println("Message Consumer Start...");
       } catch (MQClientException e) {
           log.error("Message Consumer Start Error!!",e);
       }

   }

   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       if (CollectionUtils.isEmpty(msgs)) {
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }

       MessageExt message = msgs.get(0);
       try {
           //逐条消费
           String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
           System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                   message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);

           //模拟耗时操作2分钟,大于设置的消费超时时间
           Thread.sleep(1000L * 60 * 2);
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       } catch (Exception e) {
           log.error("Consume Message Error!!", e);
           return ConsumeConcurrentlyStatus.RECONSUME_LATER;
       }
   }

}

解决方案:

(1) 调大consumeTimeout字段的值;

(2) 消费回调的方法进行幂等性校验,进行逻辑校验;

参考文献:

https://blog.csdn.net/m0_45406092/article/details/120531495  (【RocketMQ】消息重试、重试次数设置、死信队列)

https://zhuanlan.zhihu.com/p/25265380

标签:重试,Springcloud,---,org,apache,import,超时,consumer,rocketmq
From: https://www.cnblogs.com/luckyplj/p/18128301

相关文章

  • JAVA学习-深入研究容器.选择接口的不同实现
        Java集合框架提供了多个接口,每个接口都有不同的实现类,以满足不同的需求。以下是几个常用接口及其不同的实现类的概述:1.List接口:  -特点:有序集合,允许重复元素。  -实现类:ArrayList、LinkedList、Vector。  -使用方法:使用add()方法添加元素,使用g......
  • Kibana-4.6.6 marvel插件license过期重新注册
    1、注册新的license,https://register.elastic.co/marvel_register,注册完毕邮箱收取下载地址,将新的license下载到本地2、到license所在目录执行curl命令:curl-XPUT'http://<host>:<port>/_license'[email protected]将host和port替换为Elasticsearch的IP和端口执行curl命令报......
  • docker-compose部署kafka
    docker-compose.ymlversion:'2'services:zookeeper:image:develop-harbor.geostar.com.cn/3rd/zookeeper:3.5.5ports:-"2181:2181"kafka:image:develop-harbor.geostar.com.cn/3rd/wurstmeister/kafka:2.12-2.2.1......
  • 上海泗博推出OPLink软件---助力企业快速搭建OPC与MQTT平台通信
    OPLink是上海泗博自主研发的基于OPC数据采集及转发,OPC数据到MQTT通信的产品。它是基于上海泗博多年的OPC应用经验和工业通信产品的开发背景,推出的OPC/MQTT工业通信软件。这款软件设计简洁、实用、稳定。同时,OPLink还具备与KepwareLinkMaster相似的数据转发功能,可实现PLC设备间的......
  • App Store 警告 ITMS-91053: Missing API declaration
    问题:app虽然成功上架AppStore,但是邮件提示了如下警告:解决:解决方法是添加隐私清单文件。参考官方说明:官方文档其它相关链接:StackOverflow中关于这个问题的讨论这位作者分享了如何解决该问题这篇文章提供了解决该问题详细的指南......
  • python-API开发zk客户端
    前面于超老师讲完了,zk运维的基本命令行玩法,更多的还是开发需要通过代码和zk结合处理。大多数场景是java后端去操作。这里我们以运维更友好的python来学习。1.kazoo模块zookeeper是一个用于维护配置信息、命名、提供分布式同步和提供组服务。它自身是高可用的,只要宕机节点不......
  • P3478 [POI2008] STA-Station
    题目链接:既然让求深度之和,那么我就定义以\(i\)为根时深度之和为\(f_i\),现在就是思考状态转移的问题。如果以某种手段得到了\(f_1\),那么接下来的转移就好说了。设\(u\)为当前节点,\(j\)是当前节点的子节点。\(s_i\)表示以\(i\)为根的子树中的节点数量,则\(s_u=1+\sum{s......
  • 抗干扰低功耗段码LCD液晶显示屏驱动芯片(IC)-VK2C21A,用于温控器,电动车汽车仪俵等
    VK2C21是一个点阵式存储映射的LCD驱动器,可支持最大80点(20SEGx4COM)或者最大128点(16SEGx8COM)的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据,也可通过指令进入省电模式。其高抗干扰,低功耗的特性适用于水电气表以及工控仪表类产品。L23+01特点:•工作电压2.4-5.5V•内......
  • 打工人怎么干好活-软件工程师篇
    ​一如何避免职业病*   按时吃饭,养胃*   不要久坐(一个小时活动一会)*   垫高显示器,撑住腰二如何靠谱  提供抗风险能力。用60%的工期去实现一个60分的可用版本。然后用余下的时间去优化。有干劲是好的,但如果是用90%的工期去赶工一个90分的版本。如果最......
  • 软考数据库---2.SQL语言
    主要记忆:表、索引、视图操作语句;数据操作;通配符、转义符;授权;存储过程;触发器这部分等等整理一下:“”"1、数据定义语言。SQLDDL提供定义关系模式和视图、删除关系和视图、修改关系模式的命令。数据库DDL的全称是DataDefinitionLanguage。2、交互式数据操纵语......