首页 > 其他分享 >可靠消息服务事务(RocketMQ的分布式事务解决方案)

可靠消息服务事务(RocketMQ的分布式事务解决方案)

时间:2024-01-16 23:36:07浏览次数:31  
标签:RocketMQ 事务 String 可靠消息 生产者 rocketmq 服务端 消息

系统环境

Java: openjdk version“1.8.0_382”
rocketmq-all-5.1.4

整体机制

使用rocketmq的事务消息,分两个阶段保证分布式事务的最终一致性;

  • 一阶段:消息生产者(分布式事务发起方)发送半消息(消费者不接收半消息),之后完成本地事务的执行,根据执行结果选择将半消息投递给消费者或撤回半消息;
  • 二阶段:消息消费者(分布式事务的分支应用)接收到半消息后,完成分支事务的执行,该模式下分支事务要求最终一致,不支持回滚。

工作机制

事务消息交互流程如下图所示:

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

使用示例

生产者

添加依赖rocketMQ依赖

implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'

添加配置

application.yml中添加如下配置:rocketmq.name-server为rocketMQ的NameServer注册服务

rocketmq:
#调整为NameServer注册服务
  name-server: 127.0.0.1:9876
  producer:
#生产者分组,非必填,集群使用
    group-name: tradeTxProducer

Spring容器管理RocketMQTemplate和TransactionMQProducer

  • TransactionMQProducer:生产者,建立与RocketMQ服务端的连接
  • RocketMQTemplate:RocketMQ连接组件,管理“生产者”,提供统一的消息发送方法
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;
    @Value("${rocketmq.producer.group-name}")
    private String producerGroupName;

    /**
     * 事务消息生产者
     * @return
     */
    @Bean
    public TransactionMQProducer transactionMQProducer() {
        TransactionMQProducer producer = new TransactionMQProducer(producerGroupName);
        producer.setNamesrvAddr(nameServer);
        return producer;
    }

    /**
     * RocketMQ连接组件
     * @param transactionMQProducer
     * @return
     */
    @Bean
    public RocketMQTemplate rocketTransactionMQTemplate(TransactionMQProducer transactionMQProducer) {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        rocketMQTemplate.setProducer(transactionMQProducer);
        return rocketMQTemplate;
    }
}

业务模块中注入RocketMQTemplate的Bean对象,使用RocketMQTemplate发送事务消息

    //生成唯一交易单号
    String reqNo = TransactionIdGenerator.generateReqTransactionId();
    // 创建 RocketMQ 的 Message 实例
    Message<String> message = MessageBuilder
            .withPayload(JSON.toJSONString(fundParam))
            .setHeader(RocketMQHeaders.KEYS, TradeLocalTransactionListener.KEY_PREFIX + reqNo)
            .setHeader(TradeLocalTransactionListener.HEAD_KEY_REQ_NO, reqNo)
            .build();

    // 发送事务消息
    TransactionSendResult sendResult = rocketTransactionMQTemplate.sendMessageInTransaction("tradeTransactionTopic:tradeBuyTag", message, param);
    if (sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
        logger.info("购买交易额度抢占事务消息已经发送");
        return;
    }

注意:此阶段发送的为半消息,消费者不可见

添加本地事务消息监听器,完成本地事务的执行以及校验

