首页 > 其他分享 >rabbitmq在springboot中实战技巧

rabbitmq在springboot中实战技巧

时间:2023-07-04 17:33:07浏览次数:64  
标签:实战 springboot exchange 队列 factory rabbitmq rabbitAdmin new true

一.简介

rabbitmq是基于AMQP(Advanced Message Queuing Protocol:高级消息队列协议),采用Erlang语言编写的消息队列。

二、mq能用来做什么

  • 异步处理:将非核心业务(比如日志、邮件、监控等)从主流程剥离,提升主流程的响应时效。
  • 削峰:当并发大的情况下,可以将消息暂存在消息队列中,消费者按照自己的消费能力逐步处理,避免并发过大导致的系统响应时效延长甚至瘫痪的问题。
  • 解耦:生产者只负责发送到消息队列,消费者只负责从队列获取消息,无需直接对接,职责更加单一,也提升了系统的扩展性。

三.基本概念

在深入学习rabbitmq,你最好先简单了解下它的设计思想:

image

以上是rabbitmq官网提供的amqp简单模型,其中会涉及到如下概念:

Exchange

交换器,用来接受生产者发送的消息并将消息路由按照规则路由给指定队列。

Exchange类型

Direct

exchange仅当routingKey和bindingKey完全相同时,才会对应消息分发到对应队列

Fanout

发布订阅模式,exchange忽略路由规则,将消息全量分发到所有绑定的消费者。

Topic

与direct类似,只是支持routingKey的模糊匹配。

#:匹配0或多个单词
*:匹配1个单词

Headers

按消息头中指定参数进行路由。

Queue

队列,保存消息并发送给消费者。

Binding

消息队列和交换器之间的关联。

Routing Key

路由的key值,在exhcange类型为direct和topic时生效,exchange会将消息推送至绑定的、满足(direct)或部分满足(topic)路由key的queue中。

Binding Key

与routingkey类似,两者完全匹配才可以推送。

Publisher

生产者,产生的消息传送给exchange(非直接提供给消费者)。

Consumer

消费者,监听队列并消费消息。

这些概念你也需要了解

Connection

代表一个网络链接,比如TCP/IP套接字链接。

Channel

信道,多路复用链接中的一条独立的双向数据流通道,一个Connection中可以包含多个Channel。

Acknowledge

ack的模式,主要分为以下三种:

  • NONE:无需ack(自动ack),会导致prefetchCount失效
  • AUTO(springboot中有 ,默认):在程序执行完成后ack,在程序执行异常后unack(除了)
  • MANUAL:人工ack,需在代码中添加ack代码

如何配置消费者的acknowledge mode

1.默认全局指定的方式

在application.yml中加入如下配置:

spring: yml
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认,自动是auto

2.消费端自定义的形式

如果对于部分消费者需要自定义ack方式,可以采用重写containerFactory的方法

	@Bean("pointTaskContainerFactory")
	public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer
		, ConnectionFactory connectionFactory){
		Security.setProperty("crypto.policy", "unlimited");
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		//指定最大一次可接受消息数量
		factory.setPrefetchCount(1);
		//指定并发的消费者数量
		factory.setConcurrentConsumers(12);
		//消费者的ack模式
		factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
		configurer.configure(factory,connectionFactory);
		return factory;
	}
	
	@RabbitListener(queues="queueName",containerFactory="pointTaskContainerFactory")
        public void policyReceive(@Payload String Customer) throws Exception{
	    //业务代码略
	}
	

四.入门案例

官网的入门案例已经比较全面,不多做赘述,可以通过官网quick start入手。

https://www.rabbitmq.com/getstarted.html

五.实战使用

在实际使用过程中我们更多是通过springboot进行操作。除了常见的生产-消费形式外,接下来我们看看还可以有哪些进阶的玩法。

1.根据积压的消息数量动态增加/减少消费者数量

默认情况下,一个实例只会生成一个消费者,这对于我们肯定是不够的。
简单看下@RabbitListener注解的属性:
image.png
其中containerFactory主要就是用来生成rabbitListener的容器工厂,其默认实现为:SimpleRabbitListenerContainerFactory
该默认实现中有很多自定义的配置信息,要实现动态增加和删减消费者,则需要使用到以下属性:

// 最大并发消费数,SimpleMessageListenerContainer中默认10s
private Integer maxConcurrentConsumers;
// 启动消费者的最小间隔,SimpleMessageListenerContainer默认60s
private Long startConsumerMinInterval;
// 关闭消费者的最小间隔
private Long stopConsumerMinInterval;
// 活跃周期次数,默认十次
private Integer consecutiveActiveTrigger;
// 空闲周期次数,默认十次
private Integer consecutiveIdleTrigger;

我们可以通过自定义@RabbitListener的containerFactory实现该功能。

操作步骤

