RabbitMQ
1 MQ的基本概念
- RabbitMQ是一个开源的消息代理和队列服务器,它使用Erlang语言编写并运行在多种操作系统上,如Linux、Windows等。RabbitMQ可以接收、存储和转发消息(也称为“事件”)到连接的客户端。它适用于多种场景,包括异步通信、流量削峰、应用解耦等。
1.1 同步和异步
同步: 多个服务之间顺序执行
- 优点: 时效性强
- 缺点: 耦合度高,性能降低,有额外的资源消耗,有级联失效的问题
异步: 多个服务之间同时执行
- 事件驱动
- 事件发布者(Publisher)
- 中间人(Broker)
- 事件订阅者(Consumer)
1.2 MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
发送方称为生产者,接收方称为消费者
1.3 MQ的优势和劣势
1.3.1 优势
1.3.1.1 应用解耦
系统的耦合性越高,容错性就越低,可维护性就越低。
使用 MQ 使得应用间解耦,提升容错性和可维护性。
1.3.1.2 异步提速
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
我们可以在使用MQ后,流程如下
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
1.3.1.3 削峰填谷
添加MQ后,可以把请求先缓存在MQ中
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰 就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直 到消费完积压的消息,这就叫做"填谷"。
使用MQ后,可以提高系统稳定性。
1.4 常见的MQ产品
1.4.1 产品区别
1.4.2 使用建议
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2 初识RabbitMQ
2.1 简介
首先来了解一下AMQP
-
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
-
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。 Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
-
官网: https://www.rabbitmq.com/
2.2 安装RabbitMQ
将2024-AI大模型Java全链路工程师环境资料/第四模块/rabbitmq-server文件夹上传至虚拟机A的/root
[root@rabbitmq ~]# cd rabbitmq-server
[root@rabbitmq rabbitmq-server]# dnf -y localinstall rabbitmq-server-3.13.2-1.el8.noarch.rpm esl-erlang_26.2.3_1~rockylinux~8_x86_64.rpm
运行 RabbitMQ 服务器
[root@rabbitmq ~]# systemctl enable rabbitmq-server
[root@rabbitmq ~]# systemctl start rabbitmq-server
- 启动RabbitMQ服务和管理界面
[root@rabbitmq ~]# rabbitmq-plugins enable rabbitmq_management
默认用户访问
创建用户:通过以下命令创建一个用户: admin,密码为:Aq71056r
[root@rabbitmq ~]# rabbitmqctl add_user admin Aq71056r
设置用户标签为administrator
[root@rabbitmq ~]# rabbitmqctl set_user_tags admin administrator
重启rabbitmq-server服务
[root@rabbitmq ~]# systemctl restart rabbitmq-server
访问http://ip:15672,并通过用户名和密码进行登陆
-
用户名:admin
-
密 码: Aq71056r
点进账号里面,把Virtua lHost为【/】的set permission给用户
端口
5672是rabbitMQ的客户端端口
15672是rabbitMQ的ui界面访问端口
25672是rabbitMQ中的erlang发现口
相关概念
RabbitMQ 基础架构如下图
①Producer: 服务的生产者
②Consumer: 服务的消费者
③Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
④Virtual host:出于多用户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exccange/queue 等
⑤Connection:publisher/consumer 和 broker 之间的 TCP 连接
⑥Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,所以Connection中可以包含多个Channel
⑦Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
⑧Queue:消息最终被送到这里等待 consumer 取走
⑨Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exccange 中的查询表中,用于 message 的分发依据
3 消息模型
RabbitMQ 提供了7 种消息模型:
- 简单模式
- 工作队列模式
- 发布与订阅模式
- 路由模式
- 主题模式
- RPC 远程调用模式(了解)
- 发布确认模式(了解)
3.1 初始化
①创建一个maven项目
Name
: RabbitMQDemo
GroupId
: cn.tedu
②在pom.xml
中,添加如下依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.tedu</groupId>
<artifactId>RabbitMQDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
</project>
点击右上角的m标志,刷新maven,查看验证maven是否下载成功
3.2 简单模式
- 新建一个名为cn.tedu.simple的包
3.2.1 生产者代码
- 在cn.tedu.simple中创建一个Class,名为:SimpleProducer
- 用于测试简单模式中的生产者
- 1.连接到RabbitMQ服务器
- 2.创建连接对象(TCP)—所有的RabbitMQ服务器的客户端使用一个tcp连接对象
- 3.利用tcp连接对象创建channel(通道)
- 4.通过channel创建queue(队列)
- 5.准备数据
- 6.发送数据到rabbitmq中
- 7.关闭连接
package cn.tedu.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("simple.queue", false, false, false, null);
//5.准备数据(Hello World+当前系统的时间)
String s="Hello World"+System.currentTimeMillis();
//6.把消息发送到RabbitMQ中
//第一个参数;先忽略,写"",注意双引号中间不要加空格
//第二个参数:先写成队列名
//第三个参数:其他属性
//第四个参数:要发送的数据,要求是byte[]类型
cc.basicPublish("", "simple.queue", null, s.getBytes());
System.out.println("发送成功!");
//7.关闭连接
nc.close();
}
}
执行代码,在图形面板中可以看到队列(访问:192.168.8.100:15672)
3.2.2 消费者代码
- 在cn.tedu.simple中创建一个Class,名为:SimpleConsumer
- 用于测试简单模式中的消费者
- 1.连接到RabbitMQ
- 2.创建连接对象(TCP)—所有的RabbitMQ服务器的客户端使用一个tcp连接对象
- 3.利用tcp连接对象创建channel(通道)
- 4.通过channel创建queue(队列)
- 5.从RabbitMQ中获取数据,进行消费
package cn.tedu.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("simple.queue", false, false, false, null);
//处理数据的回调函数
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//Delivery可以获取要消费的数据
System.out.println(consumerTag);
byte[] a = message.getBody();
String s = new String(a);
//处理消息
System.out.println(consumerTag + "收到:" + s);
System.out.println("-----------");
}
};
//取消接收时的回调函数
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//第一个参数:指定队列名称
//第二个参数:先设置为true
//第三个参数:处理数据的回调函数
//第四个参数:取消接收时的回调函数
cc.basicConsume("simple.queue", true, d, c);
}
}
- 验证
- 运行生产者SimpleProducer
- 运行消费者SimpleConsumer,此时可以看到接收到的消息,如果未消费的则会在RabbitMQ队列中呈现
- 测试完毕,将生产者和消费者停止运行
3.3 工作队列模式
多个消费者从同一个队列中消费消息,对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
3.3.1 运行测试
- 新建一个名为cn.tedu.work的包
- 将cn.tedu.simple中的SimpleProducer和SimpleConsumer复制到cn.tedu.work
- 将cn.tedu.work中的SimpleProducer改名为WorkProducer
- 将cn.tedu.work中的SimpleConsumer改名为WorkConsumer
- 修改队列名WorkProducer和WorkConsumer中的队列名为:work.queue
- 准备WorkConsumer-1和WorkConsumer-2
- 此时运行WorkConsumer-1、WorkConsumer-2和WorkProducer会发现,两个消费者有一个消费了
但是现在测试生产者数据是每运行一次产生一个数据,比较不方便,所以可以借助于while循环,利用控制台产生数据,因此修改代码如下
package cn.tedu.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class WorkProducer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", false, false, false, null);
String s = null;
//在一个While(true)中完成
while (true) {
System.out.print("输入数据: ");
//控制台输入消息,利用Scanner工具类完成
s = new Scanner(System.in).nextLine();
//如果输入quit关键字,则退出循环
if (s.equals("quit")) {
break;
}
//发送数据到RabbitMQ中
cc.basicPublish("", "work.queue", null, s.getBytes());
}
nc.close();
}
}
- 此时运行WorkProducer和WorkConsumer-1、WorkConsumer-2,生产者输入数据,消费者是负载均衡的
3.3.2 消息确认
3.3.2.1 问题
-
一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?
-
就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失,也就是RabbitMQ已经把消息给到消费者了,但是由于当即导致消息其实没有被处理
-
下方WorkConsumer模拟消息处理缓慢的情况(阻塞)
package cn.tedu.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", false, false, false, null);
//处理数据的回调函数
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//Delivery可以获取要消费的数据
System.out.println(consumerTag);
byte[] a = message.getBody();
String s = new String(a);
//处理消息
System.out.println(consumerTag + "收到:" + s);
//模拟消息处理缓慢,如果输入的消息包含.那就睡眠10s钟
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
//睡眠10s钟
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("-----------");
}
};
//取消接收时的回调函数
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//第一个参数:指定队列名称
//第二个参数:先设置为true
//第三个参数:处理数据的回调函数
//第四个参数:取消接收时的回调函数
cc.basicConsume("work.queue", true, d, c);
}
}
- 此时,运行WorkProducer输入带有.的数据,会由其中一个消费者进行消费,但还是采用负载均衡的策略,则会导致该消费者,处于正在消费带有.的数据,同时后边有增加了一些待处理的数据,而另外一个消费者却很轻松,所以存在不合理
- 因此需要设置消息的"合理分发",但需要先了解下消息确认(上方)
- 现在杀掉一个正在处理消息的工作进程,该消息会丢失
3.3.2.2 解决方案
为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。
如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rabbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。
这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以
手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。
package cn.tedu.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", false, false, false, null);
//处理数据的回调函数
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//Delivery可以获取要消费的数据
System.out.println(consumerTag);
byte[] a = message.getBody();
String s = new String(a);
//处理消息
System.out.println(consumerTag + "收到:" + s);
//模拟消息处理缓慢,如果输入的消息包含.那就睡眠10s钟
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
//睡眠10s钟
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("-----------");
//第一个参数:消息的标签,需要从消息中获取
//是否确认多条消息处理完毕,false代表只确认一条消息
cc.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//取消接收时的回调函数
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//第一个参数:指定队列名称
//第二个参数:先设置为true,设置为Autoack自动确认消息,只要消息发送给消费者
//就会认为自己的消息处理完毕了,从RabbitMQ中删除,如果消费者还没来的及消费,就宕机了
//就会造成消息丢失,所以我们不使用自动ack,将第二个值改为false
//此处设置为false,还没完成ack的确认,消息消费完毕还需还设置ack的主动提交
//手动ack的模式下:如果消费者还未发送消息回执的标签,这个消息会暂时保存在RabbitMQ中
//不会由于消费者宕机而造成数据丢失
//第三个参数:处理数据的回调函数
//第四个参数:取消接收时的回调函数
cc.basicConsume("work.queue", false, d, c);
}
}
使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。
- 但此时还没有实现合理的分发
忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存
可以使用下面命令打印工作队列中未确认消息的数量
rabbitmqctl list_queues name messages_ready messages_unacknowledged
当处理消息时异常中断, 可以选择让消息重回队列重新发送.
nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:
// requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
cc.basicNack(tag, multiple, requeue)
3.3.3 合理地分发
rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲,而rabbitmq对此一无所知, 仍然会均匀的分发消息
我们可以使用 basicQos(1)
方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息,而是把消息发给下一个空闲的消费者
package cn.tedu.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", false, false, false, null);
//处理数据的回调函数
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//Delivery可以获取要消费的数据
System.out.println(consumerTag);
byte[] a = message.getBody();
String s = new String(a);
//处理消息
System.out.println(consumerTag + "收到:" + s);
//模拟消息处理缓慢,如果输入的消息包含.那就睡眠10s钟
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
//睡眠10s钟
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("-----------");
//第一个参数:消息的标签,需要从消息中获取
//是否确认多条消息处理完毕,false代表只确认一条消息
cc.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//取消接收时的回调函数
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//如果想实现消息的合理分发,还需要设置一个消费者同时只能处理一条消息
cc.basicQos(1);
//第一个参数:指定队列名称
//第二个参数:先设置为true,设置为Autoack自动确认消息,只要消息发送给消费者
//就会认为自己的消息处理完毕了,从RabbitMQ中删除,如果消费者还没来的及消费,就宕机了
//就会造成消息丢失,所以我们不使用自动ack,将第二个值改为false
//此处设置为false,还没完成ack的确认,消息消费完毕还需还设置ack的主动提交
//手动ack的模式下:如果消费者还未发送消息回执的标签,这个消息会暂时保存在RabbitMQ中
//不会由于消费者宕机而造成数据丢失
//第三个参数:处理数据的回调函数
//第四个参数:取消接收时的回调函数
cc.basicConsume("work.queue", false, d, c);
}
}
3.3.4 消息持久化
-
当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
-
要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)
-
在RabbitMQ中删除现有的队列work.queue,因为对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错
3.3.4.1 生产者代码
- 设置队列持久化
- 设置消息持久化
package cn.tedu.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class WorkProducer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化,改为true,表示队列持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", true, false, false, null);
String s = null;
//在一个While(true)中完成
while (true) {
System.out.print("输入数据: ");
//利用Scanner工具类完成
s = new Scanner(System.in).nextLine();
//如果输入quit关键字,则退出循环
if (s.equals("quit")) {
break;
}
//发送数据到RabbitMQ中,使用第三个参数设置消息持久化
cc.basicPublish("", "work.queue", MessageProperties.PERSISTENT_TEXT_PLAIN, s.getBytes());
}
nc.close();
}
}
3.3.4.2 消费者代码
- 设置队列持久化
package cn.tedu.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer {
public static void main(String[] args) throws Exception {
//1.连接到RabbitMQ服务器
ConnectionFactory cf = new ConnectionFactory();
//利用连接工厂设置连接信息
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
//2.创建连接对象(TCP)---所有的RabbitMQ服务器的客户端使用一个tcp连接对象
Connection nc = cf.newConnection();
//3.利用tcp连接对象创建channel(通道)
Channel cc = nc.createChannel();
//4.通过channel创建队列
//第一个参数:队列名为simple.queue
//第二个参数:队列是否持久化,将第二个参数设置为true,表示队列持久化
//第三个参数:队列是否独占
//第四个参数:队列是否自动删除
//第五个参数:其他属性的设置
cc.queueDeclare("work.queue", true, false, false, null);
//处理数据的回调函数
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//Delivery可以获取要消费的数据
System.out.println(consumerTag);
byte[] a = message.getBody();
String s = new String(a);
//处理消息
System.out.println(consumerTag + "收到:" + s);
//模拟消息处理缓慢,如果输入的消息包含.那就睡眠10s钟
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
//睡眠10s钟
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("-----------");
//第一个参数:消息的标签,需要从消息中获取
//是否确认多条消息处理完毕,false代表只确认一条消息
cc.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//取消接收时的回调函数
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//如果想实现消息的合理分发,还需要设置一个消费者同时只能处理一条消息
cc.basicQos(1);
//第一个参数:指定队列名称
//第二个参数:先设置为true,设置为Autoack自动确认消息,只要消息发送给消费者
//就会认为自己的消息处理完毕了,从RabbitMQ中删除,如果消费者还没来的及消费,就宕机了
//就会造成消息丢失,所以我们不使用自动ack,将第二个值改为false
//此处设置为false,还没完成ack的确认,消息消费完毕还需还设置ack的主动提交
//手动ack的模式下:如果消费者还未发送消息回执的标签,这个消息会暂时保存在RabbitMQ中
//不会由于消费者宕机而造成数据丢失
//第三个参数:处理数据的回调函数
//第四个参数:取消接收时的回调函数
cc.basicConsume("work.queue", false, d, c);
}
}
补充
3.4 发布订阅模式
是群发的概念,每条消息可以发送给多个消费者
3.4.1 交换机类型
在订阅模型中,多了一个 Exchange 角色
Exchange:交换机(X),一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失
Exchange和Queue之间还需要绑定才能发送消息
3.4.2 生产者代码
package cn.tedu.ps;
import com.rabbitmq.client.*;
import java.util.Scanner;
public class PSProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
cc.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
String s = null;
while (true) {
System.out.print("输入数据: ");
s = new Scanner(System.in).nextLine();
if (s.equals("quit")) {
break;
}
cc.basicPublish("logs", "", null, s.getBytes());
}
nc.close();
}
}
3.4.3 消费者代码
package cn.tedu.ps;
import com.rabbitmq.client.*;
import java.io.IOException;
public class PSConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
final Channel cc = nc.createChannel();
String queueName = cc.queueDeclare().getQueue();
cc.exchangeDeclare("logs", "fanout");
cc.queueBind(queueName, "logs", "");
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a = message.getBody();
String s = new String(a);
System.out.println(consumerTag + "收到:" + s);
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(queueName, true, d, c);
}
}
3.5 路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
3.5.1 生产者代码
package cn.tedu.route;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class RouteProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
cc.exchangeDeclare("route.logs", BuiltinExchangeType.DIRECT);
String s = null;
while (true) {
System.out.print("输入数据: ");
s = new Scanner(System.in).nextLine();
System.out.print("输入数据: ");
String routingKey = new Scanner(System.in).nextLine();
if (s.equals("quit")) {
break;
}
cc.basicPublish("route.logs", routingKey, null, s.getBytes());
}
nc.close();
}
}
3.5.2 消费者代码
package cn.tedu.route;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
public class RouteConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
final Channel cc = nc.createChannel();
String queueName = cc.queueDeclare().getQueue();
cc.exchangeDeclare("route.logs", "direct");
System.out.println("输入绑定键,用空格隔开: ");
String key = new Scanner(System.in).nextLine();
String[] bindingKeys = key.split("\\s+");
for (String bindingKey : bindingKeys) {
cc.queueBind(queueName, "route.logs", bindingKey);
}
DeliverCallback d = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a = message.getBody();
String s = new String(a);
System.out.println(consumerTag + "收到:" + s);
if (s.contains(".")) {
try {
System.out.println(consumerTag + "开始处理消息...");
Thread.sleep(10000);
System.out.println(consumerTag + "处理消息完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
CancelCallback c = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(queueName, true, d, c);
}
}
3.6 主题模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
匹配零个或多个词item.#
能够匹配 item.insert.abc 或者 item.insert
*
匹配不多不少恰好1个词item.*
只能匹配 item.insert
3.6.1 生产者代码
package cn.tedu.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class TopicProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
cc.exchangeDeclare("topic.logs", BuiltinExchangeType.TOPIC);
String s = null;
while (true) {
System.out.print("输入数据: ");
s = new Scanner(System.in).nextLine();
System.out.print("输入路由key: ");
String routingKey = new Scanner(System.in).nextLine();
if (s.equals("quit")) {
break;
}
cc.basicPublish("topic.logs", routingKey, null, s.getBytes());
}
nc.close();
}
}
3.6.2 消费者代码
public class TopicConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.8.100");
cf.setPort(5672);//可选,5672是默认端口
cf.setUsername("admin");
cf.setPassword("Aq71056r");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
String queue = cc.queueDeclare().getQueue();
System.out.print("输入绑定键:");
String s = new Scanner(System.in).nextLine();
String[] a = s.split("\\s+");
for (String key : a) {
cc.queueBind(queue, "topic_logs", key);
}
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a = message.getBody();
String s = new String(a);
String key = message.getEnvelope().getRoutingKey();
System.out.println(s+"--"+key);
System.out.println("========================================");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(queue, true,deliverCallback,cancelCallback);
}
}
标签:rabbitmq,String,队列,cf,RabbitMQ,cc,消息,Linux,Rocky
From: https://blog.csdn.net/qq_54523931/article/details/140813417