文章目录
- 1.中间件
- 1.1 特点
- 1.2消息中间件
- 1.3 消息组成部分
- 1.4协议
- 1.5 协议组成
- 1.6 `AMQP`
- 1.6.1 生产者流转过程
- 1.6.2 消费者流转过程
- 1.7 `MQTT`
- 1.8 `OpenMessage`
- 1.9 `Kafka`
- 2. 安装
- 2.1 `rpm`安装
- 2.2 `docker `安装
- 3.`RabbitMQ`
- 3.1 核心概念
- 3.2 组成
- 3.3运行流程
- 3.4 消息模型
- 3.5 使用场景、解决问题
- 3.6 `SpringBoot 2.x` 整合使用
- 3.6.1 依赖
- 3.6.2 配置(fanout)
- 3.6.3 producer
- 3.6.4 consumer
- 3.6.5 `topic`匹配规则
- 3.7 持久化
- 3.7.1 消息什么时候需要持久化
- 3.7.2 消息什么时候会刷到磁盘
- 3.7.3 消息保存到磁盘个格式
- 3.7.4 文件何时删除
- 3.7.5 存储位置
- 3.8 排他队列(Exclusive Queue)
- 3.9 阻塞队列
- 4.`RabbitMQ`高级
- 4.1 过期时间
- 4.1.1 消息TTL
- 4.1.2 队列TTL
- 4.2 死信队列
- 4.2.1 消息死信的原因
- 4.2.1.1 业务交换机
- 4.2.1.2 死信交换机
- 4.2.1.3业务队列
- 4.2.1.4 死信队列
- 4.2.1.5 业务队列与业务交换机绑定
- 4.2.1.6 死信队列与业务交换机绑定
- 4.3 监控
- 4.3.1 监控
- 4.3.2 内存换页
- 4.3.4 磁盘预警
- 4.4 集群
- 4.5 分布式事务
- 4.5.1 可靠生产
- 4.5.2 可靠消费
- 4.5.3 适用场景
1.中间件
中间件(
Middleware
)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来。
1.1 特点
- 满足大量应用的需要
- 运行于多种硬件和 OS平台
- 支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
- 支持标准的协议
- 支持标准的接口
1.2消息中间件
利用可靠的消息传递机制进行系统和系统直接的通讯,通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯,是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务。
1.3 消息组成部分
- 协议
- 持久化机制
- 分发策略
- 容错
1.4协议
- 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流2. 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高3. 协议对数据格式和计算机之间交换数据都必须严格遵守规范。
1.5 协议组成
- 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
- 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
- 时序:时序是对事件发生顺序的详细说明
1.6 AMQP
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。(支持者: RabbitMQ ActiveMQ)
- 分布式事务
- 消息的持久化
- 高性能和高可靠
1.6.1 生产者流转过程
1.6.2 消费者流转过程
1.7 MQTT
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分。支持者: RabbitMQ ActiveMQ)
- 轻量
- 结构简单
- 传输快
- 不支持事务
- 不支持持久化
1.8 OpenMessage
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准。
- 结构简单
- 解析速度快
- 事务
- 持久化
1.9 Kafka
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成。
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
2. 安装
2.1 rpm
安装
javascript:void(0)
2.2 docker
安装
javascript:void(0)
3.RabbitMQ
3.1 核心概念
- Server:又称Broker,接收客户端连接,实现AMQP服务。
- Connection: 应用程序和Broker的网络连接。TCPIP三次握手四次握手
- Channel: 网络信道,对应一个会话任务,进行消息的读写
- Message: 消息,服务与应用程序之间传送的数据,由Properties(对消息的描述)和body(消息)组成
- Virtual Host: 虚拟地址,用于进行逻辑隔离
- Queue: 队列
- Exchange: 交换机,根据路由键分发到队列
- Bindings: 交换机和队列之间的虚拟连接
- Routing key: 路由规则
3.2 组成
3.3运行流程
3.4 消息模型
- 简单模式
- 工作模式
- 公平分发(能者多劳)
- producer
跟简单模式一样
- consumer
-
maven
//简单模式
public class Consumer{
//3.接受内容
//指标定义出来
channel.basicQos(1);
channel.basicConsume("queue1",false,new DefaultConsumer(){
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(new String("收到消息是" + new String(meassage.getBody()),"UTF-8"));
//改成手动应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
},new CancelCallback(){
public void handle(String consumerTag) throws IOException {
System.out.println("接受失败了");
}
});
//4.关闭
channel.close();
connection.close();
}
-
springboot
# TODO 公平分发 0:(默认)轮询 1:公平分发
# listener:
# direct:
# prefetch: 1
# acknowledge-mode: manual
- 轮询
跟简单模式一样。
- 发布订阅模式
- 路由模式
- 主题模式
- 参数模式
- 发布确认模式
3.5 使用场景、解决问题
- 线程池异步处理(削峰)
- 解耦(模块分离,高内聚,低耦合)
- 消息可靠(持久化)
- 高可用(集群)
3.6 SpringBoot 2.x
整合使用
完整代码地址: https://gitee.com/dingwen-gitee/stu-spr-boo-rab.git
3.6.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.dingwen</groupId>
<artifactId>stu-spr-boo-rab</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dingwen.rabcon</groupId>
<artifactId>rab-con</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rab-con</name>
<description>rab-con</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.6.2 配置(fanout)
package com.dingwen.rabcon.config;
import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq 配置类
*
* @author dingwen
* 2021.06.17 10:36
*/
@Configuration
public class RabbitmqConfiguration {
/**
* 声明fanout模式的交换机
*
* @return {@link FanoutExchange}
*/
@Bean
public FanoutExchange fanoutExchange() {
// 持久化、关闭自动删除
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
/**
* 短信任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue smsQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
}
/**
* 邮件任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue emailQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME);
}
/**
* 短信队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
/**
* 邮箱队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
3.6.3 producer
package com.dingwen.rabpro.service.impl;
import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* order service impl
*
* @author dingwen
* 2021.06.17 11:11
*/
@Service
public class OrderServiceImpl implements OrderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void placeOrder() {
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString());
}
}
3.6.4 consumer
package com.dingwen.rabcon.service.impl;
import com.dingwen.rabcon.constant.RabbitConstant;
import com.dingwen.rabcon.service.EmailService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* email service impl
*
* @author dingwen
* 2021.06.17 11:22
*/
@Service
@RabbitListener(queues = {RabbitConstant.FANOUT_QUEUE_EMAIL_NAME})
public class EmailServiceImpl implements EmailService {
@Override
@RabbitHandler
public void sendEmail(String message) {
System.out.println("E: message = " + message);
}
}
3.6.5 topic
匹配规则
- “*”: 只有一个一级
- “#”: 至少一个一级
3.7 持久化
3.7.1 消息什么时候需要持久化
- 消息本身在publish的时候就要求消息写入磁盘
- 内存紧张,需要将部分内存中的消息转移到磁盘
3.7.2 消息什么时候会刷到磁盘
- 写入文件前会有一个Buffer,大小为
1M
(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘) - 有个固定的刷盘时间:
25ms
,也就是不管Buffer满不满,每隔25ms
,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘 - 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作
3.7.3 消息保存到磁盘个格式
消息保存于
$MNESIA/msg_store_persistent/x.rdq
文件中,其中x为数字编号,从1开始,每个文件最大为16M
(16777216),超过这个大小会生成新的文件,文件编号加1。
3.7.4 文件何时删除
- 当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率
- publish消息时写入内容,
ack
消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件
3.7.5 存储位置
3.8 排他队列(Exclusive Queue)
只有自己可见的队列,即不允许其它用户访问
- 只对首次声明它的连接(Connection)可见
- 在其连接断开的时候自动删除(包括持久态队列)
- 是限制连接而不是通道
如果试图在一个不同的连接中重新声明或访问(如publish,consume)该排他性队列,会得到资源被锁定的错误
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'UserLogin2'
3.9 阻塞队列
当队列为空后者队列慢的情况下回等待,当队列有空不会空是进行操作。
4.RabbitMQ
高级
4.1 过期时间
可对消息设置过期时间(TTL),表示消息在这个时间内可以被消费。消息过期后被删除或放入死信队列。可对单条消息进行设置,也可以对整个队列进行设置。如果两种方法同时使用则按照TTL小的为准。
4.1.1 消息TTL
package com.dingwen.rabpro.service.impl;
import com.dingwen.rabpro.constant.RabbitConstant;
import com.dingwen.rabpro.service.OrderService;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* order service impl
*
* @author dingwen
* 2021.06.17 11:11
*/
@Service
public class OrderServiceImpl implements OrderService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void placeOrder() {
// 测试消息过期时间
MessagePostProcessor messagePostProcessor = (message) -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setContentEncoding("UTF-8");
// 过期时间 5000 毫秒
messageProperties.setExpiration("5000");
return message;
};
// fanout
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, null, UUID.randomUUID().toString(), messagePostProcessor);
// direct topic
// 同上需要指定路由key
}
}
4.1.2 队列TTL
package com.dingwen.rabcon.config;
import com.dingwen.rabcon.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitmq 配置类
*
* @author dingwen
* 2021.06.17 10:36
*/
@Configuration
public class RabbitmqConfigurationFanout {
/**
* 声明fanout模式的交换机
*
* @return {@link FanoutExchange}
*/
@Bean
public FanoutExchange fanoutExchange() {
// 持久化、关闭自动删除
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
/**
* 短信任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue smsQueue() {
// 持久化、排他性队列、关闭自动删除
return new Queue(RabbitConstant.FANOUT_QUEUE_SMS_NAME);
}
/**
* 邮件任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue emailQueue() {
// 持久化、排他性队列、关闭自动删除
// TTL队列测试
Map<String, Object> args = new HashMap<>(1);
// 整形 毫秒
args.put("x-message-ttl", 5000);
return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME, true, false, false, args);
// return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME);
}
/**
* 短信队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
/**
* 邮箱队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
4.2 死信队列
DLX,dead letter exchange
,死信交换机。当一个队列中的消息变成死信后能被重新发送到指定的死信交换机中。DLX
也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq
就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可。
4.2.1 消息死信的原因
- 消息被拒绝
- 消息过期
- 队列达到最大长度
4.2.1.1 业务交换机
public FanoutExchange fanoutExchange() {
// 持久化、关闭自动删除
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
4.2.1.2 死信交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitConstant.DIRECT_QUEUE_DEAD_NAME, true, false);
}
4.2.1.3业务队列
/**
* 邮件任务队列
*
* @return {@link Queue}
*/
@Bean
public Queue emailQueue() {
// 持久化、排他性队列、关闭自动删除
// TTL队列测试
Map<String, Object> args = new HashMap<>(3);
// 整形 毫秒
args.put("x-message-ttl", 5000);
// 绑定死信交换机
args.put("x-dead-letter-exchange", RabbitConstant.DIRECT_EXCHANGE_NAME_DEAD);
args.put("x-dead-letter-routing-key", RabbitConstant.DIRECT_QUEUE__ROUTE_DEAD);
return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME, true, false, false, args);
// return new Queue(RabbitConstant.FANOUT_QUEUE_EMAIL_NAME);
}
4.2.1.4 死信队列
/**
* 死信队列
*
* @return {@link Queue}
*/
@Bean
public Queue deadQueue() {
return new Queue(RabbitConstant.DIRECT_QUEUE_DEAD_NAME, true, false, false);
}
4.2.1.5 业务队列与业务交换机绑定
/**
* 邮箱队列绑定到交换机
*
* @return {@link Binding}
*/
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
4.2.1.6 死信队列与业务交换机绑定
/**
* 死信队列绑定
*
* @return {@link Binding}
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(fanoutExchange());
}
4.3 监控
4.3.1 监控
当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,
RabbitMQ
会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
# 相对值方式
rabbitmqctl set_vm_memory_high_watermark <fraction> 建议0.4-0.6
# 绝对值方式
rabbitmqctl set_vm_memory_high_watermark absolute <memory_limit>
# 也可以修改配置文件
4.3.2 内存换页
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。默认情况下,内存到达的阈值是50%时就会换页处理。
也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时, 会进行换页动作。比如有
1000MB
内存,当内存的使用率达到了400MB
,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达
到极限400mb
之前,会把内存中的200MB
进行转移到磁盘中。从而达到稳健的运行。
vm_memory_high_watermark_paging_ratio = 0.75
4.3.4 磁盘预警
当可用磁盘空间低于配置的限制(默认为
50MB
)时,将触发警报,所有生产者将被阻止。目标是避免填满整个磁盘,这将导致节点上的所有写操作失败,并可能导致RabbitMQ
终止。为了减少填满磁盘的风险,所有传入的消息都被阻止。在内存不足的情况下,瞬时消息仍然被分页到磁盘,并且会占用已经有限的磁盘空间。如果磁盘警报设置得太低,并且信息被快速转出,则可能会耗尽磁盘空间,并在磁盘空间检查(至少间隔10秒)之间崩溃RabbitMQ
。更保守的方法是将限制设置为与系统上安装的内存量相同.
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
4.4 集群
https://www.jianshu.com/p/6376936845ff
4.5 分布式事务
分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性。
解决方法:利用
RabbiMQ
的确认机制,生产者发送消息,必须要等到消费者确认收到信息,否则定时任务重发(使用消息冗余).消费端出现异常测进入死信队列人工处理。
4.5.1 可靠生产
4.5.2 可靠消费
4.5.3 适用场景
- 异步操作
- 业务容忍延迟