首页 > 其他分享 >RocketMQ事务消息原理

RocketMQ事务消息原理

时间:2023-05-21 19:04:37浏览次数:50  
标签:事务 MQ 消息 本地 import 原理 RocketMQ


一、RocketMQ事务消息原理:

        RocketMQ 在 4.3 版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事务消息解决的是生产端的消息发送与本地事务执行的原子性问题,这里的界限一定要清楚,是确保 MQ 生产端正确无误地将消息发送出来,没有多发,也不会漏发,至于发送后消费端有没有正常的消费消息,这种异常场景将由 MQ 消息消费失败重试机制来保证。

        RocketMQ 设计中的 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者;而 RocketMQ 本身提供的存储机制则为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

1、RocketMQ 实现事务一致性的原理:

RocketMQ事务消息原理_Server

备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。

(1)正常情况:在事务主动方服务正常,没有发生故障的情况下,发消息流程如下:

步骤①:MQ 发送方向 MQ Server 发送 half 消息,MQ Server 标记消息状态为 prepared,此时该消息 MQ 订阅方是无法消费到的

步骤②:MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经成功接收

步骤③:发送方开始执行本地事务逻辑

步骤④:发送方根据本地事务执行结果向 MQ Server 提交二次确认,commit 或 rollback

最终步骤:MQ Server 如果收到的是 commit 操作,则将半消息标记为可投递,MQ订阅方最终将收到该消息;若收到的是 rollback 操作则删除 half 半消息,订阅方将不会接受该消息;如果本地事务执行结果没响应或者超时,则 MQ Server 回查事务状态,具体见步骤(2)的异常情况说明。

(2)异常情况:在断网或者应用重启等异常情况下,图中的步骤④提交的二次确认超时未到达 MQ Server,此时的处理逻辑如下:

步骤⑤:MQ Server 对该消息进行消息回查

步骤⑥:发送方收到消息回查后,检查该消息的本地事务执行结果

步骤⑦:发送方根据检查得到的本地事务的最终状态再次提交二次确认。

最终步骤:MQ Server基于 commit/rollback 对消息进行投递或者删除

2、RocketMQ事务消息的实现流程:

        以 RocketMQ 4.5.2 版本为例,事务消息有专门的一个队列 RMQ_SYS_TRANS_HALF_TOPIC,所有的 prepare 消息都先往这里放,当消息收到 Commit 请求后,就将消息转移到真实的 Topic 队列里,供 Consumer 消费,同时向 RMQ_SYS_TRANS_OP_HALF_TOPIC 塞一条消息。简易流程图如下:

RocketMQ事务消息原理_spring_02

        当应用模块的事务因为中断或者其他的网络原因导致无法立即响应的,RocketMQ 会当做 UNKNOW 处理,对此 RocketMQ 事务消息提供了一个补救方案:定时回查事务消息的事务执行状态,简易流程图如下:

RocketMQ事务消息原理_java-rocketmq_03

二、Springboot 整合 RocketMQ 实现事务消息:

        该部分将从 "下订单 + 扣减库存"的案例来介绍 SpringBoot 如何整合 RocketMQ 并使用事务消息保证最终一致性。核心思路是订单服务(生产端)向 RocketMQ 发送库存扣减消息,再执行本地订单生成逻辑,最后交由 RocketMQ 通知 库存服务扣减库存并保证库存扣减消息被正常消费。

        案例中使用到的服务分为两个,订单服务和库存服务;涉及到的数据库表主要有三个,订单表、存储表,本地事务状态表。由于这几个表都比较简单,这里就不将对应的建表语句粘贴出来了,同样对应的 Pojo对象、Dao层、Service层 代码也不粘贴出来了,下面只展示核心逻辑的代码。

1、启动 RocketMQ 服务端:


2、在父pom文件中引入依赖:

<!-- rocketmq 事务消息 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>

3、生产端代码:

        生产端的核心逻辑就是向 RocketMQ 投递事务消息,并执行本地事务,最后将本地事务的执行结果通知到 RocketMQ

(1)RocketMQ相关配置:

在 application.properties 配置文件中添加以下配置:

rocketmq.name-server=172.28.190.101:9876
rocketmq.producer.group=order_shop

(2)创建一个监听类:

实现 TransactionListener 接口,在实现的数据库事务提交方法executeLocalTransaction() 和回查事务状态方法checkLocalTransaction() 中模拟结果

/**
 * rocketmq 事务消息回调类
 */