//RocketMQTransactionListener注解中的rocketMQTemplateBeanName要与定义的RocketMQTemplate名称保持一致
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketTransactionMQTemplate")
public class TradeLocalTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private TradeMapper tradeMapper;

    private final Logger logger = LoggerFactory.getLogger(TradeLocalTransactionListener.class);

    /**
     * 交易单号变量
     */
    public static final String HEAD_KEY_REQ_NO = "HEAD_KEY_REQ_NO";

    /**
     * 消息key前缀
     */
    public static final String KEY_PREFIX ="tradeBuyKey-";

    /**
     * 执行本地事务,并返回执行结果,返回值含义如下:\n
     * RocketMQLocalTransactionState.COMMIT-本地事务提交,半消息对消费者可见
     * RocketMQLocalTransactionState.UNKNOWN-本地事务未知,需要重新校验本地事务
     * RocketMQLocalTransactionState.ROLLBACK-本地事务回滚,半消息撤销
     * @param message
     * @param param
     * @return
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object param) {
        try{
            TradeEntity entity = BeanConverter.dataConvert(param, TradeEntity.class);
            String reqNo = (String) message.getHeaders().get(HEAD_KEY_REQ_NO);
            entity.setReqNo(reqNo);
            //插入交易信息
            tradeMapper.insertTradeInfo(entity);
            //模拟异常
            if (entity.getReqShr().equals(BigDecimal.valueOf(2000000L))) {
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            logger.error("抢占基金份额时,交易服务执行异常!", e);
            throw e;
        }
    }

    /**
     * 本地事务状态重校验 executeLocalTransaction()返回RocketMQLocalTransactionState.UNKNOWN时调用
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String reqNo = (String) message.getHeaders().get(HEAD_KEY_REQ_NO);
        return tradeMapper.queryTradeInfoByReqNo(reqNo) == null ? RocketMQLocalTransactionState.ROLLBACK : RocketMQLocalTransactionState.COMMIT;
    }
}

消费者

添加依赖rocketMQ依赖

同生产者

添加配置

application.yml中添加如下配置:

rocketmq:
  name-server: 112.124.36.250:9876
#服务端开启访问授权时需要配置
  consumer:
    access-key: ubuntu_zly_123
    secret-key: ubuntu_zly_123

添加消费者

使用@RocketMQMessageListener,topic指定订阅主体,consumerGroup指定消费组分组;实现RocketMQListener接口

@Component
@RocketMQMessageListener(
    topic = "tradeTransactionTopic",
    consumerGroup = "tradeTransactionGroup"
)
public class TradeTransactionListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(TradeTransactionListener.class);
    @Autowired
    private FundManageService fundManageService;

    @Override
    public void onMessage(String msg) {
        // 执行本地事务逻辑,返回事务状态
        try {
            logger.info("购买交易抢占基金份额消息消费开始:{}", msg);
            fundManageService.custBuyFund(JSON.parseObject(msg, FundInfoDto.class));
            logger.info("购买交易抢占基金份额消息消费结束");
        } catch (Exception e) {
            logger.error("交易消息消费失败!", e);
            throw e;
        }
    }
}

遇到的问题

依赖包版本冲突

  • 问题背景:
    Spring-boot项目在引入rocketmq-spring-boot-starter:2.2.3包后,与原有com.alibaba:fastjson:1.2.73冲突,导致应用启动失败。
  • 解决方案:
    fastjson包升级为com.alibaba:fastjson:2.0.42后解决。

参考文献

RocketMQ事务消息官方描述文档
RocketMQ集成Spring-boot的官方示例代码

标签:RocketMQ,事务,String,可靠消息,生产者,rocketmq,服务端,消息
From: https://www.cnblogs.com/zly1015/p/17968077

相关文章

  • Spring事务传播机制解析
    确保数据一致性的关键在Java的Spring框架中,事务管理是保证应用数据一致性和可靠性的关键。Spring提供了灵活的事务传播机制,它定义了事务边界,以及在嵌套方法调用时如何处理事务。本文旨在深入探讨Spring的事务传播行为,帮助开发者更好地理解和运用这一重要特性。事务传播机制简介......
  • Spring事务传播机制
    1.Spring对事物的支持一般有两种方式编程式事务管理:通过 TransactionTemplate或者TransactionManager手动管理事务,实际应用中很少使用,这不是本文的重点,就不在这里赘述。声明式事务管理:使用场景最多,也是最推荐使用的方式,直接加上@Transactional注解即可。2.Transactional注......
  • 无涯教程-SQL - Transactions(事务)
    事务是将一个或多个更改打包在一起保存到数据库,事务对于确保数据完整性和处理数据库错误很重要。事务性质事务具有以下四个标准属性,通常以首字母缩写ACID表示。原子性-确保工作单元内的所有操作均成功完成,否则,事务将在失败时中止,并且所有先前的操作都将还原到以前的状态......
  • 事务Transactional失效的这10个场景,你一定得知道!
    @Transactional失效的场景都有哪些呢?如图所示!以上我们列举了10种场景,接下来我们针对不同的场景来具体的分析下。一、代理不生效导致1、同一个类中,方法内部调用事务失效同一个类中,addOrder()方法无事务,addOrder2()方法存在事务,addOrder()调用addOrder2()。我们通过外部方法调用addOr......
  • Spring学习记录之Spring对事务的支持
    Spring学习记录之Spring对事务的支持前言这篇文章是我第二次学习b站老杜的spring相关课程所进行的学习记录,算是对课程内容及笔记的二次整理,以自己的理解方式进行二次记录,其中理解可能存在错误,欢迎且接受各位大佬们的批评指正;关于本笔记,只是我对于相关知识遗忘时快速查阅了解使......
  • Spring 事务详解
    JavaGuide(gitee.io)1、Spring事务管理接口介绍Spring框架中,事务管理相关最重要的3个接口如下:**PlatformTransactionManager**:(平台)事务管理器,Spring事务策略的核心。**TransactionDefinition**:事务定义信息(事务隔离级别、传播行为、超时、只读、回滚规则)。**Trans......
  • oracle的事务
    Oracle数据库中的事务具有四个基本特性,也称为ACID特性,包括原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。原子性:事务被视为不可分割的最小操作单位,事务中的所有操作要么全部提交成功,要么全部回滚失败,不会出现部分执行的情况。一致性:事务必须使数据库从......
  • 揭秘Spring事务失效场景分析与解决方案
    在Spring框架中,事务管理是一个核心功能,然而有时候会遇到事务失效的情况,这可能导致数据一致性问题。本文将深入探讨一些Spring事务失效的常见场景,并提供详细的例子以及解决方案。1.跨方法调用问题场景:当一个事务方法内部调用另一个方法,而被调用的方法没有声明为@Transactional......
  • RocketMQ——快速入门
    RocketMQ架构设计消息队列实现了消息投放和消息消费间的解耦,实现了异步处理消息的功能。RocketMQ作为消息中间件,在其存储消息的结构上实现了消息均衡投放、消息容灾、高可用(Dledger主从切换)、自动故障转移特点。先引入以下几个概念:Broker:实际存储消息的节点,接收来自生产者......
  • 详解Java之Spring框架中事务管理的艺术
    第1章:引言大家好,我是小黑,咱们今天聊聊Spring框架中的事务管理。不管是开发小型应用还是大型企业级应用,事务管理都是个不可避免的话题。那么,为什么事务管理这么重要呢?假设在银行系统中转账时,钱从A账户扣了,但没到B账户,这种情况就是事务管理处理不当的后果。显然,我们需要一种机制来......