首页 > 其他分享 >SpringBoot3集成RocketMq

SpringBoot3集成RocketMq

时间:2023-08-17 09:34:52浏览次数:35  
标签:集成 RocketMq producer send boot private SpringBoot3 message rocketmq

标签:RocketMq5.Dashboard;

一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包
rocketmq-all-5.0.0-source-release.zip

2、解压后进入目录,编译打包
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

3、服务启动

1、该目录下
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/

2、启动NameServer
sh mqnamesrv

输出日志
The Name Server boot success. serializeType=JSON

3、启动Broker+Proxy
sh mqbroker -n localhost:9876 --enable-proxy

输出日志
rocketmq-proxy startup successfully

4、关闭服务
sh mqshutdown namesrv
Send shutdown request to mqnamesrv(18636) OK

sh mqshutdown broker
Send shutdown request to mqbroker with proxy enable OK(18647)

4、控制台安装

1、下载master源码包
rocketmq-dashboard-master

2、解压后进入目录,编译打包
mvn clean package -Dmaven.test.skip=true

3、启动服务
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

4、输出日志
INFO main - Tomcat started on port(s): 8080 (http) with context path ''

5、访问服务:localhost:8080

三、工程搭建

1、工程结构

2、依赖管理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-starter.version}</version>
</dependency>

3、配置文件

配置RocketMq服务地址,消息生产者和消费者;

rocketmq:
  name-server: 127.0.0.1:9876
  # 生产者
  producer:
    group: boot_group_1
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息最大长度4M
    max-message-size: 4096
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
  # 消费者
  consumer:
    group: boot_group_1
    # 每次提取的最大消息数
    pull-batch-size: 5

4、配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

@Configuration
public class RocketMqConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMsgTimeout;

    @Value("${rocketmq.producer.max-message-size}")
    private Integer maxMessageSize;

    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed ;

    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
    private Integer retryTimesWhenSendAsyncFailed ;

    @Bean
    public RocketMQTemplate rocketMqTemplate(){
        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
        rocketMqTemplate.setProducer(defaultMqProducer());
        return rocketMqTemplate;
    }

    @Bean
    public DefaultMQProducer defaultMqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(this.nameServer);
        producer.setProducerGroup(this.producerGroup);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
        return producer;
    }
}

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;

@RestController
public class ProducerWeb {
    private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    @GetMapping("/send/msg1")
    public String sendMsg1 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
            // 发送消息
            rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }

    @Autowired
    private DefaultMQProducer defaultMqProducer ;

    @GetMapping("/send/msg2")
    public String sendMsg2 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
            // 构建消息对象
            Message message = new Message();
            message.setTopic("boot-mq-topic");
            message.setTags("boot-mq-tag");
            message.setKeys("boot-mq-key");
            message.setBody(msgBody.getBytes());
            // 发送消息,打印日志
            SendResult sendResult = defaultMqProducer.send(message);
            log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;

@Service
@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
public class ConsumerListener implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Override
    public void onMessage(String message) {
        log.info("\n=====\n message:{} \n=====\n",message);
    }
}

五、参考源码

文档仓库:
https://gitee.com/cicadasmile/butte-java-note

源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent

标签:集成,RocketMq,producer,send,boot,private,SpringBoot3,message,rocketmq
From: https://www.cnblogs.com/cicada-smile/p/17636729.html

相关文章

  • 【RocketMQ】MQ消息发送总结
    RocketMQ是通过DefaultMQProducer进行消息发送的,它实现了MQProducer接口,MQProducer接口中定义了消息发送的方法,方法主要分为三大类:send同步进行消息发送,向Broker发送消息之后等待响应结果;send异步进行消息发送,向Broker发送消息之后立刻返回,当消息发送成功/失败之后触发回调函数......
  • SpringBoot3.x 启动 refresh 过程解析
    Spring容器创建后,会调用它的refresh方法,refresh的时候会做很多事情:如完成配置类解析、各种BeanFactoryPostProcessor和BeanPostProcessor的注册、国际化配置的初始化、web内置容器的构造等等。web程序对应Spring容器为AnnotationConfigServletWebServerApplicationContext。Servlet......
  • 模拟集成电路设计系列博客——1.1.3 Cascode电流镜
    1.1.3Cascode电流镜Cascode电流镜是一种高输出阻抗电流镜,其基本结构如下图所示:首先从\(Q_2\)漏极看进去的输出阻抗仅为\(r_{ds2}\),其分析和基本电流镜非常一样。因此可以认为\(Q_4\)是一个带有\(r_{ds2}\)的源极退化电阻的电流源,利用之前的\((1.1.10)\)公式,可以得到:\[r_{out}......
  • 2023年8月陕西/深圳软考中级系统集成项目管理工程师报名
    系统集成项目管理工程师是全国计算机技术与软件专业技术资格(水平)考试(简称软考)项目之一,是由国家人力资源和社会保障部、工业和信息化部共同组织的国家级考试,既属于国家职业资格考试,又是职称资格考试。 系统集成项目管理工程师,属于软考三个级别中的“中级”。 考试合格者将颁发由中......
  • 模拟集成电路设计系列博客——1.1.1 基本电流镜
    1.1.1基本电流镜基本电流镜的结构如下图所示,两个晶体管都工作于饱和区,假设晶体管\(Q_1\)和\(Q_2\)完全匹配,并忽略晶体管有限输出阻抗的影响,那么\(Q_1\)和\(Q_2\)将会因为相同的栅压\(V_{gs}\)而输出相同的电流。然而如果考虑晶体管有限的输出阻抗,那么有着更大漏源电压的晶体管将......
  • chatglm2-6b模型在9n-triton中部署并集成至langchain实践
    一.前言近期,ChatGLM-6B的第二代版本ChatGLM2-6B已经正式发布,引入了如下新特性:①.基座模型升级,性能更强大,在中文C-Eval榜单中,以51.7分位列第6;②.支持8K-32k的上下文;③.推理性能提升了42%;④.对学术研究完全开放,允许申请商用授权。目前大多数部署方案采用的是fastapi+uvi......
  • 本地搭建spring-boot集成dubbo问题汇总
    1,java.lang.NoClassDefFoundError:com/alibaba/spring/util/PropertySourcesUtils<dependency> <groupId>com.alibaba.spring</groupId> <artifactId>spring-context-support</artifactId> <version>1.0.2</version>......
  • 【名师代练】带你玩转 RocketMQ,角逐「RocketMQ 首席评测官」
    RocketMQ背景ApacheRocketMQ诞生至今,一直服务于100%阿里集团内部业务、阿里云以及开源社区数以万计的企业客户。历经十多年双十一严苛流量验证的RocketMQ,承载了超过万亿级消息规模的洪峰压力。2021年ApacheRocketMQ更是进入全新5.0时代。立足于企业业务集成的核心场景,R......
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起实践RocketMQ的服务搭建及
    推荐超值课程:点击获取RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点1、能够保证严格的消息顺序2、提供丰富的消息拉取模式3、高效的订阅者水平扩展能力4、实时的消息订阅机制5、亿级消息堆积能力搭建一个双节点的RocketM环境背景:虚拟机:vmware12操作系统:ce......
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPushCons
    推荐超值课程:点击获取RocketMQ开源是使用文件作为持久化工具,阿里内部未开源的性能会更高,使用oceanBase作为持久化工具。在RocketMQ1.x和2.x使用zookeeper管理集群,3.x开始使用nameserver代替zk,更轻量级,此外RocketMQ的客户端拥有两种的操作方式:DefaultMQPushConsumer和DefaultMQPu......