@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener
{
    @Resource
    private ShopOrderMapper shopOrderMapper;
 
    /**
     * half消息发送成功后回调此方法,执行本地事务
     *
     * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
     * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
     * @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
    @Override
    @Transactional
    public LocalTransactionState executeLocalTransaction(Message message, Object arg)
    {
        log.info("开始执行本地事务:订单信息:" + new String(message.getBody()));
        String msgKey = new String(message.getBody());
        ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);
 
        int saveResult;
        LocalTransactionState state;
        try
        {
            //修改为true时,模拟本地事务异常
            boolean imitateException = true;
            if(imitateException)
            {
                throw new RuntimeException("更新本地事务时抛出异常");
            }
 
            // 生成订单,本地事务的回滚依赖于DB的ACID特性,所以需要添加Transactional注解。当本地事务提交失败时,返回ROLLBACK_MESSAGE,则会回滚rocketMQ中的half message,保证分布式事务的一致性。
            saveResult = shopOrderMapper.insert(shopOrder);
            state = saveResult == 1 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
 
            // 更新本地事务并将事务号持久化,为后续的幂等做准备
            // TransactionDao.add(transactionId)
        }
        catch (Exception e)
        {
            log.error("本地事务执行异常,异常信息:", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
 
        //修改为true时,模拟本地事务超时,对于超时的消息,rocketmq会调用checkLocalTransaction方法回查本地事务执行状况
        boolean imitateTimeout = false;
        if(imitateTimeout)
        {
            state = LocalTransactionState.UNKNOW;
        }
 
        log.info("本地事务执行结果:msgKey=" + msgKey + ",execute state:" + state);
        return state;
    }
 
 
    /**
     * 回查本地事务接口
     *
     * @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态
     * @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt)
    {
        log.info("调用回查本地事务接口:msgKey=" +  new String(messageExt.getBody()));
 
        String msgKey = new String(messageExt.getBody());
        ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);
 
        // 备注:此处应使用唯一ID查询本地事务是否执行成功,唯一ID可以使用事务的transactionId。但为了验证方便,只查询DB的订单表是否存在对应的记录
        // TransactionDao.isExistTx(transactionId)
        List<ShopOrderPojo> list = shopOrderMapper.selectList(new QueryWrapper<ShopOrderPojo>()
                .eq("shop_id", shopOrder.getShopId())
                .eq("user_id", shopOrder.getUserId()));
 
        LocalTransactionState state = list.size() > 0 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
        log.info("调用回查本地事务接口的执行结果:" +  state);
 
        return state;
    }
}

        为了方便验证,上面 Demo 使用了两个 boolean 变量 imitateException、imitateTimeout 分别模拟了事务执行异常和超时的情况,只需要将布尔值设置为 true 即可。

(3)投递事务消息:

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
 
@Slf4j
@Service
public class ShopOrderServiceImpl extends ServiceImpl<ShopOrderMapper, ShopOrderPojo> implements ShopOrderService
{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderTransactionListener orderTransactionListener;
 
    /**
     * 发送事务消息
     */
    @Override
    public boolean sendOrderRocketMqMsg(ShopOrderPojo shopOrderPojo)
    {
        String topic = "storage";
        String tag = "reduce";
 
        // 设置监听器,此处如果使用MQ其他版本,可能导致强转异常
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(orderTransactionListener);
 
        //构建消息体
        String msg = JSONObject.toJSONString(shopOrderPojo);
        org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build();
        //发送事务消息,由消费者进行进行减少库存
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tag , message, null);
 
        log.info("Send transaction msg result: " + sendResult);
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    }
}

4、消费端代码:

        消费端的核心逻辑就是监听 MQ,接收消息;接收到消息之后扣减库存

(1)RocketMQ相关配置:

在 application.properties 配置文件中添加以下配置:

rocketmq.name-server=172.28.190.101:9876
rocketmq.consumer.group=order_shop

(2)消费监听类:

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
 
/**
 * 库存管理消费者类
 **/
@Component
@RocketMQMessageListener (consumerGroup = "order_storage", topic = "storage")
public class StorageConsumerListener implements RocketMQListener<String>
{
    @Resource
    private TStorageService tStorageService;
 
    /**
     * rocketMQ消费者
     */
    @Override
    public void onMessage(String message)
    {
        System.out.println("消费者开始消费:从MQ中获取的消息是:" + message);
        ShopOrderPojo shopOrder = JSONObject.parseObject(message, ShopOrderPojo.class);
 
        // 1、幂等校验,防止消息重复消费--此处省略相关的代码逻辑:
        // TransactionDao.isExistTx(transactionId)
 
        // 2、执行消息消费操作--减少商品库存:
        TStoragePojo shop = tStorageService.getById(shopOrder.getShopId());
        shop.setNum(shop.getNum() - 1);
        boolean updateResult = tStorageService.updateById(shop);
 
        // 3、添加事务操作记录--此次省略代码:
        // TransactionDao.add(transactionId)
 
        System.out.println("消费者完成消费:操作结果:" + updateResult);
    }
}

