1. Spring Boot 集成 RabbitMQ 消息事务(生产者)
1.1. 版本说明
构件 | 版本 |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
spring-boot-starter-jdbc | 2.7.18 |
spring-boot-starter-web | 2.7.18 |
1.2. 概览
这里模拟一个常见的业务流程,提供一个 Http 接口,这个接口会向 RabbitMQ 发送一条消息,同时更新数据库。这里涉及到分布式事务,本案例采用最大努力单阶段提交模式来实现事务管理。
1.2.1. 最大努力单阶段提交模式
最大努力单阶段提交模式是相当普遍的,但在开发人员必须注意的某些情况下可能会失败。这是一种非 XA 模式,涉及了许多资源的同步单阶段提交。因为没有使用二阶段提交,它绝不会像 XA 事务那样安全,但是如果参与者意识到妥协,通常就足够了。许多高容量,高吞吐量的事务处理系统通过设置这种方式以达到提高性能的目的。
1.2.2. 成功的业务流程
flowchart TB 1["1. 开始消息事务"] --> 2["2. 发送消息"] --> 3["3. 开始数据库事务"] --> 4["4. 更新数据库"] --> 5["5. 提交数据库事务"] --> 6["6. 提交消息事务"]1.2.3. 失败的业务流程
flowchart TB 1["1. 开始消息事务"] --> 2["2. 发送消息"] --> 3["3. 开始数据库事务"] --> 4["4. 更新数据库失败"] --> 5["5. 回滚数据库事务"] --> 6["6. 回滚消息事务"]1.3. 新建数据库表
create table t_user
(
id int auto_increment primary key,
name varchar(20) not null
);
1.4. Spring 配置
spring:
application:
name: spring-rabbit-transaction-producer-demo
rabbitmq:
addresses: 127.0.0.1:5672
username: admin
password: admin
virtual-host: /
datasource:
url: jdbc:mysql://127.0.0.1:3306/demo
username: root
password: root
1.5. 定义常量
public class RabbitTransactionProducerDemoConstants {
public static final String EXCHANGE = "spring-rabbit-transaction-producer-demo-exchange";
public static final String QUEUE = "spring-rabbit-transaction-producer-demo-queue";
}
1.6. 配置交换机和队列
@Bean
public FanoutExchange exchange() {
return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
}
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange());
}
1.7. 定义 RabbitMQ 消息事务管理器
@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
1.8. 定义 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
//Channel 启用事务
template.setChannelTransacted(true);
return template;
}
1.9. 定义数据库事务管理器
@Bean(name = "dataSourceTransactionManager")
@Primary
DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
return transactionManager;
}
1.10. 测试
@RestController
@Slf4j
public class SpringRabbitTransactionProducerDemo {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private TransactionDefinition transactionDefinition;
@Resource
private DataSourceTransactionManager dataSourceTransactionManager;
//Spring 消息事务,指定事务管理器为 RabbitTransactionManager
@Transactional(rollbackFor = Throwable.class, transactionManager = "rabbitTransactionManager")
@GetMapping("/transaction")
public void transaction() {
String name = "Jason";
//发送一条消息,接下来因数据库插入数据异常,消息事务回滚,并不会真正把消息发送出去
rabbitTemplate.convertAndSend(EXCHANGE, null, name);
//手动开启数据库事务
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
//往数据库表插入两条主键 id 一样的数据,引起主键 id 重复异常
jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, name));
jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, name));
//手动提交数据库事务,因上面插入数据异常,并不会执行到这里
dataSourceTransactionManager.commit(transactionStatus);
} catch (Throwable throwable) {
//捕获异常,手动回滚数据库事务
dataSourceTransactionManager.rollback(transactionStatus);
//抛出异常,让 Spring 回滚 RabbitMQ 消息事务
throw throwable;
}
}
}