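Implementing distributed transactions with RocketMQ
Send a half message -> execute the local transaction -> check back the local transaction status -> the second service consumes the transactional message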
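The flow above can be sketched with a small in-memory model. This is only an illustration of the ordering of the steps, not the RocketMQ API: ToyBroker, State, and run are hypothetical names made up for this sketch, and a real broker does the check back on its own schedule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** In-memory toy model of the half-message flow; not the RocketMQ API. */
public class HalfMessageFlowSketch {

    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Stand-in for the broker: half messages stay hidden until committed. */
    public static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> delivered = new ArrayList<>();

        public void sendHalf(String txId, String payload) {
            halfMessages.put(txId, payload);
        }

        /** Applies the producer's verdict to a pending half message. */
        public void resolve(String txId, State state) {
            if (state == State.UNKNOWN) {
                return; // keep the half message and wait for a check back
            }
            String payload = halfMessages.remove(txId);
            if (payload != null && state == State.COMMIT) {
                delivered.add(payload); // now visible to the consumer
            }
        }

        /** Check back: ask the producer about every still-pending transaction. */
        public void checkBack(Function<String, State> checker) {
            for (String txId : new ArrayList<>(halfMessages.keySet())) {
                resolve(txId, checker.apply(txId));
            }
        }

        public List<String> delivered() {
            return delivered;
        }
    }

    /** Runs the four steps once and returns what the consumer would see. */
    public static List<String> run(State localResult, State checkResult) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("tx-1", "order created");   // 1. send the half message
        broker.resolve("tx-1", localResult);        // 2. result of the local transaction
        broker.checkBack(txId -> checkResult);      // 3. check back if still pending
        return broker.delivered();                  // 4. messages the consumer receives
    }

    public static void main(String[] args) {
        System.out.println(run(State.COMMIT, State.COMMIT));
        System.out.println(run(State.UNKNOWN, State.COMMIT));
        System.out.println(run(State.UNKNOWN, State.ROLLBACK));
    }
}
```

The point of the model is that the consumer never sees a message whose local transaction was rolled back, and that the check back only matters when the first verdict was missing or undecided.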
1. Install RocketMQ by following the link below:
https://blog.csdn.net/weixin_43464076/article/details/127766159
RocketMQ startup commands:
    start mqnamesrv.cmd
    start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf
2. Java code
1. Add the dependency to Maven
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.tomcat</groupId>
                <artifactId>annotations-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.persistence</groupId>
                <artifactId>javax.persistence-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
2. Add the RocketMQ configuration to the application yml file
    rocketmq:
      name-server: 127.0.0.1:9876
      # default message group
      producer:
        group: test
3. Send the half message
    @GetMapping("/testTransaction")
    public String testTransaction() {
        String transactionId = UUID.randomUUID().toString();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("test", "test-value");
        MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(jsonObject.toJSONString());
        stringMessageBuilder.setHeader("order_id", "test-transaction-001");
        // the transaction ID travels in a header so the listener can read it back
        stringMessageBuilder.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId);
        Message<String> message = stringMessageBuilder.build();
        // send the half message; the third argument is passed to executeLocalTransaction
        this.rocketMQTemplate.sendMessageInTransaction("test-topic", message, "testObj");
        return "success;";
    }
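4. The executeLocalTransaction method runs the local transaction. When the local transaction succeeds, add a record to the table t_transaction_log so that it can later be used to check whether the local transaction succeeded. This method and checkLocalTransaction (step 5) belong to a class that implements RocketMQLocalTransactionListener and is annotated with @RocketMQTransactionListener.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("executing local transaction");
        MessageHeaders headers = message.getHeaders();
        // get the transaction ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        String testOdr = (String) headers.get("order_id");
        logger.info("transactionId is {}, orderId is {}", transactionId, testOdr);
        try {
            // run the local transaction and record the log entry
            this.logService.doTest(transactionId);
            // success: the transaction can be committed
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // log the failure instead of swallowing it silently
            logger.error("local transaction failed, transactionId is {}", transactionId, e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }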
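The log record is only useful for the later check if it is written together with the business change: both succeed or neither does. The sketch below models that with in-memory collections. It assumes a made-up business operation (crediting an account); the account, amount, balances, and isLogged names are hypothetical and do not come from the original logService, which would presumably do both writes inside a single database transaction.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model: business change and transaction log are written together. */
public class LocalTransactionLogSketch {

    public enum State { COMMIT, ROLLBACK }

    // Hypothetical business table: account -> balance.
    private final Map<String, Integer> balances = new HashMap<>();
    // Stands in for the t_transaction_log table.
    private final Set<String> transactionLog = new HashSet<>();

    /** Applies the business change and the log record, or neither. */
    public void doTest(String transactionId, String account, int amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        // Both writes happen only after validation passed; a real service
        // would rely on one database transaction for this guarantee.
        balances.merge(account, amount, Integer::sum);
        transactionLog.add(transactionId);
    }

    /** Mirrors the shape of executeLocalTransaction: COMMIT on success, ROLLBACK on failure. */
    public State executeLocalTransaction(String transactionId, String account, int amount) {
        try {
            doTest(transactionId, account, amount);
            return State.COMMIT;
        } catch (RuntimeException e) {
            return State.ROLLBACK;
        }
    }

    public boolean isLogged(String transactionId) {
        return transactionLog.contains(transactionId);
    }

    public int balance(String account) {
        return balances.getOrDefault(account, 0);
    }

    public static void main(String[] args) {
        LocalTransactionLogSketch sketch = new LocalTransactionLogSketch();
        System.out.println(sketch.executeLocalTransaction("tx-1", "alice", 100));
        System.out.println(sketch.executeLocalTransaction("tx-2", "alice", -5));
        System.out.println(sketch.isLogged("tx-2"));
    }
}
```

Because a failed transaction leaves no log entry, a later lookup by transaction ID gives the same answer the local transaction itself would have given.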
5. Implement checkLocalTransaction to check whether the local transaction was committed successfully.
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        // get the transaction ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        logger.info("checking local transaction, transaction ID: {}", transactionId);
        // look up the log table by transaction ID
        Optional<TransactionLog> optionalTransactionLog = this.transactionRepository.findTransactionLogByTransactionId(transactionId);
        if (optionalTransactionLog.isPresent()) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
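The check above rolls back as soon as no log record is found. If a check can arrive while the local transaction is still running, one possible variation is to answer UNKNOWN for a while, so that the broker asks again later, and to roll back only after a grace period. The sketch below shows just that decision with a local State enum mirroring RocketMQLocalTransactionState. The startedAt timestamp and the grace period are assumptions of this sketch and not part of the original code; the producer would have to record the start time somewhere, for example in a message header.

```java
import java.time.Duration;
import java.time.Instant;

/** Decision logic only; State mirrors RocketMQLocalTransactionState. */
public class CheckBackSketch {

    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * @param logPresent whether a transaction log record was found
     * @param startedAt  when the local transaction started (hypothetical input)
     * @param now        the current time, passed in to keep the method testable
     * @param grace      how long a missing record is tolerated before rolling back
     */
    public static State check(boolean logPresent, Instant startedAt, Instant now, Duration grace) {
        if (logPresent) {
            return State.COMMIT; // the local transaction definitely committed
        }
        if (Duration.between(startedAt, now).compareTo(grace) < 0) {
            return State.UNKNOWN; // possibly still in progress, ask again later
        }
        return State.ROLLBACK; // no record after the grace period
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        Duration grace = Duration.ofSeconds(30);
        System.out.println(check(false, start, start.plusSeconds(5), grace));
        System.out.println(check(false, start, start.plusSeconds(60), grace));
        System.out.println(check(true, start, start.plusSeconds(60), grace));
    }
}
```

Whether this is needed depends on how long the local transaction can take compared with the broker's check interval; for short transactions the simpler COMMIT or ROLLBACK check may be enough.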
6. The second service implements the RocketMQListener interface to consume the message and complete its part of the transaction.
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class TestListener implements RocketMQListener<Message> {

        // injected through the constructor generated by @RequiredArgsConstructor
        private final MessageService messageService;

        /**
         * Business logic that runs when a message is received
         */
        @Override
        public void onMessage(Message message) {
            log.info("received message: {}", message);
            this.messageService.testTransaction();
            log.info("add money success");
        }
    }
From: https://www.cnblogs.com/goblinn/p/17406483.html