首页 > 其他分享 >Spring Boot 整合 RocketMQ 之事务消息

Spring Boot 整合 RocketMQ 之事务消息

时间:2024-10-20 19:48:41浏览次数:8  
标签:事务 Spring Boot 服务端 消息 本地 import RocketMQ

前言:

上一篇我们分享了 RocketMQ 完成顺序消息发送的案例,本篇我们来分享一下 RocketMQ 事务消息的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

事务消息的使用场景

分布式事务的诉求

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。

在这里插入图片描述

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足

为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难

将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。

在这里插入图片描述

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:

  • 消息发送成功,订单没有执行成功,需要回滚整个事务。
  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。
  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

基于 RocketMQ分布式事务消息:支持最终一致性

上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。

而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

在这里插入图片描述

RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势。

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如图所示:

在这里插入图片描述

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

在这里插入图片描述

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

事务消息生产者代码

事务消息使用 rocketMQTemplate的sendMessageInTransaction 方法发送,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等,生产者代码的核心逻辑是向 RocketMQ 发送事务消息,并执行本地事务,最后将本地事务的执行结果通知到 RocketMQ,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等。

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class TransactionMessageProducer {


    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/12 16:58
     * @description 发送事务消息
     */
    public void sendMessageTransaction(String message) {
        //发送事务消息采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
        TransactionSendResult transactionSendResult = rocketMqTemplate.sendMessageInTransaction("transaction-topic",
                MessageBuilder.withPayload(message).build(),
                null);
        //消息发送状态
        SendStatus sendStatus = transactionSendResult.getSendStatus();
        //本地事务状态
        String localState = transactionSendResult.getLocalTransactionState().name();
        log.info("发送状态:{},本地事务执行状态:{}", sendStatus, localState);
    }

}

事务消息消费者代码

事务消息的消费者代码和普通消息的消费者代码一样,并没有什么特殊之处。

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-topic")
public class TransactionMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("事务消息消费成功,消息内容:{}", message);
    }
}

事务消息生产者监听器

事务消息中除了有消息生产者和消息消费者之外,还需要一个事务消息监听器,来监听本地事务的执行状态,自定义事务监听器,需要实现 RocketMQLocalTransactionListener 接口,还需要在类上加上 @RocketMQTransactionListener 注解,在方法中重写自己的业务逻辑。

package com.order.service.rocketmq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

import java.nio.charset.StandardCharsets;

/**
 * @ClassName: TransactionMsgListener
 * @Author: Author
 * @Date: 2024/10/12 17:10
 * @Description: 事务消息监听器 监听本地事务执行的状态和检查本地事务状态
 */
@Slf4j
@RocketMQTransactionListener
public class TransactionMessageListener implements RocketMQLocalTransactionListener {

    /**
     * @param message:
     * @param obj:
     * @return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState
     * @date 2024/10/12 17:12
     * @description 执行本地事务 消息发送成功时候执行
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) {
        //开启本地事务
        //执行业务逻辑 入库
        //提交或者回滚本地事务
        //根据本地业务结果 来确认消息是否发送
        int result = 3;
        //消息
        String messageStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        switch (result) {
            case 1:
                log.info("本地业务执行成功,消息正常发送,消息内容:{}", messageStr);
                //返回COMMIT状态的消息会立即被消费者消费到
                return RocketMQLocalTransactionState.COMMIT;
            case 2:
                //返回 UNKNOW 状态的消息会等待 Broker 进行需要进行回查正常发送,消息内容:{}", messageStr);
                log.info("本地业务执行结果未知,消息可能需要回滚,消息内容:{}", messageStr);
                return RocketMQLocalTransactionState.UNKNOWN;
            default:
                //业务执行失败 消息回滚
                log.error("本地业务执行失败,消息丢弃,消息内容:{}", messageStr);
                //返回 ROLLBACK 状态的消息会被丢弃
                return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /*
    //供参考
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) {
        //执行本地事务
        try {
            String orderId = new String((byte[]) message.getPayload());
            orderService.updateOrderState(orderId,"支付");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            //异常就回滚事务消息
            log.error("本地事务异常,消息需要回滚,消息类容:{}",message);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }*/

