首页 > 其他分享 >springboot结合rocketmq的使用以及遇到的问题

springboot结合rocketmq的使用以及遇到的问题

时间:2024-03-18 15:33:50浏览次数:21  
标签:springboot 遇到 user org import public rocketmq String

rocketmq是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

首先需要下载安装rocketmq:

1.官网 https://rocketmq.apache.org/zh/docs/quickStart/01quickstart

2.点击下载二进制包

下载完后进行解压,根据文件夹的路径进行下面的环境变量配置

 

3.配置环境变量

ROCKETMQ_HOME:E:\rocketmq-all-5.2.0\rocketmq-all-5.2.0

 4.启动mqnamesrv

在bin目录下双击mqnamesrv.cmd

 窗口不闪退,这样就是启动成功,不要关闭窗口

 5.启动mqbroker(有顺序,先需要启动mqnamesrv)

同样在bin的目录里,我们输入cmd

输入 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:在这里我遇到了一个问题

错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_311\lib\dt.jar;C:\Program

 根据度娘指引,需要修改runbroker.cmd文件种  %CLASSPATH%为其加上双引号

 保存修改,重新运行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

会弹出一个新的窗口,运行成功 不要关闭窗口

到此,rocketmq服务就已经全部启动了

 

接下来开始把rocketmq引入springboot

文件目录(我们测试程序只需要这四个文件):

1.配置pom

<!--        rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

<!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    </dependency>

 2.application.properties配置

#rocketmq
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=Pro_Group
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-async-failed=3
rocketmq.producer.retry-times-when-send-failed=3

 3.编写实体类

package com.gykg.yizhichun.entity;

import lombok.Data;

@Data
public class User {
private String id;
private String name;
private Integer age;
private String sex;
private String desc;
}

4.编写生产者类

package com.gykg.yizhichun.producer;

import com.alibaba.fastjson.JSON;
import com.gykg.yizhichun.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

@Slf4j
@Component
public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;

// 建议正常规模项目统一用一个TOPIC
private static final String topic = "RLT_TEST_TOPIC";

// 直接注入使用,用于发送消息到broker服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用
* 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
*/
public void send(User user) {
rocketMQTemplate.convertAndSend(topic + ":tag1", user);
// rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行
}

/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* (msgBody也可以是对象,sendResult为返回的发送结果)
*/
public SendResult sendMsg(String msgBody) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}

/**
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
*/
public void sendAsyncMsg(String msgBody) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
}

@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
log.info("【sendMsg】sendResult={}", "发送异常" + throwable.getMessage());
}
});
}

/**
* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String msgBody, int delayLevel) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}

/**
* 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
*/
public void sendOneWayMsg(String msgBody) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
}

/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public SendResult sendTagMsg(String msgBody) {
return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());
}

/***
* 服务生产者,顺序消息
* 把消息确保投递到同一条queue
* 保证了消息的顺序性
*/
public void sendFIFOMsg(List<User> users) {
//顺序消息
//选择器规则构建
rocketMQTemplate.setMessageQueueSelector((list, message, o) -> {
int id = Integer.valueOf((String) o);
int hash = (id % list.size());
return list.get(hash);
});
if (!CollectionUtils.isEmpty(users)) {
for (User user : users) {
MessageBuilder.withPayload(users.toString()).build();
rocketMQTemplate.sendOneWayOrderly(topic+":sendFIFOMsg", user, String.valueOf(user.getId()));
}
}
}
}

5.编写消费者类

package com.gykg.yizhichun.consumer;

import com.alibaba.fastjson.JSON;
import com.gykg.yizhichun.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Slf4j
@Component
public class MQConsumerService {
// Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")
public class ConsumerSend implements RocketMQListener<User> {
// 监听到消息就会执行此方法
@Override
public void onMessage(User user) {
log.info("tag1监听到消息:user={}", JSON.toJSONString(user));
}
}


// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two",selectorExpression = "xxx")
public class ConsumerSend2 implements RocketMQListener<String> {
@Override
public void onMessage(String str) {
log.info("ConsumerSend2监听到消息:str={}", str);
}
}

// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("tag2监听到消息:msg={}", msg);
}
}

/**
* 消费者顺序消费消息
* 顺序消费
*/
@Service
@RocketMQMessageListener(consumerGroup = "Orderly-Consumer", topic = "RLT_TEST_TOPIC",selectorExpression = "sendFIFOMsg", consumeMode = ConsumeMode.ORDERLY)
public class OrderlyConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("线程"+Thread.currentThread()+"内容为:"
+ new String(message.getBody())+
"队列序号:"+message.getQueueId()+",消息msgId:"+message.getMsgId());
}
}
}

6.编写控制器类

package com.gykg.yizhichun.controller;

