首页 > 其他分享 >Spring Boot 集成 RabbitMQ 消息事务(消费者)

Spring Boot 集成 RabbitMQ 消息事务(消费者)

时间:2024-10-11 10:11:29浏览次数:6  
标签:事务 -- Spring 数据库 Boot RabbitMQ 消息 consumer public

1. Spring Boot 集成 RabbitMQ 消息事务(消费者)

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18
spring-boot-starter-jdbc 2.7.18

1.2. 概览

这里模拟一个常见的业务流程,消费者接收到一条 RabbitMQ 消息,此时消费者再发布一条 RabbitMQ 消息,同时更新数据库,更新失败时回滚数据库事务,同时拒绝先前接收到的消息,被拒绝的消息将被转发到死信队列,然后回滚消息事务,消费者发送出去的消息将被回滚,并不会真正发布到 RabbitMQ。这里涉及到分布式事务,本案例采用最大努力单阶段提交模式来实现事务管理。

sequenceDiagram participant producer as Spring Boot 应用(生产者) box lightYellow RabbitMQ participant exchange as 交换机 participant queue as 队列 participant deadLetterExchange as 死信交换机 participant deadLetterQueue as 死信队列 end participant consumer as Spring Boot 应用(消费者) participant mysql as 数据库 producer ->> exchange: 1. 发送消息 exchange ->> queue: 2. 转发消息 consumer ->> consumer: 3. 开启消息事务 queue ->> consumer: 4. 接收消息 consumer ->> consumer: 5. 开启数据库事务 consumer ->> exchange: 6. 发布消息 consumer ->> mysql: 7. 更新数据库 mysql ->> consumer: 8. 更新失败 consumer ->> consumer: 9. 回滚数据库事务,撤回第 7 步更新操作 consumer ->> consumer: 10. 回滚消息事务,撤回第 6 步发布操作 consumer ->> queue: 11. 拒绝消息 queue ->> deadLetterExchange: 12. 转发消息 deadLetterExchange ->> deadLetterQueue: 13. 转发消息

1.2.1. 最大努力单阶段提交模式

最大努力单阶段提交模式是相当普遍的,但在开发人员必须注意的某些情况下可能会失败。这是一种非 XA 模式,涉及了许多资源的同步单阶段提交。因为没有使用二阶段提交,它绝不会像 XA 事务那样安全,但是如果参与者意识到妥协,通常就足够了。许多高容量,高吞吐量的事务处理系统通过设置这种方式以达到提高性能的目的。

1.2.2. 成功的业务流程

flowchart TB 1["1. 开始消息事务"] --> 2["2. 接收消息"] --> 3["3. 开始数据库事务"] --> 4["4. 发布消息"] --> 5["5. 更新数据库"] --> 6["6. 提交数据库事务"] --> 7["7. 提交消息事务"]

1.2.3. 失败的业务流程

flowchart TB 1["1. 开始消息事务"] --> 2["2. 接收消息"] --> 3["3. 开始数据库事务"] --> 4["4. 发布消息"] --> 5["5. 更新数据库失败"] --> 6["6. 回滚数据库事务"] --> 7["7. 回滚消息事务"]

1.3. 新建数据库表

create table t_user
(
    id   int auto_increment primary key,
    name varchar(20) not null
);

1.4. Spring 配置

spring:
  application:
    name: spring-rabbit-transaction-consumer-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/demo
    username: root
    password: root

1.5. 定义常量

public class RabbitTransactionConsumerConstants {
    public static final String QUEUE = "spring-rabbit-transaction-consumer-demo-queue";
    public static final String EXCHANGE = "spring-rabbit-transaction-consumer-demo-exchange";
    public static final String DEAD_LETTER_QUEUE = "spring-rabbit-transaction-consumer-demo-dead-latter-queue";
    public static final String DEAD_LETTER_EXCHANGE = "spring-rabbit-transaction-consumer-demo-latter-exchange";
    public static final String ROLLBACK_QUEUE = "spring-rabbit-transaction-consumer-demo-rollback-queue";
    public static final String ROLLBACK_EXCHANGE = "spring-rabbit-transaction-consumer-demo-rollback-exchange";
}

1.6. 配置交换机和队列

@Bean
public Queue queue() {
    return QueueBuilder.durable(QUEUE)
            .deadLetterExchange(DEAD_LETTER_EXCHANGE)
            .build();
}

@Bean
public FanoutExchange exchange() {
    return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange());
}

