首页 > 其他分享 >RocketMQ—RocketMQ消费重试和死信消息

RocketMQ—RocketMQ消费重试和死信消息

时间:2024-02-17 21:12:16浏览次数:30  
标签:重试 次数 死信 消息 consumer RocketMQ

RocketMQ—RocketMQ消费重试和死信消息

消费重试

生产者重试

设置重试的代码如下

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

一般情况下,我们不会在生产者方进行重试。

消费者重试

消费者在消费消息的过程中,下方三种情况会进行重试:

  • 业务报错了
  • 返回null 返回
  • 返回RECONSUME_LATER

代码如下:

/**
     * 重试的时间间隔
     * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 默认重试16次
     * --------------
     * 重试的次数一般 5次
     * @throws Exception
     */
@Test
public void retryConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            System.out.println(messageExt.getReconsumeTimes());
            System.out.println(new String(messageExt.getBody()));
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

消息默认重试16次:

能否自定义重试次数

设置重试次数的代码如下:

// 设定重试次数
consumer.setMaxReconsumeTimes(2);

消息的构成如下:

消息构成

如果使用了上述代码,就会为消息头设置重试次数。

死信消息

如果消息重试了最大次数还是失败怎么办

最大次数:如果没有设置最大次数,默认情况下,并发模式是16次,顺序模式是int的最大值。

如果重试了最大次数还是失败,就会变成死信消息,会被放进一个死信主题中去,这个死信主题的名字是有规律的,这个主题是

%DLQ%消费者组的名称

当消息处理失败的时候该如何正确的处理

  1. 可以监听死信消息,给管理员发送邮件或者短信通知,但是如果有多个死信消息,就要写多个监听器;
  2. 可以手动判断重试次数,如果大于某个次数,就记录下来,就不重试了,发送邮件或者短信通知。
try {
    handleDb();
} catch (Exception e) {
    // 重试
    int reconsumeTimes = messageExt.getReconsumeTimes();
    if (reconsumeTimes >= MAX_TIMES) {
        // 不要重试了
        System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

标签:重试,次数,死信,消息,consumer,RocketMQ
From: https://www.cnblogs.com/nicaicai/p/18018420

相关文章

  • 大白话-设计RocketMQ延迟消息
    延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单释放库存等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。为什么RocketM......
  • 解决淘宝登录频繁提示,验证失败,点击框体重试(error:9tFhU6)
    1、起因近期淘宝登录需要右划验证,才能点击登录,但是一直提示"验证失败,点击框体重试"类似错误如下图所示2、发现问题通常有这种问题的,大概率是安装了浏览器广告屏蔽插件经核查,我的浏览器安装的是AdGuard看了一下页面的调试日志,发现有一条用来通信的WebSocket协议被插件屏蔽了......
  • RocketMQ_详细配置与使用详解
    为什么要用MQ 应用解耦系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。 使用消息队列解耦合,系统的耦合性就会提高了。......
  • 动力节点最新RocketMQ基本操作-01
    动力节点最新RocketMQ基本操作1. RocketMQ简介MQ====MessageQueue编程中的 同步:排队一个一个走;一个动作做完以后,才能进行下一个异步:各走各的;两个动作可以同时做;官网:  http://rocketmq.apache.org/ RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是......
  • RocketMQ应用-基金购买秒杀实现
    架构支持根据实际业务场景,分析集群分流的具体处理方案,假设基金购买接口单次处理时间为500ms,tomcat使用默认线程数200,则单个tomcat处理基金购买接口的QPS=1000/500*200=400。场景1-4000QPS要求实现4000QPS的并发量,可以部署10个tomcat集群应用,使用nginx做负载均衡,轮询分配到tomc......
  • RocketMQ应用-消费幂等性问题解决
    重复消费产生原因生产者多次投递-投递时服务端接收后客户端网络原因确认失败,重新投递消费者扩容重试-消费者扩容导致正在消费的消息没有正常应答,服务端重新推送重复消费解决方案给消息增加唯一key,消费时校验key是否已经消费过消费者控制消息的幂等性(多次同样的操作结果一......
  • RocketMQ应用-实现周期性自动任务
    应用背景提供配置功能,用于固定周期的执行某个动作;如在基金交易的每个交易日结束时,需要根据当天交易量计算基金的收益,可以提供定时任务,在每天晚上固定的时间计算收益数据。功能设计提供任务数据表task_info和任务执行记录表task_log_info;通过扫描task_info表中所有的任务配置数......
  • RocketMQ消息消费
    本文只提供生产者和消费者部分的示例代码,其它配置部分见RocketMQ消息客户端生产与消费的基本实现技术框架JDK:javaversion"1.8.0_391"RocketMQSDK:rocketmq-spring-boot-starter:2.2.3消息消费原理消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消......
  • RocketMQ—引言
    RocketMQ—引言MQ介绍在学习RocketMQ之前,我们先来看以下MQ的意思。MQ是MessageQueue的首字母缩写。Message:意思为消息,在我们生活中可以是一句话/一个短信/一个邮件;在计算机领域,放到实际业务中,就是一条数据。Queue:意思为队列。是一种先进先出的数据结构。我们要学习MQ,重......
  • RocketMQ
    领域模型https://rocketmq.apache.org/zh/docs/domainModel/01main/概述ApacheRocketMQ是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。ApacheRocketMQ产品具......