事务消息介绍及流程图
RocketMQ在4.3.0版中已经支持分布式事务消息,是通过二阶段提交加事务回查来保证本地事务和发送消息的一致性。事务消息交互流程如下图所示。
-
生产者将消息发送至Apache RocketMQ服务端。
-
Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
-
生产者开始执行本地事务逻辑。
-
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
-
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
-
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
-
-
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
-
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
-
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。
RocketMQ4.x版本的事务消息
RocketMQ4.x版本是通过TransactionMQProducer类的sendMessageInTransaction方法发送事务消息,并设置TransactionListener。
public interface TransactionListener {
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
executeLocalTransaction 是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。checkLocalTransaction回查本地事务状态决定是提交还是回滚。
checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
修改生产者MqProducer,增加OldVersionTrsactionMqProducer
@Slf4j
@Component
public class OldVersionTrsactionMqProducer implements InitializingBean, DisposableBean {
private TransactionMQProducer transactionMQProducer;
@Value("${rocketmq.namesrv}")
private String namesrv;
public SendResult sendTransactionMsg(Message msg, TransactionListener listener) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
transactionMQProducer.setTransactionListener(listener);
return transactionMQProducer.sendMessageInTransaction(msg, null);
}
@Override
public void destroy() throws Exception {
if (transactionMQProducer != null) {
transactionMQProducer.shutdown();
}
}
@Override
public void afterPropertiesSet() throws Exception {
transactionMQProducer = new TransactionMQProducer("my-transaction-producer");
transactionMQProducer.setNamesrvAddr(namesrv);
transactionMQProducer.start();
}
}
增加TransactionController:
@Slf4j
@RestController
public class TransactionController {
@Autowired
private OldVersionTrsactionMqProducer producer;
@RequestMapping("/sendTranMsg")
public List<SendResult> sendTranMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
List<SendResult> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message msg =
new Message("MyTopic", "*", String.valueOf(i),
("Hello RocketMQ,Transaction message " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.sendTransactionMsg(msg, new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("executeLocalTransaction。。。执行本地事务");
String keys = message.getKeys();
int i1 = Integer.parseInt(keys);
if (i1 > 5) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
log.info("checkLocalTransaction。。。执行事务回查");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
list.add(sendResult);
System.out.printf("%s%n", sendResult);
}
return list;
}
}
executeLocalTransaction方法中判断消息的key是否大于5,若是则回滚,否则提交。
接下来修改消费者Mq-Consumer启用OldVersionConsumer类来消费。重启生产者,消费者后,调用http://localhost:8001/sendTranMsg发送事务消息。查看消费者控制台:
发现只有6条数据。
RocketMQ5.0的事务消息
事务回查是在构造生产者时,用于检查确认异常半事务的中间状态。:
ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(new ClientConfigurationBuilder().setEndpoints(proxy).build())
.setTransactionChecker(new TransactionChecker() {
@Override
public TransactionResolution check(MessageView messageView) {
log.info("check。。。。执行事务检查");
return TransactionResolution.COMMIT;
}
}).build();
TransactionChecker用于事务回查。TransactionChecker的check方法返回值TransactionResolution含义与LocalTransactionState相同。之后调用生产者开启事务:
Transaction transaction = null;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
continue;
}
调用Transaction 进行提交或回滚。
首先新建存储事务消息类型的topic:
mqadmin.cmd updatetopic -n localhost:9876 -t TranTopic -c DefaultCluster -a +message.type=TRANSACTION
-n localhost:9876指定namesrv地址,-t TranTopic指定topic名字是TranTopic,-c DefaultCluster指定集群是DefaultCluster,-a +message.type=TRANSACTION指定topic存储事务消息。
修改生产者MqProducer增加TranController:
@Slf4j
@RestController
public class TranController implements InitializingBean, DisposableBean {
private Producer producer;
@Value("${rocketmq.proxy}")
private String proxy;
@Override
public void destroy() throws Exception {
if (producer != null) {
producer.close();
}
}
@Override
public void afterPropertiesSet() throws Exception {
ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();
producer = clientServiceProvider.newProducerBuilder()
.setClientConfiguration(new ClientConfigurationBuilder().setEndpoints(proxy).build())
.setTransactionChecker(new TransactionChecker() {
@Override
public TransactionResolution check(MessageView messageView) {
log.info("check。。。。执行事务检查");
return TransactionResolution.COMMIT;
}
}).build();
}
@RequestMapping("/sengV5TranMsg")
public List<SendReceipt> sengV5TranMsg() throws ClientException {
MessageBuilder messageBuilder = new MessageBuilderImpl();
List<SendReceipt> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Transaction transaction = null;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
continue;
}
messageBuilder.setTopic("TranTopic")
.setBody(("Transaction Message " + i).getBytes(StandardCharsets.UTF_8));
SendReceipt send = producer.send(messageBuilder.build(), transaction);
list.add(send);
// 执行本地事务
log.info("执行本地事务");
if (i > 7) {
// 回滚
transaction.rollback();
} else {
// 提交
transaction.commit();
}
}
return list;
}
}
修改消费者订阅TranTopic topic,之后重启生产者和消费者。访问http://localhost:8001/sengV5TranMsg发送事务消息。查看消费者控制台:
RocketMQ事务消息原理
事务消息相对普通消息最大的特点就是半事务消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是半事务消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。
那如何实现消息回查?Broker会启动一个消息回查的定时任务,定时从事务消息queue中读取所有待反查的消息。针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求。然后根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。最后,提交或者回滚事务,将半消息标记为已处理状态【将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)】。 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中; 如果是回滚事务,则什么都不做。
rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
使用限制
- 消息类型一致性(RocketMQ5.0)
事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
- 消费事务性
Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
- 中间状态可见性
Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。
- 事务超时机制
Apache RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。
标签:事务,生产者,回滚,回查,消息,public From: https://www.cnblogs.com/shigongp/p/17556058.html