@Bean
public FanoutExchange deadLetterExchange() {
    return ExchangeBuilder.fanoutExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable(DEAD_LETTER_QUEUE)
            .build();
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

@Bean
public FanoutExchange rollbackExchange() {
    return ExchangeBuilder.fanoutExchange(ROLLBACK_EXCHANGE).durable(true).build();
}

@Bean
public Queue rollbackQueue() {
    return QueueBuilder.durable(ROLLBACK_QUEUE)
            .build();
}

@Bean
public Binding rollbackBinding() {
    return BindingBuilder.bind(rollbackQueue()).to(rollbackExchange());
}

1.7. 定义 RabbitMQ 消息事务管理器

@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

1.8. 配置 SimpleMessageListenerContainer

@Bean
public ContainerCustomizer<SimpleMessageListenerContainer> simpleMessageListenerContainerCustomizer(RabbitTransactionManager rabbitTransactionManager) {
    return container -> {
        //Channel 开启事务
        // 由于消息事务只适用于发布(publish)和确认(ack)
        // 在消费端 Spring 默认会自动 ack
        // 因此,只有消费消息的情况下,并不需要开启事务
        // 只有在消费消息的同时还发布消息出去,才需要配置开启消息事务
        container.setChannelTransacted(true);
        //配置事务管理器
        container.setTransactionManager(rabbitTransactionManager);
        //设置拒绝消息的行为
        //值为 true 时,将抛出 AmqpRejectAndDontRequeueException 异常并重新消费该消息
        //值为 false 时,将消息转发到死信队列中
        container.setDefaultRequeueRejected(false);
    };
}

1.9. 定义数据库事务管理器

@Bean(name = "dataSourceTransactionManager")
@Primary
DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
    DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager(dataSource);
    transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(dataSourceTransactionManager));
    return dataSourceTransactionManager;
}

1.10. 测试

@Component
@Slf4j
public class SpringRabbitTransactionConsumerDemo implements ApplicationRunner {

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        rabbitTemplate.convertAndSend(EXCHANGE, null, "Jason");
    }

    //Spring 数据库事务,指定事务管理器为 DataSourceTransactionManager
    @Transactional(rollbackFor = Throwable.class, transactionManager = "dataSourceTransactionManager")
    @RabbitListener(queues = {QUEUE})
    public void listen(Channel channel, Message<String> message) throws Throwable {
        //由于 Channel 已配置为开启事务,因此这里发送出去的消息将会在出现异常时回滚
        channel.basicPublish(ROLLBACK_EXCHANGE, "", null, "rollback".getBytes());
        //往数据库表插入两条主键 id 一样的数据,引起主键 id 重复异常
        jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, message.getPayload()));
        jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, message.getPayload()));
    }
}

1.11. 参考资料

标签:事务,--,Spring,数据库,Boot,RabbitMQ,消息,consumer,public
From: https://www.cnblogs.com/jason207010/p/18456412

相关文章

  • 深入探索Spring AI:源码分析流式回答
    在上一章节中,我们深入分析了SpringAI的阻塞式请求与响应机制,并探讨了如何增强其记忆能力。今天,我们将重点讲解流式响应的概念与实现。毕竟,AI的流式回答功能与其交互体验密切相关,是提升用户满意度的重要组成部分。基本用法基本用法非常简单,只需增加一个stream方法即可实现所需......
  • 模拟一个微服务架构项目来学习包括Nacos、EMQX、GateWay、RabbitMQ、Canal、Mybatis-P
    前言介绍下最近做的项目:为什么做这个项目?项目的核心用户目标是谁?面向新能源电车用户给目标用户提供了什么价值?方便快捷充电服务团队的作用?需求分析,概要设计,详细设计,开发,测试,部署,上线我的作用?1-2两个核心业务详细设计(业务流程,接口入参,接口出参,表结......
  • SpringBoot增删该查
    SpringBoot+Mybatis增删该查()1、xml基础配置<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation......
  • 基于SpringBoot + Vue的毕业设计选题系统的设计与实现 (角色:学生、教师、管理员)
    文章目录前言一、详细操作演示视频二、具体实现截图三、技术栈1.前端-Vue.js2.后端-SpringBoot3.数据库-MySQL4.系统架构-B/S四、系统测试1.系统测试概述2.系统功能测试3.系统测试结论五、项目代码参考六、数据库代码参考七、项目论文示例结语前言......
  • 基于JAVA+SpringBoot+Vue+协同过滤算法+爬虫的前后端分离的租房系统
    ✌全网粉丝20W+,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌......
  • Window10 系统 RabbitMQ 的简单使用
    启动RabbitMQ服务进入到rabbitmq的安装bin目录netstartRabbitMQ或者在rabbitmq的sbin目录下,双击执行rabbitmq-server.bat文件。停止RabbitMQ服务netstopRabbitMQ查看服务状态rabbitmqctlstatus页面访问RabbitMQ初始的登录网址和登陆密码http://......
  • rabbitmq 发送端和消费端 dotnet
    #RabbitMQ发送端usingSystem.Text;usingRabbitMQ.Client;varfactory=newConnectionFactory();factory.HostName="127.0.0.1";//RabbitMQ服务器地址factory.DispatchConsumersAsync=true;stringexchangeName="exchange1";//交换机的名字string......
  • 基于SpringBoot+Vue+uniapp的在线远程考试系统的详细设计和实现(源码+lw+部署文档+讲
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 基于SpringBoot+Vue+uniapp的毕业就业信息管理系统的详细设计和实现(源码+lw+部署文档
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 深入理解 : Spring BeanFactory
    深入理解:SpringBeanFactory1概述:SpringBeanFactory1.1什么是BeanFactory:SpringBean容器的抽象接口BeanFactory是Spring框架(spring-beans模块)中的一个接口,它是一个工厂类,用来创建和管理Spring中的Bean对象。BeanFactory接口定义了Spring容器的基本规范和行为,它......