首页 > 其他分享 >rocketmq--死信队列

rocketmq--死信队列

时间:2024-01-24 11:45:02浏览次数:23  
标签:-- 队列 死信 import org message rocketmq

在RocketMQ中,死信队列(Dead Letter Queue,DLQ)用于存放无法成功消费的消息。当消息重试消费次数超过设定的阈值后,消息将被转移到死信队列。使用Spring Boot集成RocketMQ时,可以通过以下步骤来处理死信队列中的消息。

首先,在pom.xml中添加RocketMQ Spring Boot Starter的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>你的版本号</version>
</dependency>

接下来,在application.propertiesapplication.yml中配置RocketMQ的相关属性:

# application.properties 示例
rocketmq.name-server=你的NameServer地址
rocketmq.producer.group=你的生产者分组
rocketmq.consumer.group=你的消费者分组

然后,创建生产者服务发送消息:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}

创建消费者服务来消费消息,并在无法消费的情况下抛出异常,触发重试机制:

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "test-topic",
    consumerGroup = "你的消费者分组",
    consumeMode = ConsumeMode.ORDERLY,
    selectorType = SelectorType.TAG,
    selectorExpression = "*"
)
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            // 处理消息的逻辑
            System.out.println("Received message: " + message);
            // 假设在这里发生了异常,触发重试
            throw new RuntimeException("Failed to process message.");
        } catch (Exception e) {
            // 异常处理逻辑,例如记录日志等
            System.err.println("Error processing message: " + e.getMessage());
            // 抛出异常,让RocketMQ进行重试
            throw e;
        }
    }
}

在上面的消费者服务中,我们通过抛出异常来模拟消费失败的场景。RocketMQ默认会进行重试,重试次数可以通过consumer.maxReconsumeTimes属性进行配置。当超过最大重试次数后,消息会被转移到死信队列。

处理死信队列的消息,需要监听 %DLQ%你的消费者分组 主题:

@Service
@RocketMQMessageListener(
    topic = "%DLQ%你的消费者分组",
    consumerGroup = "你的消费者分组-DLQ",
    consumeMode = ConsumeMode.ORDERLY,
    selectorType = SelectorType.TAG,
    selectorExpression = "*"
)
public class DeadLetterQueueConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理死信队列的消息
        System.out.println("Received dead letter message: " + message);
        // 在这里编写处理死信消息的逻辑,例如记录日志、报警、人工干预等
    }
}

以上就是一个简单的死信队列处理例子。需要注意的是,死信队列的主题名称是固定格式的,以 %DLQ% 开头,后面跟上原消费者分组的名称。另外,在实际的业务场景中,你可能需要根据消息内容或错误类型进行更详细的错误处理逻辑。

标签:--,队列,死信,import,org,message,rocketmq
From: https://www.cnblogs.com/xylfjk/p/17984331

相关文章

  • 事务
    事务事务就是用户定义的一系列数据库操作,这些操作可以视为一个完成的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。很形象的一个例子,张三给李四转账100元,在程序里面,张三的余额就要-100,李四的余额就要+100整个事件是一个整体,哪一步错了,整个事件都是失败的......
  • Oracle12c 数据库 警告日志
    目录一:查看警告日志文件的位置二:警告日志内容三:告警日志监控:方案1:方案2:方案3: 正文 回到顶部一:查看警告日志文件的位置        Oracle12c环境下查询,alert日志并不在bdump目录下,看到网上和书上都写着可以通过初始化参数background_dump_dest来查看......
  • flex-shrink计算上的一些细节
    <!DOCTYPEhtml><htmllang="cmn-hans"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>Document</title&g......
  • vue3 axios 封装
    一、介绍二、代码三、问题 一、介绍Axios是一个基于promise的HTTP库,可以用在浏览器和node.js中。这里介绍的是在vue3中怎么封装二、代码1.基本使用1.1安装npminstallaxios1.2简单使用1.2.1局部使用importaxiosfrom'axio......
  • Meson 入门指南之一
    相关站点Meson官网Meson官方文档MesonGitHub项目Meson介绍Meson的简介Meson(TheMesonBuildSystem)是个项目构建系统,类似的构建系统有Makefile、CMake、automake…。Meson是一个由Python实现的开源项目,其思想是,开发人员花费在构建调试上的每一秒都是浪费,同样......
  • Vite安装React TS Tailwind项目
    安装Vitenpminstallvite@latest或者yarncreatevite##Vite安装React项目+TS安装完vite之后,我们就可以使用vite来创建项目了,vite为我们提供了很多模版,我们只需要选择我们需要的就可以了。我们可以在创建项目的时候在命令当中指定对应的模版。```sh#npm安装npm......
  • Linux下配置ip地址四种方法
    linux系统安装完,以后通过命令模式配置网卡IP。配置文件通常是/etc/sysconfig/network-scripts/ifcfg-interface-nameifconfig后显示的内容,lo代表loop回路。 一、Ifconfig命令第一种使用ifconfig命令配置网卡的ip地址。此命令通常用来零时的测试用,计算机启动后,ip地址的配置......
  • 数据可视化是如何被企业重视起来的?
    数据可视化,作为信息时代的一项重要技术,正在企业中崭露头角,逐渐成为业务决策和运营管理的得力助手。企业之所以对数据可视化如此重视,是因为它为企业带来了诸多实际利益和战略优势。下面我就以可视化从业者的角度来简单聊聊这个话题。首先,数据可视化为企业提供了更直观、更清晰的......
  • 计算机常用二进制
    记忆技巧:千三兆六(1k=10^31M=10^6)毫三微六(1mm=10^-31μm=10^-6)二进制:(前缀:0b/0B)(后缀:b/B)八进制:(前缀:0)(后缀:o/O)十进制:(前缀:无,可加+/-)(后缀d/D)十六进制:(前缀:0x/0X)(后缀:h/H)bit:(b)比特/位byte:(B)字节,等于8bit1KB=1024Bytes1MB=1024KB1GB=1024MB1TB=1024G......
  • Linux基本命令
    Linux基本命令pwd查看当前所在的路径完整路径相对路径lsdirll查看目录信息​ ls-a查看当前目录下的信息以及隐藏文件stat查看信息xxx--help查看命令的使用方式创建文件touch命令创建(创建但是不打开)vi/vim(创建一个文件并......