首页 > 其他分享 >Spring Boot集成RocketMQ实现分布式事务

Spring Boot集成RocketMQ实现分布式事务

时间:2024-06-05 17:31:58浏览次数:16  
标签:事务 Spring Boot 发送 rocketmq 消息 RocketMQ 分布式

        RocketMQ是由阿里巴巴集团开发的一款高性能、高可靠、分布式的开源消息中间件,它在2012年对外开源,并于2016年捐赠给Apache软件基金会,随后在2017年成为了Apache的顶级项目。RocketMQ的设计旨在满足互联网业务场景中的海量消息传递需求,尤其擅长处理高并发、大数据量以及实时计算场景。

主要特点和功能包括:

1. 分布式架构:

        RocketMQ采用了分布式部署架构,允许生产者、消费者和消息队列实例分布在不同节点上,从而实现水平扩展和高可用性。

2. 消息模型:

        支持发布/订阅(Pub/Sub)模式,生产者发送的消息可以被多个订阅该主题的消费者接收。

        支持点对点(P2P)模式,消息只能被一个消费者消费一次。

3. 消息类型:

        提供普通消息、事务消息、顺序消息、批量消息、定时消息、消息回溯等功能。

        事务消息确保分布式事务的一致性,顺序消息则能够保证消息在同一个队列内的严格顺序执行。

4. 性能与可靠性:

        高吞吐量和低延迟,适用于大规模分布式系统。

        支持持久化存储和可靠的消息投递,通过ack机制确保消息不丢失。

5. 客户端API:

        提供丰富的Java API,让开发者可以方便地进行消息的生产和消费。

        支持同步、异步和单向消息发送方式。

6. 网络通信:

        基于Netty框架构建高效网络通信层。

7. 运维管理:

         包含Name Server组件,提供服务发现和路由管理功能。

        提供易于使用的监控和管理工具,便于运维人员对消息集群进行管理和维护。

8. 生态集成:

        可以与Spring Cloud Alibaba等云原生生态体系紧密结合,简化微服务架构下的消息队列使用。

        RocketMQ作为一种成熟的企业级消息中间件,在大型分布式系统中扮演着关键角色,为了解耦系统组件、提高系统响应速度和稳定性、实现最终一致性等方面发挥着重要作用。


RocketMQ事务消息原理:

RocketMQ的分布式事务消息主要依赖于其特有的“半消息”(Half Message)机制来实现在分布式环境下的最终一致性事务。以下是其基本流程:


1. 第一阶段(Prepare/PreCommit Phase):+

        应用程序发起一个分布式事务操作,在这个过程中,首先执行本地事务(如数据库操作),然后通过RocketMQ的事务消息接口发送一条“半消息”。半消息不会立即对消费者可见,而是等待后续的确认指令。
        半消息发送完成后,RocketMQ会向生产者返回一个确认信号,但消息本身并不立刻投递给消费者。

2. 第二阶段(Commit/Rollback Phase):

        根据第一步本地事务的实际执行结果,应用程序需要决定是否提交或回滚这条半消息。

         如果本地事务执行成功,应用程序通知RocketMQ提交此条半消息,这时RocketMQ会将半消息转换成可消费的消息并投递给消费者。
        若本地事务执行失败,则应用程序通知RocketMQ回滚此条半消息,RocketMQ将会删除这条半消息,消费者永远无法看到这条消息。

3. 事务状态检查与自动回查:

        如果在第二阶段,由于网络等问题导致RocketMQ未收到明确的提交或回滚指令,RocketMQ服务端会定期对生产者进行回查,询问相关事务消息的状态,直到事务状态明确为止。
        生产者需要实现相应的逻辑来响应这些回查请求,确定事务状态并给出相应的反馈。

4. 最终一致性保障:

        通过上述机制,RocketMQ能够在分布式环境下实现最终一致性,即虽然不能实时保证每个节点的数据一致性,但在经过一段时间后,所有参与节点的数据会达成一致状态。

RocketMQ分布式事务的实现策略是将传统数据库事务的概念扩展到了消息队列中,利用半消息和两阶段提交思想,结合定时回查机制,使得在跨服务、跨系统的分布式环境中能够实现业务数据与消息传递的一致性,从而解决了微服务架构中的分布式事务难题。

在Spring Boot项目中集成RocketMQ并实现分布式事务的过程主要包括以下几个关键步骤:

1.依赖引入:

首先,需要在Spring Boot项目中引入RocketMQ及其支持事务消息的相关依赖,通常通过Maven或Gradle添加`rocketmq-spring-boot-starter`和`rocketmq-client`依赖。

 <!-- Maven -->
   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-spring-boot-starter</artifactId>
       <version>最新稳定版本号</version>
   </dependency>

   <!-- 如果需要事务消息功能,还需要单独引入 -->
   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>与starter版本保持一致</version>
   </dependency>

2. 配置RocketMQ:

在application.properties或application.yml文件中配置RocketMQ的服务地址(NameServer)以及生产者组名。

 properties
   rocketmq:
     name-server: xxx.xxx.xxx.xxx:9876 # RocketMQ NameServer地址
     producer:
       group: my-distributed-transaction-group # 生产者组名

