RabbitMQ入门
1、什么是MQ
- 消息队列(Message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
- 一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
2、MQ的作用
1、流量消峰
当有一家商店,最多可以访问100人访问,这时人流量特别大时,如果所有人一起去访问商店的话。显然这个商店会无法正常营业了(服务器宕机)。要想正常的营业的话就需要控制人流量,让后来的人排队 ,是商店里面的人正常购买商品。其中MQ就可以完成这个‘排队的动作’(缓存)。但是这样也会带来缺点,就是顾客的等待时间会变长。但是这比无法营业要好。
2、应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
MQ安装
在Linux上安装
- 在windows上下载erlang安装包和rabbitmq-server安装包(使用XFTP 或者是其他方式上传到LInux系统上)
- 解压erlang安装包
[root@centos79 rabbitmq]# rpm -Uvh erlang-25.3.2.5-1.el7.x86_64.rpm
yum install erlang
-
安装socat
yum install -y socat
-
解压rabbitmq-server
rpm -Uvh rabbitmq-server-3.13.0.beta.5-1.el8.noarch.rpm
- 启动rabbitmq-server
systemctl start rabbitmq-server
- 查看rabbitmq-server状态
systemctl status rabbitmq-server
- 将rabbitmq-servers实现开机运行
systemctl enable rabbitmq-server
产生问题
解决方法:
vim /etc/hostname #查看自己的主机名
ifconfig #查看自己IP
vi /etc/host 在后面加上 192.168.XXX.XXX(自己的IP地址) 主机名称
systemctl enable rabbitmq-server
- 暂停rabbitmq-server
systemctl stop rabbitmq-server
RabbitmqWeb管理或授权操作
默认情况下是没有安装rabbitmqWeb的插件,需要安装才会生效
rabbitmq-plugins enable rabbitmq_management
访问短号是15672:
注意:如果是使用的远程服务器或者是在本机电脑上使用虚拟机的话需要开启15672端口
说明:rabbitmq有一个默认的账号的密码是:
guest
默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。
新建一个用户
rabbitmqctl add_user <用户名称> <用户密码>
查看所有的用户
rabbitmqctl list_users
设置角色分配操作权限
rabbitmqctl set_user_tags 用户名称 用户级别
权限级别: 从上到下 权限降低
- adminstrator 可以登录控制台、查看所有的信息,可以对rabbitmq进行管理
- monitoring 监控者 登录控制台,查看所有的信息
- policymaker 策略制定者 登录控制台,指定策略
- managment 普通管理员,登录控制台。
设置权限
rabbitmqctl set_permissions -p / <usernamea> ".*"".*" ".*"
使用docker安装
docker环境安装
-
卸载旧的docker
sudo yum remove docker \ docker-client \ docker-client-latest \ docker-common \ docker-latest \ docker-latest-logrotate \ docker-logrotate \ docker-engine
-
安装docker仓库
-
安装dcoker所需依赖包
sudo yum install -y yum-utils
-
安装docker包管理地址
sudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo
-
-
安装docker引擎 客户端 以及容器
sudo yum install docker-ce docker-ce-cli containerd.io
-
启动docker
sudo systemctl start docker # 启动docker之后可通过下面命令查看当前docker版本 docker -v # 查看当前docker的镜像 docker images
-
设置docker自动启动
sudo systemctl enable docker
-
使用阿里云的容器镜像服务 -> 镜像工具 -> 镜像加速器
# 创建文件夹 sudo mkdir -p /etc/docker # 写入文件地址 sudo tee /etc/docker/daemon.json <<-'EOF'{"registry-mirrors": ["https://oibhuvgh.mirror.aliyuncs.com"]}EOF # 重新加载配置文件 sudo systemctl daemon-reload # 重启 dockersudo systemctl restart docker
docker安装rabbitmq
docker run -d --restart always --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
参数说明:
参数解释:
- `-d`:设置程序后台运行。
- `--restart always`:设置服务自启动。
- `--name`:指定运行后的容器名称。
- `-e`:设置环境。
这里主要设置账号密码为admin。
默认账号密码为guest ,只能在 localhost访问,由于需要外网访问,所以创建admin用户进行登录。
- `-p`:设置公网IP地址的端口号对应容器内部的端口号。
- `rabbitmq:management`:安装可视化管理组件。
使用MQ
MQ的工作原理
名词解释
Broker :即RabbitMQ的实体服务器。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。
Exchange :消息交换机。指定消息按照什么规则路由到哪个队列Queue。
Queue :消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
Binding :绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
Routing Key:路由关键字。Exchange根据Routing Key进行消息投递。定义绑定时指定的关键字称为Binding Key。
Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
Consumer:消息消费者。消息的接收者,一般是独立的程序。
Connection:Producer 和Consumer 与Broker之间的TCP长连接。
Channel:消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。在RabbitMQ Java Client API中,channel上定义了大量的编程接口。
为什么rabbitmq是基于Channel而不是基于Connection?
Connection只能为一个生产者(Producer)所服务可以为多个消费者(Consumer)服务。
因为Connection建立连接需要经过三次握手和四次挥手,这个过程是非常的消耗时间的,降低了性能。Rabbitmq通过将创建connection这个长连接来提高性能,Connection可以创建多个Channel,每一个channel都可以发送消息。实现多线程并发的效果。我们可以将Connection和Channel看作是进程和线程。
Virtual Host(虚拟主机)相当于是数据库,实现一个分区的效果,其中Exchange(交换机)相当于是数据库里面的表,不同的交换有着不同效果和类型
创建java开发环境
导入rabbitmq依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
RabbitMQ的六种工作模式以及代码实现
官方文档:https://www.rabbitmq.com/getstarted.html
实现简单模式
生产者(Producer)
package Simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于TCP/IP协议的基础之上构建新的协议规范,只不过rabbitmq遵循的是AMQP协议
//基于TCP/IP协议所以一定离不开的是IP
//创建一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置工厂IP建立于Rabbitmq的连接
factory.setHost("127.0.0.1");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//设置端口
factory.setPort(5672);
//通过连接工厂建立连接
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection("生产者");
//通过连接及建立通道
channel = connection.createChannel();
//通过通道声明队列,创建交换机,绑定关系,路由key,发送消息,和接收消息
String queueName="queue1";
/**
* param1: queue:String 队列的名称
* param2: durable:boolean 队列持久化设置 true表示持久化 false 表示不会持久化 持久化会存入到磁盘中 不持久化存放在内存中,一般不持久化
* param3: exclusive:boolean 是否表现出排他性(是否只对一个消费者消费(是否共享))
* param4: autoDelete:boolean 是否自动删除 随着最后一个消费者消费完毕之后是否自动的删除
* param5: arguments:map 携带附加参数
*/
channel.queueDeclare(queueName,false,false,true,null);
//准备好发送的消息
String msg="Hello Rabbitmq";
/**
* param1: exchange String 交换机
* param2: routingKey String 路由(队列名称)
* param3: props BasicProperties 一些参数
* param4: boby byte[] 消息,注意这里的类型是byte[]
*/
channel.basicPublish("",queueName,null,msg.getBytes());
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
//关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//关闭连接
if(connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
注意: channel.basicPublish("",queueName,null,msg.getBytes());这里面的第一参数表示的是交换机,我们在这里并没有指定交换机,那么是不是我们可以有没有交换机的队列呢?不是的,如果没有指定的交换机的话,会有一个默认的交换机进行绑定
实现一个消费者
package Simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
//所有的中间件技术都是基于TCP/IP协议的基础之上构建新的协议规范,只不过rabbitmq遵循的是AMQP协议
//基于TCP/IP协议所以一定离不开的是IP
//创建一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置工厂IP建立于Rabbitmq的连接
factory.setHost("127.0.0.1");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//设置端口
factory.setPort(5672);
//通过连接工厂建立连接
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection("消费者");
//通过连接及建立通道
channel = connection.createChannel();
//通过通道声明队列,创建交换机,绑定关系,路由key,发送消息,和接收消息
String queueName="queue1";
/**
* param1: queue String 队列名称
* param2: autoAck boolean 自动应答 true表示自动的应答 false
* param3: 消费者未成功的消费是的回调
* param4: 取消时的回调
*/
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(new String(message.getBody(),"UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
}
);
} catch (Exception e) {
e.printStackTrace();
}finally {
//关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//关闭连接
if(connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
实现Fanout模式 (publish/subscribe发布订阅(共享资源))
模式结构
使用图像可视化界面实现
-
创建相应的交换机(交换机类型是Fanout类型)
-
将交换机和队列进行绑定
-
发送消息
使用java代码实现
-
创建交换机
//设置虚拟主机 factory.setVirtualHost("/"); String exchangeName="fanout-exchange"; String type="fanout"; /** * params1 exchange String 交换机的名称 * params2 type String 指定交换机的类型 * params3 durable boolean 是否持久化 */ channel.exchangeDeclare(exchangeName,type,true);
-
将交换机和队列进行绑定
/** * params1 queue String 队列的名称 * params2 exchange String 交换机的名称 * params3 routingKey String 路由名称 */ channel.queueBind(queueName,exchangeName,routeKey);
-
发送消息
/** * param1: exchange String 交换机 * param2: routingKey String 路由(队列名称) * param3: props BasicProperties 一些参数 * param4: boby byte[] 消息,注意这里的类型是byte[] */ channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
合并生产者代码
package FanOut;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于TCP/IP协议的基础之上构建新的协议规范,只不过rabbitmq遵循的是AMQP协议
//基于TCP/IP协议所以一定离不开的是IP
//创建一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置工厂IP建立于Rabbitmq的连接
factory.setHost("127.0.0.1");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//设置端口
factory.setPort(5672);
//设置虚拟主机
factory.setVirtualHost("/");
//通过连接工厂建立连接
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection("生产者");
//通过连接及建立通道
channel = connection.createChannel();
//通过通道声明队列,创建交换机,绑定关系,路由key,发送消息,和接收消息
String queueName="queue1";
/**
* param1: queue:String 队列的名称
* param2: durable:boolean 队列持久化设置 true表示持久化 false 表示不会持久化 持久化会存入到磁盘中 不持久化存放在内存中,非持久化也会存盘 ,并且会随着系统的重启而丢失
* param3: exclusive:boolean 是否表现出排他性(是否只对一个消费者消费(是否共享))
* param4: autoDelete:boolean 是否自动删除 随着最后一个消费者消费完毕之后是否自动的删除
* param5: arguments:map 携带附加参数
*/
channel.queueDeclare(queueName,false,false,true,null);
//准备好发送的消息
String msg="Hello Rabbitmq";
String exchangeName="fanout-exchange";
String routeKey="";
String type="fanout";
/**
* params1 exchange String 交换机的名称
* params2 type String 指定交换机的类型
*/
channel.exchangeDeclare(exchangeName,type);
/**
* params1 queue String 队列的名称
* params2 exchange String 交换机的名称
* params3 routingKey String 路由名称
*/
channel.queueBind(queueName,exchangeName,routeKey);
/**
* param1: exchange String 交换机
* param2: routingKey String 路由(队列名称)
* param3: props BasicProperties 一些参数
* param4: boby byte[] 消息,注意这里的类型是byte[]
*/
channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
//关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//关闭连接
if(connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
优化
我们可以将连接工厂创建连接封装成为一个工具类
package ConnectionFactoryUtils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CoonectionUtils {
private static ConnectionFactory factory;
//也可以将其封装成一个静态方法获取相应的名称
static {
//创建一个连接工厂
factory=new ConnectionFactory();
//设置工厂IP建立于Rabbitmq的连接
factory.setHost("127.0.0.1");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//设置端口
factory.setPort(5672);
//设置虚拟主机
factory.setVirtualHost("/");
}
public static Connection GetConnection(String producer) throws IOException, TimeoutException {
//获取连接对象
Connection connection = factory.newConnection(producer);
return connection;
}
}
实现Direct模式(Routing路由模式)
模式结构
分析:Direct模式结构与Fanout模式比较相似,但是Direct模式会有一个筛选过滤的作用通过routKey相当于数据库脸面的(where rountKey=XXX);
使用图形化界面实现
Direct模式和Fanout模式的区别
- 将交换机的类型设为Direct
- 在交换机和队列绑定时需要输入相应的routeKey。
- 发送消息会根据指定的routeKey匹配发送
使用Java实现Direct模式
同样我们时通过与Fanout模式的比较来理解
因为Direct模式的本质是更具routeKey来发送的所以需要设置具体的routeKey
注意: 一个队列可以拥有多个routeKey
String routeKey="eamil";
topic 主题模式(路由模式的一种)
模式结构
- 星号、#号代表通配符
- 星号代表只能由一级
一个单词
,#号代表0个或多个单词 - 路由功能添加模糊匹配
- 消息产生者产生消息,把消息交给交换机
- 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
Java实现
//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
//1.参数一:交换机名称 参数二:交换机类型
channel.exchangeDeclare("topic_exchange","topic");
Work模式
模式结构
- 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样。保证一条消息只能被一个消费者使用
- 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢
work轮询模式
一消费者一条,按均分配
Java代码实现
work模式默认的就是轮询接收,但是在第二个参数是autoAck(自动应答)必须为true
轮询模式不会因为某一个消费者处理速度比较快时,就会导致处理的消息的数量不一样
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("Work1接收到"+new String(message.getBody(),"UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
});
work不公平分发
一个消费者处理运行快时,就会消费多条的消息,一个消费者运行的慢时就会消费少量的消息。
Channel finalChannel = channel;
//设置Qos的值设置预处理值,不管是处理速度是快还是慢都需要处理QOS值
finalChannel.basicQos(20);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("Work1接收到"+new String(message.getBody(),"UTF-8"));
}
//实现一个手动的回答
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
});
SpringBoot整合Rabbitmq
-
创建一个SpringBoot工程,导入Rabbitmq的启动器
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
完成我们的Rabbitmq的连接配置
基本的配置如下,当然我们在导入相应的第三方的时候,他们会有一个配置类的。
查看配置类我们可以发现如果我们是在本机安装的Rabbitmq的话,是可以不需要配置rabbitmq的相当配置的,默认会使用guest用户登录。
spring: rabbitmq: host: username: password: virtual-host: port:
使用SpringBoot创建Fanout模式
创建一个生产者
-
实现交换机的创建和交换机与队列的绑定
package RabbitmqSixModel.Configuration; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfiguration { //这里指定创建的交换机类型就是XXXExchange的实现类 @Bean public FanoutExchange createFanoutExchange(){ return new FanoutExchange("Fanout_Exchange",true,false); } //声明队列 @Bean public Queue SSMQueue(){ return new Queue("SSMQueue_Queue",true,false,true); } @Bean public Queue EmailQueue(){ return new Queue("EmailQueue_Queue",true,false,true); } @Bean public Queue QQQueue(){ return new Queue("QQQueue_Queue",true,false,true); } //将队列与交换机进行绑定,使用的是指挥者模式。BindingBuilder指挥者指挥着创建的步骤 @Bean public Binding SSMQueueBinding(){ return BindingBuilder.bind(SSMQueue()).to(createFanoutExchange()); } @Bean public Binding EmailQueueBinding(){ return BindingBuilder.bind(EmailQueue()).to(createFanoutExchange()); } @Bean public Binding QQQueueBinding(){ return BindingBuilder.bind(QQQueue()).to(createFanoutExchange()); } }
-
发送消息
package RabbitmqSixModel.FanoutServer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class FanoutServerImp { @Autowired private RabbitTemplate rabbitTemplate; /** * @param OrderId * @param ProductedId * @param num */ public void makerOrder(String OrderId,String ProductedId,int num){ //创建一个订单 OrderId= UUID.randomUUID().toString(); //使用Rabbitmq分发消息 /** * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send */ String exchangeName="Fanout_Exchange"; String routingKey =""; rabbitTemplate.convertAndSend(exchangeName,routingKey,OrderId); } }
创建一个消费者
package RabbitmqSixModel.FanoutServer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
//监听相应的管道
//queues是一个String[]表示可以监听多个
@RabbitListener(queues={"EmailQueue_Queue"})
@Service
public class EamilQueueConsumer {
//设置落脚点
@RabbitHandler
public void Consume(String message){
System.out.println("EmailQueue_Queue队列的消息=="+message);
}
}
使用SpringBoot实现Direct模式
Direct模式主要要靠的是RouteKey进行筛选。
所以在绑定Queue是只需要Building.bin().to().with(RouteKey);
也可以使用注解来实现交换机,队列的创建和绑定。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "duanxin.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key="#.duanxin.#"
))
在创建交换机和队列时在生产者和消费者都可以创建,但是最好是在消费者创建。
RabbitMQ过期时间的设置
设置整个队列里面的消息的过期时间
在Configuration配置类里面配置x-message-ttl
的参数就可以了,注意默认单位时ms
@Bean
public Queue SSMQueue(){
Map<String,Object> map =new HashMap<>();
map.put("x-message-ttl",5000);//设置队列的过期时间为5s
return new Queue("SSMQueue_Queue",true,false,true,map);
}
判断自己是否配置成功,如果配置成功Queue会显示出TTL
设置某个消息的过期时间
MessagePostProcessor message=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//这个地方的参数类型为字符串
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routingKey,OrderId,message);
当然如果一个消息自己本身就设置的有过期时间,让后又存在于过期队列中,那么他的过期时间应该是两个当中比较少的那个时间。
注意:消息本身的过期时间,当消息的存在的时间到达过期时间之后也会加入到死信队列中去,但是需要指定死信交换机。利用这个特性可以是实现一个指定延长时间的延时队列。
死信队列
什么是死信队列
其实死信队列就是一个普通的交换机
,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理的。但是你可以配置某个交换机为此队列的死信交换机,该队列的消息成为死信后会被重新发送到此 DLX 。至于怎么处理这个DLX中的死信就是看具体的业务场景了,DLX 中的信息可以被路由到新的队列。接盘侠
哪几种条件会是消息进入到死信队列
-
消息被拒绝
-
消息过期(过期队列里面的消息过期。消息本身设置的过期时间过期不会加入到死信队列中去)
-
队列的长度到达规定到长度之后就会使后来的消息加入到死信队列。
显示DLX证明配置成功,在这个队列里面的消息过期或者是超过了长队之后就会加入到指定的死信队列中。
死信队列就是一个普通的交换机
@Configuration public class DLXConfiguration { @Bean public FanoutExchange createDLXFanoutExchange(){ return new FanoutExchange("DLX_Fanout_Exchange",true,false); } @Bean public Queue DLXEmailQueue(){ return new Queue("DLX_EmailQueue_Queue",true,false,true); } @Bean public Binding DLXEmailQueueBinding(){ return BindingBuilder.bind(DLXEmailQueue()).to(createDLXFanoutExchange()); } }
-
@Bean public Queue SSMQueue(){ Map<String,Object> map =new HashMap<>(); map.put("x-message-ttl",5000);//设置队列的过期时间为5s map.put("x-dead-letter-exchange","DLX_Fanout_Exchange"); //设置死信交换机,将消息转发到相应的交换机 //map.put("x-dead-letter-routing-key","");//如果你死信交换机的类型就需要配置我这里思想交换机就一个队列就设置类型为Fanout,Fanout类型不需要配置 map.put("x-max-length",5);//设置队列的长度是5 return new Queue("SSMQueue_Queue",true,false,true,map); }
注意:当消息的数量超过对列的长度之后,RabbitMQ默认的做法是将处于队列头部的信息(队列中最老的消息)丢弃或变成死信。可以通过设置overflow的值来改变这种方式。
延迟对列
顾名思义是对消息进行延迟处理,使用场景在购物时下单,常常可以看见支付时间还剩余多少,当时间到达了规定的时间之后,就会发送消息告知用户支付已经被取消。
Rabbitmq本身是没有延时对列的。
方法一:可以使用TTL(过期对列)+DLX死信对列来实现,只需要使用一个消费者去监听死信对列判断这个操作是否已经被处理。没有被处理的告知用户。
- 不推荐。原因:死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。死信队列没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信(解决方法:向单个队列投递相同延迟时间的消息)。
方法二:采用rabbitmq-delayed-message-exchange 插件实现。(RabbitMQ 3.6.x开始支持)
- 推荐。原因:它解决了死信队列的消息投递问题:在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。
方法一基于TTL+DLX实现延迟队列
代码实现
package RabbitmqSixModel.Configuration;
//设置一个普通的队列,并指明好死信交换机
@Configuration
public class PostPoneQueue {
@Bean
public DirectExchange postPoneExchange(){
return new DirectExchange("PostPoneExchange",true,false);
}
@Bean
public Queue postPoneQueueName(){
Map<String,Object> args=new HashMap<>();
args.put("x-dead-letter-exchange","DLX_Direct_Exchange");
args.put("x-dead-letter-routing-key","email");
return new Queue("postPoneQueue",true,false,false,args);
}
@Bean
public Binding postPoneQueueBinding(){
return BindingBuilder.bind(postPoneQueueName()).to(postPoneExchange()).with("postPone");
}
}
//死信交换机
@Configuration
public class DLXConfiguration {
@Bean
public DirectExchange createDLXFanoutExchange(){
return new DirectExchange("DLX_Direct_Exchange",true,false);
}
@Bean
public Queue DLXEmailQueue(){
return new Queue("DLX_EmailQueue_Queue",true,false,true);
}
@Bean
public Binding DLXEmailQueueBinding(){
return BindingBuilder.bind(DLXEmailQueue()).to(createDLXFanoutExchange()).with("email");
}
}
//发送消息核心代码
MessagePostProcessor message=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//这个地方的参数类型为字符串
message.getMessageProperties().setExpiration("5000"); //设置过期时间,这里可以采用一个变量来动态设置时间。
message.getMessageProperties().setContentEncoding("UTF-8");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
发送多条不同时间延迟的消息时,会出现一个排队的现象,后面消息时间可能已经出现了过期,但是由于死信队列的原因导致了后面的消息需要等待前面的消息过期之后才会加入到死信队列。满足一个FIFO,出现这种排队的原因,因为RabbitMQ会检查第一个消息是否过期,如果过期则会丢到死信队列。
基于插件的延迟队列
下载插件rabbitmq_delayed_message_exchange
官网下载地址: https://www.rabbitmq.com/community-plugins.html
在windows上安装此插件的话
首先需要插件复制到plugins目录下
到/sbin目录下,使用cmd执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在linux或远程服务器上安装
首相需要插件解压到plugins目录下
#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
再重新启动rabbitmq服务
检查是否安装成功
到rabbitmq的web界面查看交换机的类型
原理:原来发生延迟的地方是队列,使用插件将交换机声明为x-delayed-message之后,发生延迟的地方是在交换机。
模式结构
SpringBoot实现延迟交换机的创建和绑定
配置类
package RabbitmqSixModel.Configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayConfiguration {
//交换机名称
public static final String DELAY_EXCHANGE_NAME="delay_exchange";
//队列名称
public static final String DELAY_QUEUE_NAME="delay_queue";
//routingKey
public static final String DELAY_ROUTINGKEY_NAME="delay_routingKey";
//声明交换机
@Bean
public CustomExchange DelayExchange(){
Map<String, Object>args=new HashMap<>();
args.put("x-delayed-type","direct");
/**
* params1:name String 交换机名称
* params2:type String 交换机类型
* params3:durable boolean 是否持久化
* params4:autoDelete boolean 是否自动删除
* params5:arguments Map 其他的一些参数
*/
return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
//声明队列
@Bean
public Queue DelayQueue(){
return new Queue(DELAY_QUEUE_NAME,true,false,false);
}
//交换机与队列绑定
@Bean
public Binding DelayQueueBindingDelayExchange(
@Qualifier("DelayExchange") CustomExchange exchange,
@Qualifier("DelayQueue") Queue queue
){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTINGKEY_NAME).noargs();
}
}
生产者(Producer)
@Service
public class DelayServerImp {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param OrderId
* @param ProductedId
* @param num
*/
public void makerOrder(String OrderId,String ProductedId,int num){
//创建一个订单
OrderId= UUID.randomUUID().toString();
System.out.println("发送消息为"+ OrderId);
//使用Rabbitmq分发消息
/**
* @param exchange the name of the exchange
* @param routingKey the routing key
* @param message a message to send
*/
String exchangeName="delay_exchange";
String routingKey ="delay_routingKey";
MessagePostProcessor message=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(500);//设置延迟时间,可以使用变量
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routingKey,OrderId,message);
}
}
消费者
@Component
public class DelayQueueConsumer {
@RabbitListener(queues = {"delay_queue"})
public void delayConsumer(String msg){
System.out.println("消息为"+msg);
}
}
实现效果
发布确认
发布确认主要是为了消息丢失。
在生产者发送消息之后,消息会发送到消息对列中,然后需要将消息加载到磁盘中,只有在消息被压入到磁盘中才达到了消息持久化。当开启了发布确认的模式之后,,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所匹配的队列之后,broker就会发送一个确认给生产者,(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出
发u确认模式需要满足的条件
- 设置要求对列必须持久化
- 设置要求对列中的消息持久化
- 开启发布确认模式(Channel.confirmSelect)
单个发布确认
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirm()这个方法只有在消息被确认的时候才返回,如果在指定时间范围内 这个消息没有被确认那么它将抛出异常。
但是正是因为他是同步确认方式,所以每发送一个就必须等到确认之后才可以继续发送下一个,所以显而易见,速度就特别的慢。
boolean b = rabbitTemplate.waitForConfirms(100);
批量发布确认
上面那种方式非常慢,批量发布确认其实就是对单个发布确认做出了一点优化,就是单个发布确认是每发送一条就确认一次,批量发布确认是一次发布多少条,然后确认,相对于单个发布确认来说,可以节省不少的时间。
缺点:当发送消息出现问题时,我们无法知道是那个消息出现问题
异步发布确认
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论可靠性还是效率都比上面两种方式好,它是利用回调函数来达到可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
//消息接收成功 回调函数
ConfirmCallback ackCallback = ((deliveryTag, multiple) -> {
//2.删除掉已经确认的消息 剩下的就是未确认的消息
System.out.println("确认的消息:" + deliveryTag);
});
/*
*params1:消息的标签
*params2:是否是批量确认
*/
ConfirmCallback nackCallBack = (deliveryTag, multiple) -> {
//3.打印一下未确认的消息都有哪些
System.out.println("未确认的消息:" + deliveryTag);
};
/*
*params1: 监听那些消息发送成功的消息
*params2: 监听那些消息发送失败
*/
//准备消息的监听器 监听那些消息成功了 那些消息失败了
channel.addConfirmListener(ackCallback, nackCallBack);
处理未确认的消息
RabbitMQ之发布确认(单个发布确认,批量发布确认,异步发布确认)_rabbitmq批量确认_qq_42212926的博客-CSDN博客
拓展
存在一个问题当Rabbitmq出现宕机时,我们的broker将会不存在,即不会有交换机(exchange)和队列(queue),所以这个时间使用发布确认模式,是不会返回出相应的接送消息的。并且这时我们的生产者也不会知道rabbitmq发生了宕机。解决方法是需要一个回调函数
-
首先在配置文件中开启发布确认模式
publisher-confirm-type: correlated # NONE 表示发布确认模式被禁用 # CORRELATED,表示消息成功到达Broker后触发ConfirmCalllBack回调 # SIMPLE:,发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑, 标签:String,队列,rabbitmq,RabbitMQ,交换机,消息,public From: https://www.cnblogs.com/wzl66/p/17910119.html