背景
在项目中,技术方案需要使用事务消息来保证最终一致性达到实现业务的目的。但在一个服务中有多个业务需要使用事务消息发送不同的消息类型到不同的Topic时,Rocket MQ的本地事务执行方法对这个Case的支持不是很好。
接下来先记录一下基础的MQ事务的使用方式,在记录一下针对上述场景中的代码实现
基础概念
RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。
可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。
引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。
发送流程
- Producer向broker发送半消息(Half Message)
其实半消息本质上就是往 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic发送了一条消息,保存在特殊队列中
-
Producer得到发送结果响应
- SEND_OK
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
-
根据发送结果执行本地事务,如果失败的话半消息对Consumer不可见,本地事务不执行
-
本地事务执行完成后返回状态
-
Commit:本地事务执行成功,消息对Consumer可见,Consumer可以消费消息
但不保证Consumer一定消费成功,但这一块已经不属于事务消息的范畴了,不做额外处理的话,利用MQ的ack机制,保证最终一致性,注意Consumer要保证幂等性
-
Rollback:本地事务回滚,消息对Consumer不可见,Consumer无法消费
-
Unknown:如果本地事务返回Unknown状态,或者MQ迟迟收不到Producer的返回结果,那MQ会发起回查回调Producer的检查本地事务方法,重新返回状态
这个属于补偿机制,默认回查15次
-
基础使用
- Maven引用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-version}</version>
</dependency>
- Producer
- Send Transaction Message
@Component public class MessageProducer { @Resource private RocketMQTemplate rocketMQTemplate; public void sendTxMessage() { Message<BusinessMessage> message = MessageBuilder .withPayload(new BusinessMessage("Msg")) .build(); TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("tx_topic",message,null); } }
- Local Transaction Listener
@RocketMQTransactionListener @Slf4j public class SalesOrderSubmitTxMessageListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { RocketMQLocalTransactionState result; try { final BusinessMessage event = getBusinessMessage(message); if (event.val == 1) result = RocketMQLocalTransactionState.UNKNOWN; if (event.val == 2) result = RocketMQLocalTransactionState.ROLLBACK; if (event.val == 3) result = RocketMQLocalTransactionState.COMMIT; } catch (Exception ex) { result = RocketMQLocalTransactionState.ROLLBACK; if (!(ex instanceof BizException)) { log.error("Error occurred.", ex); } } return result; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { RocketMQLocalTransactionState result; try { final BusinessMessage event = getBusinessMessage(message); if (event.val == 1) result = RocketMQLocalTransactionState.UNKNOWN; if (event.val == 2) result = RocketMQLocalTransactionState.ROLLBACK; if (event.val == 3) result = RocketMQLocalTransactionState.COMMIT; } catch (Exception ex) { result = RocketMQLocalTransactionState.ROLLBACK; if (!(ex instanceof BizException)) { log.error("Error occurred.", ex); } } return result; } /** * Get event instance from message payload. */ private SalesOrderSubmitMessage getBusinessMessage(Message msg) throws JsonProcessingException { final String json = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); final ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(json, BusinessMessage.class); } }
- Send Transaction Message
- Consumer
@Component @RocketMQMessageListener( topic = ("tx_topic"), consumerGroup = ("test-tx-message-group") ) @Slf4j public class BusinessEventMsgHandler implements RocketMQListener<BusinessMessage> { @Override public void onMessage(BusinessMessage event) { log.info("Consumer success"); } }
从Producer发送事务消息到本地事务执行,再到返回Commit,Consumer可以消费该topic,整个链路完成了。
问题描述
但如果业务A需要发Message A到topic A的事务消息,业务B是需要发Message B到topic B的事务消息,那用一个RocketMQLocalTransactionListener来区分不同的业务消息及不同的topic就很勉强了,所以需要一个通用处理来识别当前执行的本地事务是属于哪个业务的,然后去执行它对应的本地事务逻辑
优雅处理
定义事务处理接口
想要确认一条事务消息属于什么业务有两点
- 业务消息类型
- 业务Topic
那不同的业务事务处理器就需要实现这两个接口,这样j就能找到对应的业务处理器
public interface TxMessageProcessor<T> {
/**
* Get Tx message type
*
* @return {@link Object}
*/
default Class<T> getMessageType() {
return (Class<T>) ((ParameterizedType) getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
}
/**
* Get topic of the corresponding transaction message
*
* @return Topic
*/
String getTopic();
/**
* Execute local transaction.
*
* @param message Message object {@link Object}
* @param context Arg of message
* @return {@link RocketMQLocalTransactionState}
*/
RocketMQLocalTransactionState executeLocalTransaction(T message, TxProcessContext context);
/**
* Check local transaction
*
* @param message Message object {@link Object}
* @return {@link RocketMQLocalTransactionState}
*/
RocketMQLocalTransactionState checkLocalTransaction(T message);
}
事务通用处理器
@RocketMQTransactionListener
@Slf4j
public class DefaultRocketMQTransactionDispatcher<T> implements RocketMQLocalTransactionListener {
@Resource
private List<TxMessageProcessor<T>> processors;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
TxMessageProcessor<T> processor = getMatchProcessor(processors, message);
T txMessage = getMatchMessage(message, processor.getMessageType());
TxProcessContext context = (TxProcessContext) o;
log.info("Start execute transaction,message type: {}, message: {}, context: {} ", txMessage.getClass().getSimpleName(), txMessage, context);
return processor.executeLocalTransaction(txMessage, context);
} catch (Exception ex) {
log.error("Error occurred in executeLocalTransaction.", ex);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
try {
TxMessageProcessor<T> processor = getMatchProcessor(processors, message);
T txMessage = getMatchMessage(message, processor.getMessageType());
log.info("Check local transaction,message type: {}, message: {}", txMessage.getClass().getSimpleName(), txMessage);
return processor.checkLocalTransaction(txMessage);
} catch (Exception ex) {
log.error("Error occurred in checkLocalTransaction.", ex);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
public TxMessageProcessor getMatchProcessor(List<TxMessageProcessor<T>> processors, Message message) {
String topic = (String) message.getHeaders().get(TxMessageConst.ROCKETMQ_TOPIC);
String messageType = (String) message.getHeaders().get(TxMessageConst.MESSAGE_TYPE);
return processors.stream()
.filter(processor -> processor.getMessageType().getName().equals(messageType))
.filter(processor -> processor.getTopic().equals(topic))
.findFirst()
.orElseThrow();
}
private T getMatchMessage(Message message, Class<T> msgType) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
final String json = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
return mapper.readValue(json, msgType);
}
}
定义通用常量
public class TxMessageConst {
public static final String ROCKETMQ_TOPIC = "rocketmq_TOPIC";
public static final String MESSAGE_TYPE = "message_type";
public static Message buildTxMessage(Object message) {
return MessageBuilder
.withPayload(message)
.setHeader(TxMessageConst.MESSAGE_TYPE, message.getClass().getName())
.build();
}
}
业务处理器
@Component
@Slf4j
public class SubmitMessageProcessor implements TxMessageProcessor<SubmitMessage> {
@Override
public String getTopic() {
return "submit_topic";
}
@Override
public RocketMQLocalTransactionState executeLocalTransaction(SubmitMessage message, TxProcessContext context) {
log.info("Start execute transaction, message: {}, context: {} ", message, context);
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(SubmitMessage message) {
log.info("Check local transaction, message: {} ", message);
return RocketMQLocalTransactionState.COMMIT;
}
}
生产者
public void sendSubmitTxMessage() {
Message msg = TxMessageConst.buildTxMessage(new SubmitMessage(99999, "I am submit test Msg", "Submit Message Description:1"));
TxProcessContext context = new TxProcessContext(new Object());
TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction(
"submit_topic",
msg,
context);
}
测试
从message的header取出业务消息类型和业务topic,拿到业务处理器,从而执行对应的业务逻辑