import com.gykg.yizhichun.entity.User;
import com.gykg.yizhichun.producer.MQProducerService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/rocketmq")
public class MqController {
@Autowired
private MQProducerService mqProducerService;

@GetMapping("/send")
public void send() {
User user = new User();
user.setAge(28);
user.setName("曹震");
user.setSex("男");
mqProducerService.send(user);
}

@GetMapping("/sendTag")
public ResponseEntity<SendResult> sendTag() {
SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
return ResponseEntity.ok(sendResult);
}

@GetMapping("/sendMsg")
public ResponseEntity<SendResult> sendMsg() {
SendResult sendResult = mqProducerService.sendMsg("曹震测试");
return ResponseEntity.ok(sendResult);
}

@GetMapping("/sendFIFOMsg")
public void sendFIFOMsg() {
List<User> users = new ArrayList<>();
User user = new User();
user.setId("1");
user.setSex("男");
user.setName("曹震");
user.setAge(28);
user.setDesc("创建订单");
users.add(user);

User user1 = new User();
user1.setId("2");
user1.setSex("男");
user1.setName("贾耀旗");
user1.setAge(25);
user1.setDesc("创建订单");
users.add(user1);

mqProducerService.sendFIFOMsg(users);
}
}

然后就是测试接口拉,这里要注意的是:不要忘记把producer和consumer这两个包加入扫描,不然是会获取不到bean 的

我们运行起来,测试 /rocketmq/sendFIFOMsg 接口,输出这个,测试成功

 巨人的肩膀:https://blog.csdn.net/qq_23126581/article/details/132496439

https://blog.csdn.net/Messy_Cat/article/details/124108281

 

标签:springboot,遇到,user,org,import,public,rocketmq,String
From: https://www.cnblogs.com/luzanzan/p/18080514

相关文章

  • springboot有事务隔离级别
    springboot有五种隔离级别1、DEFAULT:spring默认的事务隔离级别,以连接的数据库事务隔离级别为准;2、READ_UNCOMMITTED:读未提交,该隔离级别事务可以看到其他事务中未提交的数据。因为可以读到别人未提交的数据,如果对方事务发生回滚,容易导致脏读。3、READ_COMMITTED:读已提交,该隔离级......
  • Vue.js+SpringBoot开发企业项目合同信息系统
    目录一、摘要1.1项目介绍1.2项目录屏二、功能模块2.1数据中心模块2.2合同审批模块2.3合同签订模块2.4合同预警模块2.5数据可视化模块三、系统设计3.1用例设计3.2数据库设计3.2.1合同审批表3.2.2合同签订表3.2.3合同预警表四、系统展示五、核心代码5.1......
  • Vue.js+SpringBoot开发独居老人物资配送系统
    目录一、摘要1.1项目介绍1.2项目录屏二、功能模块三、系统展示四、核心代码4.1查询社区4.2新增物资4.3查询物资4.4查询物资配送4.5新增物资配送五、免责说明一、摘要1.1项目介绍基于JAVA+Vue+SpringBoot+MySQL的独居老人物资配送系统,包含了社区档案、......
  • 基于springboot+vue.js的在线考试系统(附带文章和源代码设计说明文档ppt)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我成功案例代码参考数据库参考源码获取前言......
  • 基于springboot+vue.js的旅游管理系统(附带文章和源代码设计说明文档ppt)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我成功案例代码参考数据库参考源码获取前言......
  • 基于springboot+vue.js的失物招领平台(附带文章和源代码设计说明文档ppt)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我成功案例代码参考数据库参考源码获取前言......
  • SpringBoot项目轻松集成Sentinel:熔断限流实战及核心代码解析
    一、引言Sentinel是阿里巴巴开源的一款轻量级流量控制组件,提供丰富的微服务流量控制能力,包括流量控制、熔断降级、系统负载保护等。本文将带你一步步实现在SpringBoot项目中集成Sentinel,实现服务的熔断限流,并给出关键代码示例及注意事项。二、集成Sentinel步骤添加依赖在......
  • 在Tomcat下部署若依框架前后端分离SpringBoot+Vue3项目
     一、后端打包在ruoyi项目的bin目录下执行package.bat打包Web工程,生成war/jar包文件。然后会在项目下生成target文件夹包含war或jar1.jar包使用命令行执行:java–jarruoyi-admin.jar或者执行脚本:ruoyi/bin/run.bat注意事项【jar包部署,需要使用nginx代理,前端项目中的代理仅......
  • 基于SpringBoot的“乐校园二手书交易管理系统”的设计与实现(源码+数据库+文档+PPT)
    基于SpringBoot的“乐校园二手书交易管理系统”的设计与实现(源码+数据库+文档+PPT)开发语言:Java数据库:MySQL技术:SpringBoot工具:IDEA/Ecilpse、Navicat、Maven系统展示系统首页界面图用户注册界面图二手图书界面图留言反馈界面图个人中心界面图管理员......
  • 基于SpringBoot的“书籍学习平台”的设计与实现(源码+数据库+文档+PPT)
    基于SpringBoot的“书籍学习平台”的设计与实现(源码+数据库+文档+PPT)开发语言:Java数据库:MySQL技术:SpringBoot工具:IDEA/Ecilpse、Navicat、Maven系统展示平台首页界面图用户注册界面图付费专区界面图个人中心界面图后台登录界面图管理员功能界面图......