至此,一个完整的基于 RocketMQ 事务消息实现的分布式事务的最终一致性就完成了。

标签:事务,MQ,消息,本地,import,原理,RocketMQ
From: https://blog.51cto.com/u_15973676/6320008

相关文章

  • 对$nextTick的理解,及其实现原理
    1.对$nextTick的理解:VUE中数据变化后,是异步更新DOM的,如果想数据变化后,操作dom,这个时候获取到的是没有变化的值eg:<divclass="msg">{{msg}}</div>mounted(){this.msg='我是测试文字'console.log(document.querySelector('.msg'......
  • Android 热补丁之 Tinker 原理解析
    阅读本文大概需要1分钟。哈,一晃好几天没更文了,然后后台又一大堆读者给我留言,说没有我的文章,感觉生活都没有动力了,对于这样的读者,我只能说:你肯定单身吧?主要是上周五开始就出去旅游了,跟着老板一起出去嗨了一把。所以一直没更新,但是,我是不会忘记你们的,这不,刚回归就给大家带来一篇干......
  • 聊聊Mybatis的实现原理
    使用示例平时我们使用的一般是集成了Spring或是SpringBoot的Mybatis,封装了一层,看源码不直接;如下,看看原生的Mybatis使用示例示例解析通过代码可以清晰地看出,MyBatis的操作主要分为两大阶段:第一阶段:MyBatis初始化阶段。该阶段用来完成MyBatis运行环境的准备工作,读取配置并初......
  • 跨域JSONP原理及调用具体示例
    上篇博客介绍了同源策略和跨域访问概念,其中提到跨域常用的基本方式:JSONP和CORS。 那这篇博客就介绍JSONP方式。  JSONP原理  在同源策略下,在某个服务器下的页面是无法获取到该服务器以外的数据的,但img、iframe、script等标签是个例外,这......
  • Geolocation背后的基本原理
    LBS在移动设备和浏览器里已应用地非常广泛,基于API能很轻松地实现一些和地址位置有关的应用,但一直对这些API背后是如何获取使用者的位置信息却不得而知,今天花了时间粗浅地研究了一下: 得到位置信息主要通过两种途径:GPS和WIFI1.GPSGPS基本原理是测量出已知位置的卫星到用户接收机之......
  • synchronized原理
    `synchronized`是Java中用来实现线程同步的关键字,它的主要作用是对代码块或方法进行加锁,保证在同一时刻只有一个线程能够执行被加锁的代码块或方法,从而避免多个线程同时访问共享资源导致的数据不一致问题。`synchronized`的实现原理是基于Java对象头中的monitor(监视器)实......
  • RocketMQ
    RocketMQ背景是阿里巴巴,经历双11考验,Java语言编写,非常好完整体系1、支持事务消息(实现解决分页式事务的问题)2、支持高并发顺序消息处理(采用内存队列+多线程处理)3、消费者支持tag过滤,减少我们带宽传输RocketMQ关键核心名称:NameServer:存放生产者、消费者、topic信息。去中心......
  • python中的装饰器原理和作用
    装饰器的作用就是用一个新函数封装旧函数(是旧函数代码不变的情况下增加功能)然后会返回一个新函数,新函数就叫做装饰器,一般为了简化装饰器会用语法糖@新函数来简化例子:这是一段代码,但功能太少,要对这个进行增强,但又不能改变代码。defhello():return"helloworld!"现在我......
  • Revit二次开发实战03(事务Transaction)
    Revit二次开发实战事务必须首先要启动Start,操作完成后提交事务Commit,如果执行异常,则要执行回滚操作RollBack;可以通过GetStatus获取事务的当前状态,根据事务状态决定程序的走向;事务Transaction是非托管对象,必须手动释放Dispose,或者放到using代码块中,让编译器自动释放;如果要对文......
  • 容器目录挂载原理
    前言就我目前的对容器的了解,使用namespace技术实现隔离,使用cgroups技术实现资源限制.但是具体是如何实现却从未深究过.闲来无事,挑其中的MountNamespace来康康,容器是如何实现目录隔离的.目录隔离在耗子叔的这篇文章中对此技术进行了介绍.在c函数库中,可通过如下方......