RabbitMQ
1 安装
1.1 mac
当然是使用 mac 的神器 homebrew 咯。
# 切记先更新 brew
brew install rabbitmq
# 如果出现找不到的情况,需要重置国内源
export HOMEBREW_BOTTLE_DOMAIN=''
1.2 docker
拉取镜像
docker pull rabbitmq:management
创建并运行容器
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
启动图形化界面
docker exec -it myrabbit rabbitmq-plugins enable rabbitmq_management
查看日志
docker logs -f myrabbit
2 使用
2.1 添加用户并授权
新增用户
rabbitmqctl add_user admin admin
设置用户分配权限
rabbitmqctl set_user_tags admin administrator
用户级别分为:
- 1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
- 2、monitoring 监控者 登录控制台,查看所有信息
- 3、policymaker 策略制定者 登录控制台,指定策略
- 4、managment 普通管理员 登录控制台
2.1.1角色分类:
1.none
不能访问management plugin
2.management:
-
查看自己相关节点信息
-
列出自己可以通过AMQP登入的虚拟机
-
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
-
查看和关闭自己的channels和connections
-
查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3.Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4.Monitoring
- 包含management所有权限
- 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
查看所有的virtual hosts的全局统计信息。
5.Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
添加用户资源权限
# 对超级管理员意义不大
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
⚠️注意:guest用户只能在本机访问
3 案例-Simple
3.1 依赖
3.1.1 Java原生依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
3.1.2 Spring依赖
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-amqp -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
3.1.3 Springboot 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
3.1 实操
3.1.1 生产者
public class Producer {
public static void main(String[] args) {
// 配置工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 创建连接
try(Connection connection = factory.newConnection("测试队列")){
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 参数含义:队列名称、是否持久化、排他性、自动删除(最后一个消费者消费之后)、附加参数
String queueName = "Queue1";
channel.queueDeclare(queueName,false,false,false,null);
// 准备消息
String message = "你好服务器,我是沈自在,我最帅了";
// 发送消息
channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
// 关闭一切
}catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.1.2 消费者
public class Consumer {
public static void main(String[] args) {
// 配置工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 创建连接
try(Connection connection = factory.newConnection("消费者")){
// 获取通道
Channel channel = connection.createChannel();
// 声明队列 参数含义:队列名称、是否持久化、排他性、自动删除(最后一个消费者消费之后)、附加参数
channel.basicConsume("testQueue", new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("s: "+s);
System.out.println("收到的消息是:"+ new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("消息接受失败");
}
});
System.out.println("开始接受消息");
System.in.read();
}catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
4 AMQP协议
4.1 简介
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
4.2 AMQP生产者流转过程
可以看出来啊,每次连接的建立都需要3次握手和4次挥手
4.3 AMQP消费者流转过程
4.4 为什么RabbitMQ是基于channel而不是连接的操作
我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。
一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。
信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?
试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接。
然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。
RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。
比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。
RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。
5 RabbitMQ中的核心组成部分
5.1 核心概念
- Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
- Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
- Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
- Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
- Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.交换机和队列之间的绑定。
- Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。消费者可能有多个,队列中的消息如果想要指定推送呢?就要通过这个key来查找投递给谁。
- Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
5.2 RabbitMQ的整体架构
5.3 RabbitMQ的运行流程
6 完整创建
6.1 生产者
package com.echo.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
//所有的中间价技术都是基于tcp/ip协议的,并在此协议上构建的。rabbitmq遵循的是amqp协议。
//所以既然如此,必然会有ip和port
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.45.135");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try{
//2.创建Connection,名字叫做Producer
connection = connectionFactory.newConnection("Producer");
//3.通过连接获取通道channel
channel = connection.createChannel();
//4.准备交换机名字
String exchange = "direct-message-exchange"; //自己在代码中定义的
//5.交换机类型
String exchangeType = "direct";
//6.在broker中声明交换机
//第三个参数是交换机是否持久化,如果持久化 在broker关闭之后,该交换机也不会被移除
channel.exchangeDeclare(exchange,exchangeType,true);
//7.声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
//8.绑定队列和交换机的关系,第三个参数是路由名字
channel.queueBind("queue5",exchange,"order");
channel.queueBind("queue6",exchange,"order");
channel.queueBind("queue7",exchange,"course");
//9.往队列里发送消息
String messageOrder = "Hello direct All Order";
String messageCourse = "Hello direct All Course";
channel.basicPublish(exchange,"order",null,messageOrder.getBytes());
channel.basicPublish(exchange,"course",null,messageCourse.getBytes());
System.out.println("消息发送成功");
}
catch (Exception e){
e.printStackTrace();
}
finally {
//7.关闭通道
if (channel != null && channel.isOpen()){
try {
channel.close();
}
catch (Exception e){
e.printStackTrace();
}
}
//8.关闭连接
if(connection != null && connection.isOpen()){
try {
connection.close();
}
catch (Exception e){
e.printStackTrace();
}
}
}
}
}
6.2 消费者
package com.echo.all;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private static Runnable runnable = () -> {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.45.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("Consumer");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, false, false, null);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ":开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
};
public static void main(String[] args) {
// 启动4个线程去执行,这4个queue是之前使用web界面操作时,定义好的。
new Thread(runnable, "queue5").start();
new Thread(runnable, "queue6").start();
new Thread(runnable, "queue7").start();
}
}
7 RabbirMQ-Springboot
7.1 简单的配置
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange fanoutExchange(){
return new DirectExchange("fanout-exchange-order",true,false);
}
@Bean
public Queue emailQueue(){
return new Queue("email-fanout-queue");
}
@Bean
public Queue smsQueue(){
return new Queue("sms-fanout-queue");
}
@Bean
public Queue duanxinQueue(){
return new Queue("duanxin-fanout-queue");
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange()).with("email");
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange()).with("sms");
}
@Bean
public Binding daunxinBinding(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()).with("duanxin");
}
}
7.2 简单的用法
非常简单懒得粘贴了,直接用 rabbitTemplate 见名知意就好
8 TTL 过期时间
8.1 队列的TTL
@Bean
public Queue ttlQueue(){
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
return new Queue("ttl-direct-queue",true,false,false,args);
}
8.2 消息的 TTL
public void makeOrderTTLMessage(String userId,String productId,Integer num){
//1.根据商品ID查询库存是否充足
//2.保存订单
String orderId = UUID.randomUUID().toString().replace("-","");
System.out.println("订单:" + orderId + "生成成功TTLMessage.....");
//3.通过MQ来完成消息的分发
String exchangeName = "ttl-direct-exchange"; //交换机名称
String routeKey1 = "ttlMessage";
/**
* 队列与交换机的绑定等在消费方
*/
//给消息设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置过期时间5秒
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
//参数说明 param1:交换机名称,param2:路由key/queue队列名称,param3:消息内容
rabbitTemplate.convertAndSend(exchangeName,routeKey1,orderId,messagePostProcessor);
}