一.简介
rabbitmq是基于AMQP(Advanced Message Queuing Protocol:高级消息队列协议),采用Erlang语言编写的消息队列。
二、mq能用来做什么
- 异步处理:将非核心业务(比如日志、邮件、监控等)从主流程剥离,提升主流程的响应时效。
- 削峰:当并发大的情况下,可以将消息暂存在消息队列中,消费者按照自己的消费能力逐步处理,避免并发过大导致的系统响应时效延长甚至瘫痪的问题。
- 解耦:生产者只负责发送到消息队列,消费者只负责从队列获取消息,无需直接对接,职责更加单一,也提升了系统的扩展性。
三.基本概念
在深入学习rabbitmq,你最好先简单了解下它的设计思想:
以上是rabbitmq官网提供的amqp简单模型,其中会涉及到如下概念:
Exchange
交换器,用来接受生产者发送的消息并将消息路由按照规则路由给指定队列。
Exchange类型
Direct
exchange仅当routingKey和bindingKey完全相同时,才会对应消息分发到对应队列
Fanout
发布订阅模式,exchange忽略路由规则,将消息全量分发到所有绑定的消费者。
Topic
与direct类似,只是支持routingKey的模糊匹配。
#:匹配0或多个单词
*:匹配1个单词
Headers
按消息头中指定参数进行路由。
Queue
队列,保存消息并发送给消费者。
Binding
消息队列和交换器之间的关联。
Routing Key
路由的key值,在exhcange类型为direct和topic时生效,exchange会将消息推送至绑定的、满足(direct)或部分满足(topic)路由key的queue中。
Binding Key
与routingkey类似,两者完全匹配才可以推送。
Publisher
生产者,产生的消息传送给exchange(非直接提供给消费者)。
Consumer
消费者,监听队列并消费消息。
这些概念你也需要了解
Connection
代表一个网络链接,比如TCP/IP套接字链接。
Channel
信道,多路复用链接中的一条独立的双向数据流通道,一个Connection中可以包含多个Channel。
Acknowledge
ack的模式,主要分为以下三种:
- NONE:无需ack(自动ack),会导致prefetchCount失效
- AUTO(springboot中有 ,默认):在程序执行完成后ack,在程序执行异常后unack(除了)
- MANUAL:人工ack,需在代码中添加ack代码
如何配置消费者的acknowledge mode
1.默认全局指定的方式
在application.yml中加入如下配置:
spring: yml
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 开启手动确认,自动是auto
2.消费端自定义的形式
如果对于部分消费者需要自定义ack方式,可以采用重写containerFactory的方法
@Bean("pointTaskContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer
, ConnectionFactory connectionFactory){
Security.setProperty("crypto.policy", "unlimited");
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//指定最大一次可接受消息数量
factory.setPrefetchCount(1);
//指定并发的消费者数量
factory.setConcurrentConsumers(12);
//消费者的ack模式
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
configurer.configure(factory,connectionFactory);
return factory;
}
@RabbitListener(queues="queueName",containerFactory="pointTaskContainerFactory")
public void policyReceive(@Payload String Customer) throws Exception{
//业务代码略
}
四.入门案例
官网的入门案例已经比较全面,不多做赘述,可以通过官网quick start入手。
https://www.rabbitmq.com/getstarted.html
五.实战使用
在实际使用过程中我们更多是通过springboot进行操作。除了常见的生产-消费形式外,接下来我们看看还可以有哪些进阶的玩法。
1.根据积压的消息数量动态增加/减少消费者数量
默认情况下,一个实例只会生成一个消费者,这对于我们肯定是不够的。
简单看下@RabbitListener
注解的属性:
其中containerFactory主要就是用来生成rabbitListener的容器工厂,其默认实现为:SimpleRabbitListenerContainerFactory
。
该默认实现中有很多自定义的配置信息,要实现动态增加和删减消费者,则需要使用到以下属性:
// 最大并发消费数,SimpleMessageListenerContainer中默认10s
private Integer maxConcurrentConsumers;
// 启动消费者的最小间隔,SimpleMessageListenerContainer默认60s
private Long startConsumerMinInterval;
// 关闭消费者的最小间隔
private Long stopConsumerMinInterval;
// 活跃周期次数,默认十次
private Integer consecutiveActiveTrigger;
// 空闲周期次数,默认十次
private Integer consecutiveIdleTrigger;
我们可以通过自定义@RabbitListener的containerFactory实现该功能。
操作步骤
1.1 创建自定义RabbitListenerContainerFactory
@Bean("pointTaskContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer
, ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//每一次ack可以发送多少条消息给一个消费者
factory.setPrefetchCount(1);
//初始化消费者数量
factory.setConcurrentConsumers(3);
//最大消费者数量
factory.setMaxConcurrentConsumers(10);
//ack的模式
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//活跃周期次数
factory.setConsecutiveActiveTrigger(10);
//空闲周期次数
factory.setConsecutiveIdleTrigger(10);
configurer.configure(factory, connectionFactory);
return factory;
}
1.2 修改@RabbitListener的containerFactory为自定义的工厂Bean
2.异常自动重试指定次数,重试间隔可以按指定系数增长
通过配置每次重试之间加上指数级别的间隔,可以很好的避免由于部分消费失败导致的后续消息无法消费的问题。
操作步骤
2.1 配置项
在application.properties或者配置中心对应配置文件中新增如下配置:
# 是否开启消费失败自动重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 自动重试的最大次数,包括首次请求
spring.rabbitmq.listener.simple.retry.max-attempts=6
# 自动重试初始化间隔,单位:ms
spring.rabbitmq.listener.simple.retry.initial-interval=2000
# 自动重试最大间隔,单位:ms
spring.rabbitmq.listener.simple.retry.max-interval=20000
# 间隔系数,例如:初始间隔为2s,间隔系数为2,则重试的间隔依次为2s、4s、8s、16s、20s(最大间隔为20s)
spring.rabbitmq.listener.simple.retry.multiplier=2
3.有限次重试后自动ack消息,不再重回队列,或者转发到其他队列
部分消息由于系统本身的异常导致无限循环unack消息重回队列头,很容易造成消息的积压,这种情况下结合上面的重试机制,并且为队列绑定死信转发(或者错误处理队列),将明确为reject的消息转移到死信队列(或者错误处理队列),在对应的队列中通过邮件、短信等其他形式将异常发送给对应的处理人。
操作步骤
3.1 自定义messageRecoverer Bean和错误队列绑定
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error-exchange",true,false);
}
@Bean
@Qualifier("error")
public Queue errorQueue(){
return new Queue("error-queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
}
@Bean
public MessageRecoverer messageRecoverer(){
return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}
3.2 配置错误处理队列处理逻辑
@Slf4j
@RabbitListener(queues = "error-queue", containerFactory = "pointTaskContainerFactory")
public class ErrorReceiver {
@RabbitHandler
public void receive(@Payload String body, @Headers Map<String,Object> headers) {
//消息正文
log.info("==body : {}", body);
//消息头,包括异常堆栈、消息类型等信息
if (!CollectionUtils.isEmpty(headers)) {
log.info("==headers : ");
headers.entrySet().stream().forEach(s-> log.info("{}:{}", s.getKey(), s.getValue()));
}
// 进行发送邮件、短信报警等操作,具体步骤自行实现
}
}
4.动态创建queue、exchange、binding
springboot基于amqp协议的rabbitAdmin,可以支持我们在运行时动态创建queue、exchange、binding关系
这对于搭建非业务相关的平台类应用,区分不同来源数据和数据分流避免互相影响有着一定的参考意义。
常用API
exchange
//创建四种类型的 Exchange,均为持久化,不自动删除
rabbitAdmin.declareExchange(new DirectExchange("direct.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("topic.exchange",true,false));
rabbitAdmin.declareExchange(new FanoutExchange("fanout.exchange",true,false));
rabbitAdmin.declareExchange(new HeadersExchange("header.exchange",true,false));
//删除 Exchange
rabbitAdmin.deleteExchange("header.exchange");
queue
//定义队列,均为持久化
rabbitAdmin.declareQueue(new Queue("debug",true));
rabbitAdmin.declareQueue(new Queue("info",true));
rabbitAdmin.declareQueue(new Queue("error",true));
//删除队列 rabbitAdmin.deleteQueue("debug");
//将队列中的消息全消费掉
rabbitAdmin.purgeQueue("info",false);
binding
//绑定队列到交换器,通过路由键
rabbitAdmin.declareBinding(new Binding("debug",Binding.DestinationType.QUEUE,
"direct.exchange","key.1",new HashMap()));
rabbitAdmin.declareBinding(new Binding("info",Binding.DestinationType.QUEUE,
"direct.exchange","key.2",new HashMap()));
rabbitAdmin.declareBinding(new Binding("error",Binding.DestinationType.QUEUE,
"direct.exchange","key.3",new HashMap()));
//进行解绑
rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("info")).
to(new TopicExchange("direct.exchange")).with("key.2"));
//使用BindingBuilder进行绑定
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("info")).
to(new TopicExchange("topic.exchange")).with("key.#"));
//声明topic类型的exchange
rabbitAdmin.declareExchange(new TopicExchange("exchange1",true,false));
rabbitAdmin.declareExchange(new TopicExchange("exchange2",true,false));
//exchange与exchange绑定
rabbitAdmin.declareBinding(new Binding("exchange1",Binding.DestinationType.EXCHANGE,
"exchange2","key.4",new HashMap()));
操作步骤
4.1 自定义rabbitAdmin Bean
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
4.2 添加对应接口
@RestController
@RequestMapping("/mq")
@Slf4j
public class RabbitAdminController {
@Autowired
private RabbitAdmin rabbitAdmin;
@PostMapping("/queue")
public void createQueue(@RequestParam("name") String name) {
log.info(name);
if (!StringUtils.isEmpty(name)) {
System.out.println(rabbitAdmin.declareQueue(new Queue(name)));
}
}
}
标签:实战,springboot,exchange,队列,factory,rabbitmq,rabbitAdmin,new,true
From: https://www.cnblogs.com/imaoburu/p/17526359.html