springboot/springcloud整合rabbitmq实战
简单例子
生产者
创建项目,引入jar
<!--springboot整合rabbitmq包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.15</version>
</dependency>
配置Rabbitmq数据源
spring:
#spring整合rabbtitmq数据库配置
rabbitmq:
username: root
virtual-host: /root/
password: tiger
host: 192.168.170.30
port: 5672
编写配置交换-队列代码
package com.aaa.sbm.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @FileName: RabbitmqConfig
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/11 9:23
* @Version: 1.0.0
*/
@Configuration //相当于rabbitmq-config.xml <beans>
public class RabbitmqConfig {
//定义常量,交换机的名称
public static final String EXCHANGE_NAME="exchange_topic_1";
//队列名称
private static final String QUEUE_NAME="queue12";
/**
* 配置交换机
* @return
*/
@Bean
public Exchange exchangeTopic(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
*配置队列
* @return
*/
@Bean
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 配置绑定
* @return
*/
@Bean
public Binding bindExchangeToQueue(){
//把交换机和队列绑定时一定写绑定key = info.#
return BindingBuilder.bind(queue()).to( exchangeTopic()).with("info.#").noargs();
}
}
编写发送消息代码
package com.aaa.sbm.test;
import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @FileName: TestRabbitmqSendMsg
* @Description: 测试rabbitmq发送消息的类
* @Author: zhz
* @CreateTime: 2024/12/11 9:34
* @Version: 1.0.0
*/
@SpringBootTest
public class TestRabbitmqSendMsg {
//使用spring支持的设计模式-模版模式
// spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMsg(){
//定义发送字符串
String sendMsg = "hello.qy178!!!!";
// 参数1,交换机名称 参数2 routingKey 参数3 发送字符串对象
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
}
}
测试
先打开rabbit服务器,启动rabbitmq服务
systemctl start rabbitmq-server
消息者
创建项目,引入jar
<!--springboot整合rabbitmq包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.15</version>
</dependency>
配置Rabbitmq数据源
spring:
#spring整合rabbtitmq数据库配置
rabbitmq:
username: root
virtual-host: /root/
password: tiger
host: 192.168.170.30
port: 5672
编写接受消息监听器
复杂例子
需求
生产者有批量定时处理的任务,需要大量部门信息批量向中间件中存储,然后消费要进行消费处理。
生产者
package com.aaa.sbm.task;
import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @FileName: BatchScheduledHandlerDeptInfo
* @Description:
* @Author: zhz
* @CreateTime: 2024/12/11 10:30
* @Version: 1.0.0
*/
@Component
@EnableScheduling //开启定时功能
public class BatchScheduledHandlerDeptInfo {
@Resource
private RabbitTemplate rabbitTemplate;
//原子Long类型 线程安全的
private AtomicInteger atomicInteger = new AtomicInteger();
/**
* 每个5秒钟产生20条部门信息,并发送到消息中间中,使用多线程提高发送速度
*/
// * 秒 * 分 * 时 * 日 * 月 * 周
// / 每隔 - 范围 , 枚举
// */5 * * * * * 每隔5秒执行一次
// 5-15 * * * * * 任意一分钟的第5秒,第6秒。。。第15分别执行一次
// 5,10,15 * * * * * 任意一分钟的第5秒,第10秒,第15分别执行一次
// 0 30 6 01 * * 每月一周6.30执行一次
@Scheduled(cron = "*/5 * * * * *")
public void handlerDept(){
//使用线程池的固定线程池大小类型
ExecutorService executorService = Executors.newFixedThreadPool(20);
//启动线程
for (int i = 0; i < 20; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
Integer i = atomicInteger.getAndIncrement();
Dept dept = new Dept(i, "开发" + i + "部", "郑州" + i);
//使用fastjson把对象转为json发送,方便消费者读取
String deptJson = JSON.toJSONString(dept);
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.dept",deptJson);
}
});
}
System.out.println("每隔5秒,发送20个对象到rabbitmq中!!!");
}
}
消息者
package com.aaa.sbm.util;
import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @FileName: RabbitmqConsumerUtil
* @Description: Rabbitmq消费者工具类,用来监听并处理消息
* @Author: zhz
* @CreateTime: 2024/12/11 9:57
* @Version: 1.0.0
*/
@Component
public class RabbitmqConsumerUtil {
/**
* 监听方法
* @param message
*/
@RabbitListener(queues = "${queue.name}") //获取配置文件中配置的队列名称
public void handlerMsg(Message message){
//底层使用序列化过的字节对象进行传输
byte[] messageBodyByteArray = message.getBody();
String deptJson = new String(messageBodyByteArray);
System.out.println("消费的消息为:"+deptJson);
Dept dept = JSON.parseObject(deptJson, Dept.class);
System.out.println("dept对象:"+dept);
}
}
如何保证消息不丢失
示意图
生产者保证消息发送到交换机(confirm机制)
配置
#发布确定类型 none 不开启 不会确定回调 simple 同步 必须等交换机给我发送确认后,再处理业务 correlated 异步 速度快
publisher-confirm-type: correlated
代码实现
package com.aaa.sbm.test;
import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @FileName: TestRabbitmqSendMsg
* @Description: 测试rabbitmq发送消息的类
* @Author: zhz
* @CreateTime: 2024/12/11 9:34
* @Version: 1.0.0
*/
@SpringBootTest
public class TestRabbitmqSendMsgComfirm {
//使用spring支持的设计模式-模版模式
// spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
@Resource
private RabbitTemplate rabbitTemplate;
/* @Resource
private DeptDao deptDao;*/
@Test
public void testSendMsg(){
//deptDao.deleteById(1);
//定义发送字符串
String sendMsg = "hello.qy178!!!!";
//实例化确认回到接口
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// CorrelationData correlationData 相关数据,有一个唯一识别的ID和其他信息, boolean ack, @Nullable String cause
System.out.println("识别ID:"+(correlationData==null?"":correlationData.getId()));
//ack应答 如果为true说明消息到交换中 false出错没到
if(ack){
System.out.println("消息已经发送到交换机中!");
}else {
System.out.println("出现错误,错误原因为:"+cause);
}
}
};
//调用确定回调
rabbitTemplate.setConfirmCallback(confirmCallback);
// 参数1,交换机名称 参数2 routingKey 参数3 发送字符串对象 exchange_top_1111
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
// 打印提示
System.out.println("消息发送完毕!");
}
}
生产者保证消息发送到队列(returnsCallback机制)
配置
#开启返回回调
publisher-returns: true
代码实现
package com.aaa.sbm.test;
import com.aaa.sbm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @FileName: TestRabbitmqSendMsg
* @Description: 测试rabbitmq发送消息的类
* @Author: zhz
* @CreateTime: 2024/12/11 9:34
* @Version: 1.0.0
*/
@SpringBootTest
public class TestRabbitmqSendMsgReturnsCallback {
//使用spring支持的设计模式-模版模式
// spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
@Resource
private RabbitTemplate rabbitTemplate;
/* @Resource
private DeptDao deptDao;*/
@Test
public void testSendMsg(){
//deptDao.deleteById(1);
//定义发送字符串
String sendMsg = "hello.qy178!!!!";
RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("确定到队列的信息是:"+new String(returnedMessage.getMessage().getBody()));
System.out.println("交换机:"+returnedMessage.getExchange());
System.out.println("路由键:"+returnedMessage.getRoutingKey());
System.out.println("返回码:"+returnedMessage.getReplyCode());
System.out.println("返回字符串:"+returnedMessage.getReplyText());
}
};
//发送前,设置消息确定到队列的回调
rabbitTemplate.setReturnsCallback(returnsCallback);
// 参数1,交换机名称 参数2 routingKey 参数3 发送字符串对象
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"info.test.msg",sendMsg);
// 打印提示
System.out.println("消息发送完毕!");
}
}
消费者保证消息成功消费后再删除
概述
默认情况下,消息自动确认,消费者一旦拿到消息,确定消费,消费就可以删除了。但是消费者拿到消息后在处理业务过程中,可能出现错误,使用消息没有正确的处理完业务,也属于消息不可靠。这种情况下,开启手动确认消息,写代码完成业务处理成功后再确认消息成功消费,再删除,如果出现错误,不确定消息成功消费,并把消息返回队列。
配置
listener:
simple: #支持1个消费者
# 确认类型
acknowledge-mode: manual #none 禁止使用消息确认 auto 自动确认 manual手动确认
#direct: #支持多个消费者
#acknowledge-mode: manual
代码实现
package com.aaa.sbm.util;
import com.aaa.sbm.entity.Dept;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @FileName: RabbitmqConsumerUtil
* @Description: Rabbitmq消费者工具类,用来监听并处理消息
* @Author: zhz
* @CreateTime: 2024/12/11 9:57
* @Version: 1.0.0
*/
@Component
public class RabbitmqConsumerUtil {
/**
* 监听方法
* @param message
*/
/*@RabbitListener(queues = "${queue.name}") //获取配置文件中配置的队列名称
public void handlerMsg(Message message){
//底层使用序列化过的字节对象进行传输
byte[] messageBodyByteArray = message.getBody();
String deptJson = new String(messageBodyByteArray);
System.out.println("消费的消息为:"+deptJson);
Dept dept = JSON.parseObject(deptJson, Dept.class);
System.out.println("dept对象:"+dept);
}*/
/**
* 接受消息的方法,手动保证数据可靠
* @param message
* @param channel
*/
@RabbitListener(queues = "${queue.name}") //获取配置文件中配置的队列名称
public void handlerMsg(Message message, Channel channel){
//获取消息唯一数字标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//底层使用序列化过的字节对象进行传输
byte[] messageBodyByteArray = message.getBody();
System.out.println("消费的消息为:"+messageBodyByteArray);
//执行业务 //1 ,2,3,4....100
System.out.println(1/0);
//...
//...
//basicAck手动确认收到信息的方法
//long deliveryTag 消息唯一数字标识,
// boolean multiple 许多的 小于当前deliveryTag所有消息都确认,
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
e.printStackTrace();
////basicNack Nack=Not ack 手动不确认收到信息的方法
//long deliveryTag 消息唯一数字标识,
// boolean multiple 许多的 小于当前deliveryTag所有消息都确认
// boolean requeue 重回队列
try {
channel.basicNack(deliveryTag,false,true);
//channel.basicNack(deliveryTag,false,false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
死信队列
概念
死信(Dead Letter),是rabbitmq中的一种机制,因为某种原因造成消息丢失,丢失的消息就称为死信。把死信放入一个队列,不让它丢失,这个队列称为死信队列。通常死信队列会和一个交换机进行绑定,这个交换机称为死信交换机(Dead Letter Exchange),又叫DLX。
原因
示意图
代码实现
package com.aaa.sbm.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @FileName: RabbitmqDeadLetterQueueConfig
* @Description: 死信队列配置
* @Author: zhz
* @CreateTime: 2024/12/12 9:37
* @Version: 1.0.0
*/
@Configuration
public class RabbitmqDeadLetterQueueConfig {
public static final String BUSINESS_EXCHANGE_NAME="businessExchangeA";
private static final String DEAD_LETTER_EXCHANGE_NAME="deadLetterExchangeA";
private static final String DEAD_LETTER_QUEUE_NAME="deadLetterQueueA";
private static final String BUSINESS_QUEUE_NAME="businessQueueA";
private static final String ROUTING_KEY="deadLetter.info.#";
/**
* 配置业务交换机
* @return
*/
@Bean
public Exchange businessExchange(){
return ExchangeBuilder.topicExchange(BUSINESS_EXCHANGE_NAME).build();
}
/**
* 配置业务队列
* @return
*/
@Bean
public Queue businessQueue(){
//实例化参数集合
Map<String, Object> arguments = new HashMap<>();
//让当前队列绑定死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key",ROUTING_KEY);
//实例化队列,传递参数
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(arguments).build();
}
/**
* 把业务交换机和业务队列绑定
* @return
*/
@Bean
public Binding businessExchangeToQueueBind(){
return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY).noargs();
}
/**
* 配置死信交换机
* @return
*/
@Bean
public Exchange deadLetterExchange(){
return ExchangeBuilder.topicExchange(DEAD_LETTER_EXCHANGE_NAME).build();
}
/**
* 配置死信队列
* @return
*/
@Bean
public Queue deadLetterQueue(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build();
}
/**
* 把死信交换机和死信队列绑定
* @return
*/
@Bean
public Binding deadLetterExchangeToQueueBind(){
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY).noargs();
}
}
测试
测试:消费者不确认消息也不重回队列
生产者
package com.aaa.sbm.test;
import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @FileName: TestRabbitmqSendMsg
* @Description: 测试rabbitmq发送消息的类
* @Author: zhz
* @CreateTime: 2024/12/11 9:34
* @Version: 1.0.0
*/
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {
//使用spring支持的设计模式-模版模式
// spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
@Resource
private RabbitTemplate rabbitTemplate;
/* @Resource
private DeptDao deptDao;*/
@Test
public void testSendMsg(){
//deptDao.deleteById(1);
//定义发送字符串
String sendMsg = "hello.qy178!!!!";
// 参数1,交换机名称 参数2 routingKey 参数3 发送字符串对象
rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
// 打印提示
System.out.println("消息发送完毕!");
}
}
消费者
//basicNack 不确认 boolean requeue=false 不重回队列
channel.basicNack(deliveryTag,false,false);
测试:消息超时丢失。
生产者
//实例化参数集合
Map<String, Object> arguments = new HashMap<>();
//设置业务队列存放消息后,消息的过期时间 x-message-ttl =time to live 单位是:毫秒
arguments.put("x-message-ttl",20000);
消费者
消费者20秒之内一定不要运行
测试:消息超出了队列的存放长度丢失。
生成者
设置长度
//实例化参数集合
Map<String, Object> arguments = new HashMap<>();
//设置业务队列存放消息后,消息的过期时间 x-message-ttl =time to live 单位是:毫秒
// arguments.put("x-message-ttl",20000);
//设置业务队列存放消息的最大长度10
arguments.put("x-max-length",10);
循环发送信息,发送数量大于长度
package com.aaa.sbm.test;
import com.aaa.sbm.config.RabbitmqConfig;
import com.aaa.sbm.config.RabbitmqDeadLetterQueueConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
* @FileName: TestRabbitmqSendMsg
* @Description: 测试rabbitmq发送消息的类
* @Author: zhz
* @CreateTime: 2024/12/11 9:34
* @Version: 1.0.0
*/
@SpringBootTest
public class TestRabbitmqDeadLetterSendMsg {
//使用spring支持的设计模式-模版模式
// spring把对rabbitmq的操作封装成一个模版,里面提供了一系列的方法,可以直接使用
@Resource
private RabbitTemplate rabbitTemplate;
/* @Resource
private DeptDao deptDao;*/
@Test
public void testSendMsg(){
for (int i = 0; i < 15; i++) {
//deptDao.deleteById(1);
//定义发送字符串
String sendMsg = "hello.qy178!!!!"+i;
// 参数1,交换机名称 参数2 routingKey 参数3 发送字符串对象
rabbitTemplate.convertAndSend(RabbitmqDeadLetterQueueConfig.BUSINESS_EXCHANGE_NAME,"deadLetter.info.test",sendMsg);
}
// 打印提示
System.out.println("消息发送完毕!");
}
}
消费者
消费者不要运行
延迟队列
概念
延迟队列(Delay Queue),生成者生产消息后,不让消费者立马消费,而是过一段时间再让消费者消费,这种机制就是延迟队列。实现方式可以借助死信队列实现(本节就是这么做的),也可以通过插件实现(自己找帖子看下)。
应用场景
电商项目中最常见的订单超时功能
会议提醒功能
网站注册后,提醒登录
等等
示意图
代码实现
生产者
把死信队列代码中的businessExchange和businessQueue改为delayExchage和DelayQueue并设置DelayQueue的存储消息的过期时间
消费者
监控死信队列即可
集群
标签:队列,springframework,rabbitmq,org,import,com,public From: https://www.cnblogs.com/xiaomubupi/p/18643862