首页 > 其他分享 >springboot 使用 rocketMQ

springboot 使用 rocketMQ

时间:2024-07-23 21:54:19浏览次数:12  
标签:springboot void rocketMQTemplate MsgModel 使用 new message public rocketMQ

POM 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

配置文件

rocketmq:
    name-server: 192.168.206.186:9876

更多的配置可以参考官网,官网写的很详细 https://rocketmq.apache.org/zh/docs/4.x/parameterConfiguration/01local#clientconfig%E9%85%8D%E7%BD%AE

生产者

不同的方法完成消息的花式发送

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Test
void contextLoads() {
    
    // 同步
    rocketMQTemplate.syncSend("myTopic1", "我是boot的一个消息");
    
    // 异步
    rocketMQTemplate.asyncSend("myTopic2", "我是boot的一个异步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("成功");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("失败" + throwable.getMessage());
        }
    });

    // 单向
    rocketMQTemplate.sendOneWay("myTopic3", "我是boot的一个单向消息");

    // 延迟(延迟级别为3,超时时间为 3000 毫秒)
    Message<String> msg = MessageBuilder.withPayload("我是boot的一个延迟消息").build();
    rocketMQTemplate.syncSend("myTopic4", msg, 3000, 3);

    // 顺序消息(消费者 需要单线程消费)
    List<MsgModel> msgModels = Arrays.asList(
            new MsgModel("qwer", 1, "下单"),
            new MsgModel("qwer", 1, "短信"),
            new MsgModel("qwer", 1, "物流"),
            new MsgModel("zxcv", 2, "下单"),
            new MsgModel("zxcv", 2, "短信"),
            new MsgModel("zxcv", 2, "物流")
    );
    msgModels.forEach(msgModel -> {
        // 发送,以 msgModel.getOrderSn() 分组,每组消息放到一个队列里(这里是两组消息)
        rocketMQTemplate.syncSendOrderly("myTopic5", JSON.toJSONString(msgModel), msgModel.getOrderSn());
    });
    
    // 带 tag 
    rocketMQTemplate.syncSend("myTopic6:tagA", "我是一个带tag的消息");
    
    // 带 key
    Message<String> message = MessageBuilder
        	.withPayload("我是一个带key的消息")
            .setHeader(RocketMQHeaders.KEYS, "keyA")
            .build();
    rocketMQTemplate.syncSend("myTopic7", message);
}

消费者

@RocketMQMessageListener 注解不同的配置完成消息的花式接收,举几个示例

普通消息