1.1 创建自定义RabbitListenerContainerFactory

@Bean("pointTaskContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer
            , ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //每一次ack可以发送多少条消息给一个消费者
    factory.setPrefetchCount(1);
    //初始化消费者数量
    factory.setConcurrentConsumers(3);
    //最大消费者数量
    factory.setMaxConcurrentConsumers(10);
    //ack的模式
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
  	//活跃周期次数
    factory.setConsecutiveActiveTrigger(10);
    //空闲周期次数
    factory.setConsecutiveIdleTrigger(10);
    configurer.configure(factory, connectionFactory);
    return factory;
}

1.2 修改@RabbitListener的containerFactory为自定义的工厂Bean

image.png

2.异常自动重试指定次数,重试间隔可以按指定系数增长

通过配置每次重试之间加上指数级别的间隔,可以很好的避免由于部分消费失败导致的后续消息无法消费的问题。

操作步骤

2.1 配置项

在application.properties或者配置中心对应配置文件中新增如下配置:

# 是否开启消费失败自动重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 自动重试的最大次数,包括首次请求
spring.rabbitmq.listener.simple.retry.max-attempts=6
# 自动重试初始化间隔,单位:ms
spring.rabbitmq.listener.simple.retry.initial-interval=2000
# 自动重试最大间隔,单位:ms
spring.rabbitmq.listener.simple.retry.max-interval=20000
# 间隔系数,例如:初始间隔为2s,间隔系数为2,则重试的间隔依次为2s、4s、8s、16s、20s(最大间隔为20s)
spring.rabbitmq.listener.simple.retry.multiplier=2

3.有限次重试后自动ack消息,不再重回队列,或者转发到其他队列

部分消息由于系统本身的异常导致无限循环unack消息重回队列头,很容易造成消息的积压,这种情况下结合上面的重试机制,并且为队列绑定死信转发(或者错误处理队列),将明确为reject的消息转移到死信队列(或者错误处理队列),在对应的队列中通过邮件、短信等其他形式将异常发送给对应的处理人。

操作步骤

3.1 自定义messageRecoverer Bean和错误队列绑定

@Bean
public DirectExchange errorExchange(){
    return new DirectExchange("error-exchange",true,false);
}

@Bean
@Qualifier("error")
public Queue errorQueue(){
    return new Queue("error-queue", true);
}

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
    return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
}

@Bean
public MessageRecoverer messageRecoverer(){
    return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}

3.2 配置错误处理队列处理逻辑

@Slf4j
@RabbitListener(queues = "error-queue", containerFactory = "pointTaskContainerFactory")
public class ErrorReceiver {

    @RabbitHandler
    public void receive(@Payload String body, @Headers Map<String,Object> headers) {
        //消息正文
        log.info("==body : {}", body);
        //消息头,包括异常堆栈、消息类型等信息
        if (!CollectionUtils.isEmpty(headers)) {
            log.info("==headers : ");
            headers.entrySet().stream().forEach(s-> log.info("{}:{}", s.getKey(),  s.getValue()));
        }
        // 进行发送邮件、短信报警等操作,具体步骤自行实现

    }
}

4.动态创建queue、exchange、binding


springboot基于amqp协议的rabbitAdmin,可以支持我们在运行时动态创建queue、exchange、binding关系
这对于搭建非业务相关的平台类应用,区分不同来源数据和数据分流避免互相影响有着一定的参考意义。

常用API

exchange

//创建四种类型的 Exchange,均为持久化,不自动删除
rabbitAdmin.declareExchange(new DirectExchange("direct.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("topic.exchange",true,false));
rabbitAdmin.declareExchange(new FanoutExchange("fanout.exchange",true,false));
rabbitAdmin.declareExchange(new HeadersExchange("header.exchange",true,false));
//删除 Exchange
rabbitAdmin.deleteExchange("header.exchange");

queue

//定义队列,均为持久化
rabbitAdmin.declareQueue(new Queue("debug",true));
rabbitAdmin.declareQueue(new Queue("info",true));
rabbitAdmin.declareQueue(new Queue("error",true));
//删除队列      rabbitAdmin.deleteQueue("debug");
//将队列中的消息全消费掉
rabbitAdmin.purgeQueue("info",false);

binding

//绑定队列到交换器,通过路由键
rabbitAdmin.declareBinding(new Binding("debug",Binding.DestinationType.QUEUE,
        "direct.exchange","key.1",new HashMap()));

rabbitAdmin.declareBinding(new Binding("info",Binding.DestinationType.QUEUE,
        "direct.exchange","key.2",new HashMap()));

rabbitAdmin.declareBinding(new Binding("error",Binding.DestinationType.QUEUE,
        "direct.exchange","key.3",new HashMap()));

//进行解绑
rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("info")).
        to(new TopicExchange("direct.exchange")).with("key.2"));

