首页 > 其他分享 >springboot 整合 recketMQ 详细步骤

springboot 整合 recketMQ 详细步骤

时间:2022-10-08 16:55:07浏览次数:59  
标签:recketMQ springboot 步骤 springframework 消息 import apache org rocketmq

前提 RocketMQ的部署环境可用

1 依赖包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>

2 yml的rocketMQ 的配置

rocketmq:
# 多个用;隔开
name-server: 10.22.15.61:9876;10.22.15.60:9876
producer:
# 生产组
group: cip-group
# 发送消息超时时间,默认 3000
sendMessageTimeout: 3000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 发送异步消息失败重试次数,默认2
retryTimesWhenSendAsyncFailed: 2



3 注入 RocketMQTemplate 就可以发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;



4 发送消息例子
package com.bs.it.esi.controller;

import com.alibaba.fastjson.JSON;
import com.bs.it.esi.config.IdWorker;
import com.bs.platform.core.annotations.WhiteList;
import com.bs.platform.core.vo.CommonResult;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/rocketmq")
@Api(tags = "rocket 消息发送测试")
public class RocketMqDemo {

@Autowired
private IdWorker idWorker;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private MQAdmin mqAdmin;

@GetMapping("send/{id}")
@WhiteList
@ApiOperation("发送消息")
public CommonResult send(@PathVariable("id") String id) throws UnsupportedEncodingException {
// MessageConst
// MessageBuilder.withPayload("123").setHeader("",""); 链式调用

for (int i = 0; i < 5; i++) {
SendResult sendResult = rocketMQTemplate.syncSend("rocket-topic-7:AA", MessageBuilder.withPayload("mq发送消息测试AA" + i).build());
System.out.println(sendResult);
}
// rocketMQTemplate.send("rocket-topic-711",MessageBuilder.withPayload("mq发送消息测试a:").build());
// SendResult result = rocketMQTemplate.syncSend("rocket-topic-01", MessageBuilder.withPayload("mq发送消息测试a:"+i).build(), 3000);
return new CommonResult("发送成功!!");
}

@GetMapping("sendTransaction")
@WhiteList
@ApiOperation("事务消息")
public CommonResult sendTransaction(@RequestParam(name = "mes") String mes) throws UnsupportedEncodingException {
// 事务消息
// 事务消息就是将发送消息和本地数据库操作融合为同一个事务,二者要么都成功,要么都失败,不能出现一个操作成功另一操作失败的情况。
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("rocket-topic-7", MessageBuilder.withPayload(mes).build(),null);
System.out.println("-----"+result);
return new CommonResult("发送完成");
}

// 顺序消息
@GetMapping("sendselector")
@WhiteList
@ApiOperation("发送顺序消息")
public CommonResult sendselector(@RequestParam(name = "mes") String mes) {

rocketMQTemplate.setMessageQueueSelector((List<MessageQueue> mqs, Message msg, Object arg) -> {
int queuqInx = arg.toString().hashCode() % (mqs.size());
System.out.println("获取到的队列为:"+queuqInx+" msg "+ new String(msg.getBody()));
return mqs.get(queuqInx);
});
for (int i = 0; i < 5; i++) {
SendResult sendResult = rocketMQTemplate.syncSendOrderly("rocket-topic-7",
MessageBuilder.withPayload("发送的顺序消息" + i).build(), "2");
System.out.println("sendResult: "+sendResult);
}
return new CommonResult();
}

// 批量的消息
@GetMapping("batchSend")
@WhiteList
@ApiOperation("批量的消息")
public CommonResult batchSend(@RequestParam(name = "mes") String mes) {
List<org.springframework.messaging.Message> lists = new ArrayList<>();
for (int i = 0; i < 10; i++) {
org.springframework.messaging.Message<String> build = MessageBuilder.withPayload("批量发送消息的测试。。 :" + i).build();
lists.add(build);
}
List<List<org.springframework.messaging.Message>> partition = Lists.partition(lists, 300);
partition.forEach(li -> {
SendResult sendResult = rocketMQTemplate.syncSend("rocket-topic-7", li, 20000);
System.out.println("--------------------------"+sendResult);
});
return new CommonResult();
}

// 控制消息消费的个数
@GetMapping("batchCount")
@WhiteList
@ApiOperation("控制消息消费的个数")
public CommonResult batchCount(@RequestParam(name = "mes") String mes) {
List<org.springframework.messaging.Message> lists = new ArrayList<>();
for (int i = 0; i < 10; i++) {
org.springframework.messaging.Message<String> build = MessageBuilder.withPayload("批量发送消息的测试。。 :" + i).build();
lists.add(build);
}
SendResult sendResult = rocketMQTemplate.syncSend("rocket-topic-7", lists, 10000);
System.out.println(sendResult);
return new CommonResult();
}

}