3. 定义事务消息生产者:

        创建一个类,继承`org.apache.rocketmq.spring.core.RocketMQTemplate`或者使用`@RocketMQMessageListener`注解来创建一个具有事务处理能力的生产者。在需要发送事务消息的方法中,使用RocketMQ提供的事务消息API `executeTransaction` 方法。

 @Autowired
   private RocketMQTemplate rocketMQTemplate;

   @Transactional
   public void sendMessageInTransaction(Order order) {
       // (1) 发送半消息(prepare message)
       TransactionSendResult result = rocketMQTemplate.executeInTransaction(
               "transaction-topic", // 消息主题
               () -> { // 第一阶段:发送半消息前的操作
                   // 执行本地数据库操作,例如保存订单
                   orderService.save(order);
                   return order.getOrderNo(); // 返回用于生成消息内容的标识符
               },
               orderId -> { // 第二阶段:根据本地事务状态决定是否提交或回滚消息
                   Message msg = MessageBuilder.withPayload(orderId).build();
                   return new Message("transaction-topic", tags, msg); // 构建消息
               }
       );

       // 处理事务发送结果
       if (!result.isSuccess()) {
           // 处理发送失败的情况...
       }
   }
   

4. 处理事务消息确认:

        RocketMQ会根据你在第二阶段提供的回调函数返回的消息来判断事务状态。如果本地事务执行成功,则发送commit命令,否则发送rollback命令。RocketMQ服务器收到commit后才会将消息投递给消费者,收到rollback则会丢弃半消息。

5. 消费者端处理:

        消费者无需特殊处理事务消息,只需像处理普通消息一样订阅相应主题即可。RocketMQ会确保事务消息在被正确提交后才投递给消费者。

6. 异常处理及重试:

        考虑到网络波动、服务器故障等情况,RocketMQ提供了一定的重试机制来保证事务最终能达成一致。同时,应用端也需要有适当的异常处理机制和幂等设计,确保即使在异常情况下,整个分布式事务也能达到预期的结果。

        通过以上步骤,Spring Boot应用就能借助RocketMQ实现分布式事务了,其中的核心是利用RocketMQ事务消息的两阶段提交机制,确保消息和本地事务的一致性。

标签:事务,Spring,Boot,发送,rocketmq,消息,RocketMQ,分布式
From: https://blog.csdn.net/weixin_53391173/article/details/139387675

相关文章

  • 灵动微电子 MM32F5277 boot分区实现之Flash驱动移植(二)
    前言    上篇文章,我们移植了nr_micro_shell串口shell组件到MM32F5277上,在此基础上,我们继续移植NorFlash和EmbddedFlash的驱动,并编写串口命令进行测试!NorFlash驱动移植    我们先到灵动微的官网下载官方的SDK,贴个官网链接:灵动微电子SDK下载https://mind......
  • Spring家族中的消息通信解决方案
    相信大家对消息通信架构以及各种消息中间件应该都不陌生。在分布式系统的设计和开发过程中,消息通信是用于实现系统解耦、提高扩展性的一大技术体系。而业界关于如何实现消息通信系统也有很多解决方案和对应的开发框架。不知道你有没有发现,在我们每天都在使用到Spring框架中,实际......
  • spring入门aop和ioc
    目录spring分层架构表现层服务层(业务层)持久层spring核心ioc(控制反转)1)接下来是代码示例:2)ioc容器的使用过程3)ioc中的bean管理4)实例化bean的三种方式aop(面向切面开发)定义优势AOP底层原理AOP相关的术语AOP入门aop注解开发aop纯注解开发Di(依赖注入)1)属性的set方法注入值的方式2)构造......
  • springMvc 配置 UReport2
    参考:https://blog.csdn.net/qq_42207808/article/details/112258835 1.配置pom.xml引入目前最新得2.2.9版本<dependency><groupId>com.bstek.ureport</groupId><artifactId>ureport2-console</artifactId&......
  • 基于SpringBoot的秒杀系统源码数据库
    基于SpringBoot的秒杀系统源码数据库社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本次开发一套基于SpringBoot的秒杀系统,管理员功能有个人中心,用户管理,商品类......
  • 基于springboot的二手车交易系统源码数据库
    基于springboot的二手车交易系统源码数据库如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统二手车交易信息管理难度大,容错率低,管理人员处理数据费工费时,所以专门为......
  • 基于springboot的纺织品企业财务管理系统源码数据库
    基于springboot的纺织品企业财务管理系统源码数据库在如今社会上,关于信息上面的处理,没有任何一个企业或者个人会忽视,如何让信息急速传递,并且归档储存查询,采用之前的纸张记录模式已经不符合当前使用要求了。所以,对纺织品企业财务信息管理的提升,也为了对纺织品企业财务信息进行......
  • 一个基于 React + SpringBoot 的在线多功能问卷系统(附源码)
    简介:一个基于React+SpringBoot的在线多功能问卷系统前端技术栈:React、React-Router、Webpack、Antd、Zustand、Echarts、DnDKit后端技术栈:SpringBoot、MySQL、MyBatisPlus、Redis项目源码下载链接: https://pan.quark.cn/s/2e32786e0c61部分页面静态预览: 主要前......
  • 基于springboot实现疫情信息管理系统项目【项目源码+论文说明】计算机毕业设计
    基于springboot实现疫情信息管理系统演示摘要近年来,信息化管理行业的不断兴起,使得人们的日常生活越来越离不开计算机和互联网技术。首先,根据收集到的用户需求分析,对设计系统有一个初步的认识与了解,确定疫情信息管理系统的总体功能模块。然后,详细设计系统的主要功能模块,通......
  • 128springboot汽车租赁管理系统租车订单还车汽车资讯论坛管理(源码+文档+PPT+运行视频+
    项目技术:springboot+Maven+Vue等等组成,B/S模式+Maven管理等等。环境需要1.运行环境:最好是javajdk1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA;3.tomcat环境:Tomcat7.x,8.x,9.x版本均可4.硬件环境:windows......