@Component
@RocketMQMessageListener(topic = "myTopic1", 
                         consumerGroup = "my-consumer-group1"
)
public class MyRocketConsumerListener1 implements RocketMQListener<MessageExt> {
    /**
     * 这个方法就是消费者的方法
     * 如果泛型制定了固定的类型 那么消息体就是我们的参数
     * MessageExt 类型是消息的所有内容
     * ------------------------
     * 没有报错 就签收了,如果报错了 就是拒收 就会重试
     * 
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

顺序消息

@Component
@RocketMQMessageListener(topic = "myTopic5",
        consumerGroup = "my-consumer-group2",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class MyRocketConsumerListener2 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}

带 tag

@Component
@RocketMQMessageListener(topic = "myTopic6",
        consumerGroup = "my-consumer-group3",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
)
public class MyRocketConsumerListener3 implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

广播模式,两个消费者同组,订阅的也是同一个Topic,myTopic4 的消息两个消费者都会消息

@Component
@RocketMQMessageListener(topic = "myTopic4",
        consumerGroup = "my-consumer-group4",
        messageModel = MessageModel.BROADCASTING // 广播模式
)
public class MyRocketConsumerListener4 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("my-consumer-group4消费者MyRocketConsumerListener4:" + message);
    }
}

@Component
@RocketMQMessageListener(topic = "myTopic4",
        consumerGroup = "my-consumer-group4",
        messageModel = MessageModel.BROADCASTING // 广播模式
)
public class MyRocketConsumerListener5 implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("my-consumer-group4消费者MyRocketConsumerListener5:" + message);
    }
}

标签:springboot,void,rocketMQTemplate,MsgModel,使用,new,message,public,rocketMQ
From: https://www.cnblogs.com/cyrushuang/p/18319716

相关文章

  • CMake使用方法(详细版)上
    目录CMake是什么如何配置使用方法基本流程使用cmake的示例只有源文件搜索文件set的使用头文件和源文件分离包含头文件变量操作生成动态库和静态库制作静态库制作动态库链接库文件日志参考CMake是什么         CMake是一个跨平台的项目构建工具,它......
  • 使用Docker和Kubernetes管理Java微服务
    使用Docker和Kubernetes管理Java微服务大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何使用Docker和Kubernetes来管理Java微服务。Docker和Kubernetes是现代微服务架构中不可或缺的工具,它们能够极大地简化应用程序的部署和管理,提高开发......
  • 使用Spring Boot构建高性能企业级应用
    使用SpringBoot构建高性能企业级应用大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何使用SpringBoot构建高性能企业级应用,从框架配置到性能优化,全方位提升你的应用性能。一、SpringBoot概述SpringBoot是基于Spring框架的一个快速......
  • linux 信号量sem 使用示例
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言一、信号量是什么?二、代码示例1.posix2.systemV总结前言提示:这里可以添加本文要记录的大概内容:信号量主要用于进程间使用信号量:分为posix和systemV信号量posix信号量:sem_open:......
  • 使用Python自动备份MC服务器存档至轻量COS
    在存档意外炸档时,备份就显得尤为重要。本文以Minecraft1.18.2为例,教你如何使用轻量搭建一个MC服务器,并定时将存档备份入轻量COS,节省本地空间。前期准备一台安装了Linux原版系统的轻量(因为目前轻量COS仅支持挂载Linux系统镜像,不支持容器或自定义镜像)一个和轻量大区相同的轻......
  • 090、Python 写Excel文件及一些操作(使用xlwt库)
    要写Excel文件,我们需要使用第三方库。xlwt库是一个常用的写Excel的第三方库,它同时支持.xls和.xlsx。要使用第三方库,首选需安装:pipinstallxlwtxlutils安装完成后,我们就可以引入库来进行相关操作了。使用xrwt库写Excel文本,可以按以下步骤操作:1、第一步:创建工作簿(Wor......
  • 计算机编程—IT实战课堂 Springboot 电竞兴趣论坛系统
    计算机编程—IT实战课堂:Springboot电竞兴趣论坛系统随着电子竞技行业的迅猛发展,电竞爱好者对于交流平台的需求日益增长。结合IT实战课堂的教学实践,我们利用SpringBoot框架开发了一款集讨论、资源共享、赛事追踪于一体的电竞兴趣论坛系统。本文将深入探讨该项目的构思背景、......
  • redis的使用场景和持久化方式
    redis的使用场景热点数据的缓存。热点:频繁读取的数据。限时任务的操作:短信验证码。完成session共享的问题完成分布式锁。redis的持久化方式什么是持久化:把内存中的数据存储到磁盘的过程,同时也可以把磁盘中的数据加载到内存中。redis持久化分为两种:RDB和AOFRDB:什......
  • 使用脚本自动配置Java环境
    python脚本配置java环境适用Windows与LinuxLinux系统的全局配置文件路径:/etc/profile首先需要下载jdk,然后在命令行使用脚本使用方式:jdk路径不带bin目录pythonset_java_env.py/path/to/jdkset_java_env.py内容如下:importosimportsysdefset_java_env(java_path):......
  • WPF如何使用WebView,并且禁用F12和F5。
    客户端套浏览器壳,是如今比较浏览的客户端客户端开发方式。这篇文字简单来介绍一下如何在WPF中使用WebView安装WebView的nuget包可以直接执行安装命令Install-PackageMicrosoft.Web.WebView2。也可以通过nuget包管理器,安装Microsoft.Web.WebView2包。安装成功之后,改nuget......