系统环境
Java: openjdk version“1.8.0_382”
rocketmq-all-5.1.4
整体机制
使用rocketmq的事务消息,分两个阶段保证分布式事务的最终一致性;
- 一阶段:消息生产者(分布式事务发起方)发送半消息(消费者不接收半消息),之后完成本地事务的执行,根据执行结果选择将半消息投递给消费者或撤回半消息;
- 二阶段:消息消费者(分布式事务的分支应用)接收到半消息后,完成分支事务的执行,该模式下分支事务要求最终一致,不支持回滚。
工作机制
事务消息交互流程如下图所示:
- 生产者将消息发送至Apache RocketMQ服务端。
- Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
使用示例
生产者
添加依赖rocketMQ依赖
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'
添加配置
application.yml中添加如下配置:rocketmq.name-server为rocketMQ的NameServer注册服务
rocketmq:
#调整为NameServer注册服务
name-server: 127.0.0.1:9876
producer:
#生产者分组,非必填,集群使用
group-name: tradeTxProducer
Spring容器管理RocketMQTemplate和TransactionMQProducer
- TransactionMQProducer:生产者,建立与RocketMQ服务端的连接
- RocketMQTemplate:RocketMQ连接组件,管理“生产者”,提供统一的消息发送方法
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group-name}")
private String producerGroupName;
/**
* 事务消息生产者
* @return
*/
@Bean
public TransactionMQProducer transactionMQProducer() {
TransactionMQProducer producer = new TransactionMQProducer(producerGroupName);
producer.setNamesrvAddr(nameServer);
return producer;
}
/**
* RocketMQ连接组件
* @param transactionMQProducer
* @return
*/
@Bean
public RocketMQTemplate rocketTransactionMQTemplate(TransactionMQProducer transactionMQProducer) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(transactionMQProducer);
return rocketMQTemplate;
}
}
业务模块中注入RocketMQTemplate的Bean对象,使用RocketMQTemplate发送事务消息
//生成唯一交易单号
String reqNo = TransactionIdGenerator.generateReqTransactionId();
// 创建 RocketMQ 的 Message 实例
Message<String> message = MessageBuilder
.withPayload(JSON.toJSONString(fundParam))
.setHeader(RocketMQHeaders.KEYS, TradeLocalTransactionListener.KEY_PREFIX + reqNo)
.setHeader(TradeLocalTransactionListener.HEAD_KEY_REQ_NO, reqNo)
.build();
// 发送事务消息
TransactionSendResult sendResult = rocketTransactionMQTemplate.sendMessageInTransaction("tradeTransactionTopic:tradeBuyTag", message, param);
if (sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
logger.info("购买交易额度抢占事务消息已经发送");
return;
}
注意:此阶段发送的为半消息,消费者不可见
添加本地事务消息监听器,完成本地事务的执行以及校验
//RocketMQTransactionListener注解中的rocketMQTemplateBeanName要与定义的RocketMQTemplate名称保持一致
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketTransactionMQTemplate")
public class TradeLocalTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private TradeMapper tradeMapper;
private final Logger logger = LoggerFactory.getLogger(TradeLocalTransactionListener.class);
/**
* 交易单号变量
*/
public static final String HEAD_KEY_REQ_NO = "HEAD_KEY_REQ_NO";
/**
* 消息key前缀
*/
public static final String KEY_PREFIX ="tradeBuyKey-";
/**
* 执行本地事务,并返回执行结果,返回值含义如下:\n
* RocketMQLocalTransactionState.COMMIT-本地事务提交,半消息对消费者可见
* RocketMQLocalTransactionState.UNKNOWN-本地事务未知,需要重新校验本地事务
* RocketMQLocalTransactionState.ROLLBACK-本地事务回滚,半消息撤销
* @param message
* @param param
* @return
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object param) {
try{
TradeEntity entity = BeanConverter.dataConvert(param, TradeEntity.class);
String reqNo = (String) message.getHeaders().get(HEAD_KEY_REQ_NO);
entity.setReqNo(reqNo);
//插入交易信息
tradeMapper.insertTradeInfo(entity);
//模拟异常
if (entity.getReqShr().equals(BigDecimal.valueOf(2000000L))) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
logger.error("抢占基金份额时,交易服务执行异常!", e);
throw e;
}
}
/**
* 本地事务状态重校验 executeLocalTransaction()返回RocketMQLocalTransactionState.UNKNOWN时调用
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String reqNo = (String) message.getHeaders().get(HEAD_KEY_REQ_NO);
return tradeMapper.queryTradeInfoByReqNo(reqNo) == null ? RocketMQLocalTransactionState.ROLLBACK : RocketMQLocalTransactionState.COMMIT;
}
}
消费者
添加依赖rocketMQ依赖
同生产者
添加配置
application.yml中添加如下配置:
rocketmq:
name-server: 112.124.36.250:9876
#服务端开启访问授权时需要配置
consumer:
access-key: ubuntu_zly_123
secret-key: ubuntu_zly_123
添加消费者
使用@RocketMQMessageListener,topic指定订阅主体,consumerGroup指定消费组分组;实现RocketMQListener接口
@Component
@RocketMQMessageListener(
topic = "tradeTransactionTopic",
consumerGroup = "tradeTransactionGroup"
)
public class TradeTransactionListener implements RocketMQListener<String> {
private static final Logger logger = LoggerFactory.getLogger(TradeTransactionListener.class);
@Autowired
private FundManageService fundManageService;
@Override
public void onMessage(String msg) {
// 执行本地事务逻辑,返回事务状态
try {
logger.info("购买交易抢占基金份额消息消费开始:{}", msg);
fundManageService.custBuyFund(JSON.parseObject(msg, FundInfoDto.class));
logger.info("购买交易抢占基金份额消息消费结束");
} catch (Exception e) {
logger.error("交易消息消费失败!", e);
throw e;
}
}
}
遇到的问题
依赖包版本冲突
- 问题背景:
Spring-boot项目在引入rocketmq-spring-boot-starter:2.2.3包后,与原有com.alibaba:fastjson:1.2.73冲突,导致应用启动失败。 - 解决方案:
fastjson包升级为com.alibaba:fastjson:2.0.42后解决。
参考文献
RocketMQ事务消息官方描述文档
RocketMQ集成Spring-boot的官方示例代码