一、前言
本文基于作者对RabbitMQ使用的经验积累进行阶段性总结,希望没有使用经验的开发人员,看完本文可以直接上手。
1、RabbitMQ核心概念
Server:又称Broker,rabbitmq-server,一般指服务器运行的服务。
Connection:是客户端与 RabbitMQ 服务器之间的通信通道,用于发送和接收消息。
Channel:是一个虚拟连接,用于在单一的 RabbitMQ Connection 上管理消息的发布和消费。
Message:服务与应用程序之间传递的消息体,包含配置和消息主体,配置可以对消息进行修饰,比如消息的优先级,延迟等高级特性,主体就是消息体的内容,一般是json格式。
Virtual Host:是一个用于逻辑分组和资源分离的概念,它允许在单个RabbitMQ服务器上创建多个逻辑上相互隔离的消息代理。每个Virtual Host本质上是一个mini版的RabbitMQ服务器,拥有自己的Queue、Exchange、绑定和权限控制。通过Virtual Host,不同的应用或团队可以独立地管理自己的消息队列,避免了不同应用之间的冲突和干扰。用户连接到RabbitMQ时,需要指定要连接的Virtual Host名称,并且只能在指定的Virtual Host中进行操作。
Exchange:是消息的中转站,用于接收生产者发送的消息,并根据特定的路由规则将消息路由到一个或多个队列。RabbitMQ提供了多种类型的Exchange,包括Direct、Fanout、Topic和Headers等。
Bindings:是交换机和队列之间的连接规则。Bindings由交换机名称、队列名称和绑定键组成,它决定了消息如何从交换机路由到队列。当消息的Routing Key与绑定键匹配时,交换机会将消息发送到与之绑定的队列中。
Routing key:生产者发送消息时指定Routing Key,Exchange根据此Key将消息路由到一个或多个队列,队列与Exchange的绑定关系通过Binding Key确定。
Queue:队列也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
2、RabbitMQ交换机类型简述
直连交换机(Direct Exchange):根据消息的路由键(Routing Key)完全匹配的方式,将消息路由到指定的队列。适用于一对一的消息传递场景。
扇形交换机(Fanout Exchange):将消息广播到所有绑定到该交换机的队列,忽略路由键。适用于需要广播消息的场景。
主题交换机(Topic Exchange):根据消息的路由键和通配符模式进行匹配,将消息路由到一个或多个队列。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。适用于主题订阅模型,如邮件分类、日志级别过滤等。
头部交换机(Headers Exchange):不依赖路由键,而是根据消息的头部信息(Headers)进行匹配,在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。适用于需要根据消息的特定属性进行路由的场景。
二、SpringCloud stream 整合RabbitMQ
mq通过binder与应用联系,应用通过SpringCloud stream 的output(相当于生产者,生产消息)和input(相当于消费者,从队列中接收消息进行消费)通道与外界联系,消费者通@StreamListener 接听生产者的消息
配置文件 bootstrap.yml
server:
port : 9015
spring:
application:
name: clan-communication-service #注册到eureka的服务名称
profiles:
active: dev #指定配置文件-后缀
cloud:
config:
discovery:
enabled: true
service-id: clan-config
#rabbitmq配置
stream:
binders:
defaultRabbit:
type: rabbit
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: 127.0.0.1
username: guest
password: guest
virtual-host: /
bindings:
message-center-out:
destination: message-center #exchange名称,交换模式默认是topic
content-type: application/json #数据类型
message-center-input:
destination: message-center
content-type: application/json
eureka:
client:
register-with-eureka: true # 指定当前主机是否向Eureka服务器进行注册
fetch-registry: true # 指定当前主机是否要从Eurka服务器下载服务注册列表
service-url:
defaultZone: http://127.0.0.1:9000/eureka/
#日志设置
logging:
level:
com:
clan:
clan_base: debug
#关闭默认的feign熔断
feign:
hystrix:
enabled: false
hystrix:
command:
default:
execution:
isolation:
thread:
#设置熔断触发的超时时间(毫秒)
timeoutInMilliseconds: 5000
#rabbon设置
#ribbon:
# ReadTimeout: 60000
# ConnectTimeout: 60000
# maxAutoRetries: 0
消息生产者:OutputMessageBinding.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface OutputMessageBinding {
/** Topic 名称*/
String OUTPUT = "message-center-out";
@Output(OUTPUT)
MessageChannel output();
}
OutputMessage .java
import com.clan.clan_communication.req.AccountReq;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@EnableBinding(OutputMessageBinding.class)
public class OutputMessage {
@Resource
private OutputMessageBinding outputMessageBinding;
public void sendMessage(AccountReq accountReq){
System.out.println("推送消息:" + accountReq.toString());
outputMessageBinding.output().send(MessageBuilder.withPayload(accountReq).build());
}
}
消息消费者InputMessageBinding.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface InputMessageBinding {
String INPUT = "message-center-input";
@Input(INPUT)
SubscribableChannel input();
}
CollectionReceiver.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@Slf4j
@EnableBinding(InputMessageBinding.class)
public class CollectionReceiver {
@StreamListener(InputMessageBinding.INPUT)
public void handle(String value){
log.info("[消息] 接收到发送消息MQ: {}", value);
}
}
注意点:
spring.cloud.stream.bindings.message-center-input.destination 中的 message-center-input 和 InputMessageBinding.INPUT 一致
- spring.cloud.stream.bindings.message-center-input.destination=message-center
- spring.cloud.stream.bindings.message-center-out.destination=message-center输入流和输出流的destination值要一致
三、使用场景及问题
1、RabbitMQ的作用
解耦:A系统把数据发送到BCD系统,如果通过接口调用,那么得在A系统去写调用各个系统的代码,如果再加一个E系统,又得改A系统代码,造成耦合,此时可以用mq,A系统产生一条消息,发送到mq,其他系统需要到消息就自己去mq里面消费
异步:A系统接收请求,需要在本地写库,还要在BCD三个系统写库,如果一个一个系统写耗时比较久才能得到回应,如果使用mq,A系统连续发送3条消息到mq,然后异步写入BCD库,此时A从接收请求到返回响应给用户,用时比较短。
削峰:减轻高并发下服务器的压力
2、如何保证消息的可靠性传输
消息丢失,如何处理?
丢失情况一:生产者将消息发送到rabbitmq,半路丢失
解决方案:
(1)开启rabbitmq事务,生产者收到异常报错,进行回滚重试,(同步的)
(2)开启confirm模式,mq接收到消息会反馈一个ok的消息,mq没能处理这个消息,就反馈一个失败的标志消息,(异步的)
丢失情况二:mq中丢失,mq弄丢了数据
解决方案:
开启RabbitMQ的持久化,消息写入之后持久化到磁盘
3、如何保证消息不被重复消费
(1)插入数据库的数据加个唯一键,这样可以保证不会出现脏数据
(2)在redis中存个id,每次操作数据之前查下id是否存在,存在的话就不做处理,不存在就操作数据库
4、消息队列有什么缺点
(1)系统可用性降低,引用外部依赖越多,容易挂掉,RabbitMQ挂,整个系统崩溃
(2)系统复杂度提高,消息有可能被重复消费,也有可丢失
(3)数据一致性问题,A处理成功了返回,BC成功,D失败,数据不一致
四、RabbitMQ监控
使用监控工具进行监控(如Prometheus或Zabbix)对服务器资源进行监控
- CPU状态(user、system、iowait&idle percentages
- 内存使用率(used、buffered、cached & free percentages)
- 虚拟内存统计信息(dirty page flushes, writeback volume)
- 磁盘I/O
- 装载上用于节点数据目录的可用磁盘空间
- beam.smp使用的文件描述符与最大系统限制
- 按状态列出的TCP连接(ESTABLISHED,CLOSE_WAIT,TIME_WATT)
- 网络吞吐量(bytes received,bytes sent) & 最大网络吞吐量
- 网络延迟(集群中所有RabbitMQ节点之间以及客户端之间)