    /**
     * @param message:
     * @return org.apache.rocketmq.spring.core.RocketMQLocalTransactionState
     * @date 2024/10/12 17:13
     * @description 检查本地事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //检查本地事务状态 RocketMQ 回查
        //消息
        String messageStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
        //这里可以使用消息中的信息 例如订单 ID 来回查订单状态 来判断本地事务的状态 决定是回滚还是提交
        log.info("事务消息状态回查,事务消息内容:{}", messageStr);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

事务消息触发代码

@GetMapping("/send-transaction")
public String sendMessageTransaction(@RequestParam String message){
	transactionMessageProducer.sendMessageTransaction(message);
	return "success";
}

事务消息测试验证

事务消息正常情况验证

2024-10-14 17:53:56.215  INFO 24736 --- [nio-8086-exec-2] c.o.s.r.l.TransactionMessageListener     : 本地业务执行成功,消息正常发送,消息内容:send-transaction
2024-10-14 17:53:56.216  INFO 24736 --- [nio-8086-exec-2] c.o.s.r.p.TransactionMessageProducer     : 发送状态:SEND_OK,本地事务执行状态:COMMIT_MESSAGE
2024-10-14 17:53:56.223  INFO 24736 --- [MessageThread_1] c.o.s.r.c.TransactionMessageConsumer     : 事务消息消费成功,消息内容:send-transaction

结果符合预期。

事务消息 UNKNOW 情况验证

2024-10-14 17:57:18.750  INFO 13852 --- [nio-8086-exec-2] c.o.s.r.l.TransactionMessageListener     : 本地业务执行结果未知,消息可能需要回滚,消息内容:send-transaction
2024-10-14 17:57:18.752  INFO 13852 --- [nio-8086-exec-2] c.o.s.r.p.TransactionMessageProducer     : 发送状态:SEND_OK,本地事务执行状态:UNKNOW
2024-10-14 17:58:02.295  INFO 13852 --- [pool-2-thread-1] c.o.s.r.l.TransactionMessageListener     : 事务消息状态回查,事务消息内容:send-transaction
2024-10-14 17:58:02.302  INFO 13852 --- [MessageThread_1] c.o.s.r.c.TransactionMessageConsumer     : 事务消息消费成功,消息内容:send-transaction

结果符合预期。

事务消息失败情况验证

2024-10-14 17:59:18.589 ERROR 10064 --- [nio-8086-exec-2] c.o.s.r.l.TransactionMessageListener     : 本地业务执行失败,消息丢弃,消息内容:send-transaction
2024-10-14 17:59:18.590  INFO 10064 --- [nio-8086-exec-2] c.o.s.r.p.TransactionMessageProducer     : 发送状态:SEND_OK,本地事务执行状态:ROLLBACK_MESSAGE

结果符合预期。

总结:本篇简单分享了 RokcetMQ 事务消息的消息的使用,实际业务代码中使用只需加入自己的业务即可,希望可以帮助到有需要的小伙伴。

如有不正确的地方欢迎各位指出纠正。

标签:事务,Spring,Boot,服务端,消息,本地,import,RocketMQ
From: https://blog.csdn.net/weixin_42118323/article/details/142924216

相关文章

  • 【毕业设计】基于SpringBoot + Vue的工资信息管理系统
    一、项目背景随着信息技术的飞速发展,传统的工资管理方式已经无法满足现代企业对数据安全、效率和信息处理能力的需求。为了提高工资信息管理的效率和准确性,开发一套工资信息管理系统显得尤为重要。该系统通过信息化手段,能够有效管理员工工资、津贴、考勤记录等信息,为企业管理......
  • spring笔记
    @Slf4j@RestController@Validated1、Circularviewpath[register]:woulddispatchbacktothecurrenthandlerURL[/register]again.(循环视图路径)把@Controller改成@RestController (相当于@Controller和@ResponseBody的组合)2、@Slf4j注解使用,方便调试log.info......
  • springboot+vue奥迪汽车信贷系统 【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着全球汽车市场的蓬勃发展,汽车信贷作为一种重要的金融工具,极大地促进了汽车销售与消费。奥迪汽车作为国际知名的豪华汽车品牌,其市场占有率和消费者群体日益扩大。在竞争激烈的市场环境中,提供高效、便捷的汽车信贷服务成为奥迪汽车吸......
  • springboot+vue案款发放管理系统【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景在当今社会,随着法治建设的不断推进和法律服务的日益完善,各类司法案件数量急剧增长,案款管理成为了司法机关和相关机构面临的一大挑战。传统的案款管理方式往往依赖于人工操作和纸质记录,不仅效率低下,还容易出错,难以保证案款的准确、及时......
  • 【附源码】景区旅游网站系统(源码+数据库+论文+ppt一整套齐全),java开发springboot框架
    ......
  • 集成Spring Security详解
    集成SpringSecurity详解一、SpringSecurity简介SpringSecurity是一个功能强大且高度可定制的身份验证和访问控制框架,它专注于为Java应用程序提供全面的安全解决方案。作为Spring项目的一部分,SpringSecurity继承了Spring框架的灵活性和可扩展性,能够轻松地集成到任何Spr......
  • springboot+vue安卓实训教学管理系统【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着移动互联网技术的飞速发展,智能手机已成为人们日常生活中不可或缺的一部分。在教育领域,安卓平台因其广泛的用户基础和强大的应用开发能力,成为构建实训教学管理系统的理想选择。传统的实训教学管理方式往往依赖于纸质记录或PC端软件......
  • springboot+vue安卓旅游app的设计与实现【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着移动互联网技术的飞速发展,智能手机已成为人们日常生活中不可或缺的一部分。旅游业作为全球经济的重要组成部分,其数字化转型已成为必然趋势。近年来,安卓旅游APP作为连接旅游者与旅游资源的桥梁,凭借其便捷性、实时性和互动性,受到了......
  • springboot+vue安卓健康养生APP“YOUNG生” 后8【开题+程序+论文】
    系统程序文件列表开题报告内容研究背景随着现代生活节奏的加快,越来越多的年轻人开始关注自己的健康状况,追求更加科学、合理的养生方式。然而,由于工作繁忙、信息繁杂,很多人难以系统地学习和实践养生知识。在此背景下,开发一款面向年轻人的安卓健康养生APP“YOUNG生”显得尤为......
  • 微信小程序毕业设计-基于springboot+协同过滤推荐算法的成都美食分享系统设计和实现,基
    博主介绍:✌️码农一枚,专注于大学生项目实战开发、讲解和毕业......