//使用BindingBuilder进行绑定
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("info")).
        to(new TopicExchange("topic.exchange")).with("key.#"));

//声明topic类型的exchange
rabbitAdmin.declareExchange(new TopicExchange("exchange1",true,false));
rabbitAdmin.declareExchange(new TopicExchange("exchange2",true,false));

//exchange与exchange绑定
rabbitAdmin.declareBinding(new Binding("exchange1",Binding.DestinationType.EXCHANGE,
        "exchange2","key.4",new HashMap()));

操作步骤

4.1 自定义rabbitAdmin Bean

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
}

4.2 添加对应接口

@RestController
@RequestMapping("/mq")
@Slf4j
public class RabbitAdminController {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @PostMapping("/queue")
    public void createQueue(@RequestParam("name") String name) {
        log.info(name);
        if (!StringUtils.isEmpty(name)) {
            System.out.println(rabbitAdmin.declareQueue(new Queue(name)));

        }
    }
}

标签:实战,springboot,exchange,队列,factory,rabbitmq,rabbitAdmin,new,true
From: https://www.cnblogs.com/imaoburu/p/17526359.html

相关文章

  • Springboot : 连接ldap超时问题
    Err:java.net.ConnectException:Connectiontimedoutwhenconnectingtoldap使用springbootldap连接账号所属ldap目录验证时,出现如上报错经检查,host,username,password等信息均无误,如下为代码中的配置信息示例hashEnv.put(Context.SECURITY_AUTHENTICATION,"simple"......
  • JS逆向实战20——某头条jsvm逆向
    声明本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除!网站目标网站:aHR0cHM6Ly93d3cudG91dGlhby5jb20v数据接口:aHR0cHM6Ly93d3cudG91dGlhby5jb20vYXBpL3BjL2xp......
  • 面试题-SpringBoot
    概述说说SpringBoot和Spring的关系springboot我理解就是把springspringmvcspringdatajpa等等的一些常用的常用的基础框架组合起来,提供默认的配置,然后提供可插拔的设计,就是各种starter,来方便开发者使用这一系列的技术,套用官方的一句话,spring家族发展到今天,已经......
  • 实战项目:构建基于Spring Boot和Vue.js的金融项目分享
    学习SpringBoot和Vue.js结合的前后端分离项目可以按照以下步骤进行:掌握SpringBoot:学习SpringBoot的基本概念和核心特性,如自动配置、起步依赖、注解驱动等。了解Spring框架的基本知识,如IoC容器、AOP、MVC模式等。学习Vue.js:学习Vue.js的基本语法、指令和组件,理解Vue实例、数据绑......
  • springboot封装redission的分布式锁逻辑为注解
    场景概述使用分布式锁的时候,每次都需要使用trycatch处理方法中的逻辑。考虑是否可以这块逻辑抽离出来。实现在自定义的注解中添加属性来设置锁的等待时间、租赁时间和时间单位importjava.lang.annotation.*;@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTI......
  • SpringBoot教学资料6-SpringBoot登录注册功能实现(带简单前端)
     项目样式:      SQL:CREATETABLE`t_user`(`id`int(11)NOTNULLAUTO_INCREMENT,`username`varchar(32)NOTNULL,`password`varchar(32)NOTNULL,PRIMARYKEY(`id`),UNIQUEKEY`username`(`username`))ENGINE=InnoDBAUTO_INCR......
  • SpringBoot SpringCloud Nacos等一些组件版本对应
    毕业版本依赖关系(推荐使用)由于SpringBoot2.4+和以下版本之间变化较大,目前企业级客户老项目相关SpringBoot版本仍停留在SpringBoot2.4以下,为了同时满足存量用户和新用户不同需求,社区以SpringBoot2.4为分界线,同时维护2.2.x和2021.x两个分支迭代。2021.x分支......
  • SpringBoot教学资料5-SpringBoot一对多查询(带简单前端)
    项目展示:  项目结构:SQL:CREATETABLE`t_article`(`id`int(20)NOTNULLAUTO_INCREMENTCOMMENT'文章id',`title`varchar(200)DEFAULTNULLCOMMENT'文章标题',`content`longtextCOMMENT'文章内容',PRIMARYKEY(`id`))ENGINE=......
  • SpringBoot教学资料4-SpringBoot简单增删改查(带前端)
    最终样式:增: 删:  改:  项目结构:     - springboot1.5.9以下兼容jdk1.7- springboot2.x.x版本兼容jdk1.8- springboot3.0及以上版本兼容jdk17- springboot2.1之后的版本已经兼容JDK11 pom.xml:<?xmlversion="1.0"encoding="UTF-8"?><......
  • SpringBoot教学补充资料3-Maven安装
    Maven下载地址:https://maven.apache.org/download.cgi下载后进行解压,记住解压路径。         mvn -v ......