1 引言
RocketMQ的事务消息设计是为了解决分布式系统中数据一致性的问题。在分布式系统中,由于数据可能分布在不同的服务或节点上,因此需要一种机制来确保数据的最终一致性。事务消息通过引入本地事务和消息状态的关联,确保了消息的发送与本地事务的执行结果紧密相关,从而避免了数据不一致的问题。
2 事务消息步骤
- 生产者将半事务消息发送至 RocketMQ Broker。
- RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
4.1 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
4.2 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。 - 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
5.1 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
5.2 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。 - 注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。
3 项目结构
本章内容以Spring Cloud Alibaba 快速学习之 RocketMQ中的项目为基础稍作修改。下面列出修改的文件。
3.1 项目rocketmq-producer
- application.properties
这里添加了事务相关的配置
#spring应用程序监听的端口号
server.port=8080
#spring应用程序的名称
spring.application.name=rocketmq-producer
#spring当前激活的配置文件
spring.profiles.active=dev
#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-out-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-out-0.content-type=application/json
#rocketmq 事务消息配置
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.producerType=Trans
#rocketmq 事务消息分组
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 事务消息监听
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.transactionListener=RocketMQTransactionListener
- rocketmq-producer/src/main/java/org/example/controller/TestController.java
package org.example.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/send")
public String send() {
for (int i = 0; i < 5; i++) {
Map<String, String> map = new HashMap<>();
String id = i + "";
map.put("id", id);
map.put("msg", "测试消息");
MessageBuilder<Map<String, String>> builder = MessageBuilder.withPayload(map);
streamBridge.send("testchannel-out-0", builder.build());
}
return "消息发送成功!";
}
}
- rocketmq-producer/src/main/java/org/example/conf/RocketMQTransactionListener.java
这里添加了事务监听器,注意@Component名称与配置文件中对应
package org.example.conf;
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
@Component("RocketMQTransactionListener")
public class RocketMQTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String msg = new String(message.getBody());
System.out.println("execute:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
if (jsonObject.getIntValue("id") == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
if (jsonObject.getIntValue("id") == 1) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("check:" + new String(messageExt.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3.2 项目rocketmq-consumer-b
- application.properties
这里注释了接收广播消息
#spring应用程序监听的端口号
server.port=8082
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev
#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json
3.3 项目rocketmq-consumer-a
- application.properties
这里也是注释了接收广播消息
#spring应用程序监听的端口号
server.port=8081
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev
#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json
4 测试
4.1 同时启动三个子项目
4.2 发送消息
- 打开浏览器访问:http://localhost:8080/test/send,可以看到5条消息都进入了executeLocalTransaction方法。
- 其中消息id为0二次确认结果为Commit,被consumerA正常接收,这与监听器中代码功能一致。
- 其中消息id为1二次确认结果为Unknown,触发回查,在回查中正常Commit,被consumerB正常接收,这与监听器中代码功能一致。
- 其他消息二次确认结果为Rollback,服务端将回滚事务,不会将半事务消息投递给消费者,这与监听器中代码功能一致。