其中 事务消息需要另外的实现一个RocketMQLocalTransactionListener类

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener()
public class MYRocketMQLocalTransactionListener implements RocketMQLocalTransactionListener {

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("开始执行本地事务---");
System.out.println("message: "+message);
if (new String((byte[]) message.getPayload()).length() > 5){
return RocketMQLocalTransactionState.COMMIT;
}else if (message.getPayload().toString().length() == 8){
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("消息的回查----");
return RocketMQLocalTransactionState.COMMIT;
}
}

消费者监听消息:

import com.bs.platform.core.redis.RedisCacheUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(topic = "fdc_draw_chart_topic",
consumerGroup = "fdc-group")
// CONCURRENTLY 并发(消息无序) ORDERLY 单线程(有序)
public class RocketConsumer implements RocketMQListener<MessageExt> {

@Autowired
private RedisCacheUtils redisCacheUtils;
@Override
public void onMessage(MessageExt messageExt) {
// Object aaa = redisCacheUtils.get("aaa");
// if (aaa == null){
// redisCacheUtils.set("aaa","1",10000);
// log.info("程序出错,消息从新拉取消费");
// throw new RuntimeException("出错了");
// }

log.info("----------------------------------------------------------------- :"+new String(messageExt.getBody()).toString());
}
}


更加细致的消息发送和监听 可以参考官网 欢迎交流









标签:recketMQ,springboot,步骤,springframework,消息,import,apache,org,rocketmq
From: https://www.cnblogs.com/niCong/p/16769466.html

相关文章

  • SpringBoot Enum 传参行为学习记录
    在学习springboot枚举值传参序列化时,发现以下行为。@Data@ToStringpublicclassEnumRequest{@ApiModelProperty("订单号")privateStringorderSn;@A......
  • springboot中mybatis-plus简单配置与使用
    依赖mybatis-plus与mybatis依赖不兼容,同时使用会导致报错<dependency><groupId>com.oracle</groupId><artifactId>ojdbc6</artifactId><version>${or......
  • SpringBoot实战派读书笔记---响应式编程
    1.什么是WebFlux?WebFlux不需要ServletAPI,在完全异步且无阻塞,并通过Reactor项目实现了ReactorStreams规范。WebFlux可以在资源有限的情况下提高系统的吞吐量和......
  • Springboot集成阿里云短信
    目录1前言2准备工作2.1了解流程2.2配置信息2.3短信签名和模板2.3.1签名2.3.2模板2.3.3存入数据库3SDK4集成Springboot4.1集成4.2测试5后记1前言​ 线上系......
  • 有这10大原则7大步骤,什么电路图都能看懂
    01​电路简化的基本原则初中物理电学中的复杂电路可以通过如下原则进行简化:☀第一:不计导线电阻,认定R线≈0。有电流流过的导线两端电压为零,断开时开关两端可以测得电压(电路中......
  • SpringBootServletInitializer的作用
    https://blog.csdn.net/luckyzsion/article/details/81135438?spm=1001.2101.3001.6650.7&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendF......
  • 从SpringBoot启动,阅读源码设计
    目录一、背景说明二、SpringBoot工程三、应用上下文四、资源加载五、应用环境六、Bean对象七、Tomcat服务八、事件模型九、配置加载十、数据库集成十一、参考源码服务启......
  • SpringBoot加载自动配置类的流程
    自定义过starter的同学应该都知道,自动配置类需要用EnableAutoConfiguration注解修饰,并且需要将自动配置类配置在spring.factories中。但自动配置类是如何被SpringBoot加......
  • 整合springboot
    案例1@RestControllerpublicclassUserInfoHandler{/***编程式校验*@paramuserInfo*@return*/@GetMapping("/addUser")......
  • 使用spring security 简单步骤
    1.添加springsecurity依赖,即在配置文件中添加依赖。2.添加springsecurity配置文件,spring-security.xml;3.在web.xml加载springsecurity配置文件及springSecurity......