1. Spring Boot 集成 RabbitMQ 消息事务(消费者)
1.1. 版本说明
构件 | 版本 |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
spring-boot-starter-jdbc | 2.7.18 |
1.2. 概览
这里模拟一个常见的业务流程,消费者接收到一条 RabbitMQ 消息,此时消费者再发布一条 RabbitMQ 消息,同时更新数据库,更新失败时回滚数据库事务,同时拒绝先前接收到的消息,被拒绝的消息将被转发到死信队列,然后回滚消息事务,消费者发送出去的消息将被回滚,并不会真正发布到 RabbitMQ。这里涉及到分布式事务,本案例采用最大努力单阶段提交模式来实现事务管理。
sequenceDiagram participant producer as Spring Boot 应用(生产者) box lightYellow RabbitMQ participant exchange as 交换机 participant queue as 队列 participant deadLetterExchange as 死信交换机 participant deadLetterQueue as 死信队列 end participant consumer as Spring Boot 应用(消费者) participant mysql as 数据库 producer ->> exchange: 1. 发送消息 exchange ->> queue: 2. 转发消息 consumer ->> consumer: 3. 开启消息事务 queue ->> consumer: 4. 接收消息 consumer ->> consumer: 5. 开启数据库事务 consumer ->> exchange: 6. 发布消息 consumer ->> mysql: 7. 更新数据库 mysql ->> consumer: 8. 更新失败 consumer ->> consumer: 9. 回滚数据库事务,撤回第 7 步更新操作 consumer ->> consumer: 10. 回滚消息事务,撤回第 6 步发布操作 consumer ->> queue: 11. 拒绝消息 queue ->> deadLetterExchange: 12. 转发消息 deadLetterExchange ->> deadLetterQueue: 13. 转发消息1.2.1. 最大努力单阶段提交模式
最大努力单阶段提交模式是相当普遍的,但在开发人员必须注意的某些情况下可能会失败。这是一种非 XA 模式,涉及了许多资源的同步单阶段提交。因为没有使用二阶段提交,它绝不会像 XA 事务那样安全,但是如果参与者意识到妥协,通常就足够了。许多高容量,高吞吐量的事务处理系统通过设置这种方式以达到提高性能的目的。
1.2.2. 成功的业务流程
flowchart TB 1["1. 开始消息事务"] --> 2["2. 接收消息"] --> 3["3. 开始数据库事务"] --> 4["4. 发布消息"] --> 5["5. 更新数据库"] --> 6["6. 提交数据库事务"] --> 7["7. 提交消息事务"]1.2.3. 失败的业务流程
flowchart TB 1["1. 开始消息事务"] --> 2["2. 接收消息"] --> 3["3. 开始数据库事务"] --> 4["4. 发布消息"] --> 5["5. 更新数据库失败"] --> 6["6. 回滚数据库事务"] --> 7["7. 回滚消息事务"]1.3. 新建数据库表
create table t_user
(
id int auto_increment primary key,
name varchar(20) not null
);
1.4. Spring 配置
spring:
application:
name: spring-rabbit-transaction-consumer-demo
rabbitmq:
addresses: 127.0.0.1:5672
username: admin
password: admin
virtual-host: /
datasource:
url: jdbc:mysql://127.0.0.1:3306/demo
username: root
password: root
1.5. 定义常量
public class RabbitTransactionConsumerConstants {
public static final String QUEUE = "spring-rabbit-transaction-consumer-demo-queue";
public static final String EXCHANGE = "spring-rabbit-transaction-consumer-demo-exchange";
public static final String DEAD_LETTER_QUEUE = "spring-rabbit-transaction-consumer-demo-dead-latter-queue";
public static final String DEAD_LETTER_EXCHANGE = "spring-rabbit-transaction-consumer-demo-latter-exchange";
public static final String ROLLBACK_QUEUE = "spring-rabbit-transaction-consumer-demo-rollback-queue";
public static final String ROLLBACK_EXCHANGE = "spring-rabbit-transaction-consumer-demo-rollback-exchange";
}
1.6. 配置交换机和队列
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.build();
}
@Bean
public FanoutExchange exchange() {
return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange());
}
@Bean
public FanoutExchange deadLetterExchange() {
return ExchangeBuilder.fanoutExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE)
.build();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
@Bean
public FanoutExchange rollbackExchange() {
return ExchangeBuilder.fanoutExchange(ROLLBACK_EXCHANGE).durable(true).build();
}
@Bean
public Queue rollbackQueue() {
return QueueBuilder.durable(ROLLBACK_QUEUE)
.build();
}
@Bean
public Binding rollbackBinding() {
return BindingBuilder.bind(rollbackQueue()).to(rollbackExchange());
}
1.7. 定义 RabbitMQ 消息事务管理器
@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
1.8. 配置 SimpleMessageListenerContainer
@Bean
public ContainerCustomizer<SimpleMessageListenerContainer> simpleMessageListenerContainerCustomizer(RabbitTransactionManager rabbitTransactionManager) {
return container -> {
//Channel 开启事务
// 由于消息事务只适用于发布(publish)和确认(ack)
// 在消费端 Spring 默认会自动 ack
// 因此,只有消费消息的情况下,并不需要开启事务
// 只有在消费消息的同时还发布消息出去,才需要配置开启消息事务
container.setChannelTransacted(true);
//配置事务管理器
container.setTransactionManager(rabbitTransactionManager);
//设置拒绝消息的行为
//值为 true 时,将抛出 AmqpRejectAndDontRequeueException 异常并重新消费该消息
//值为 false 时,将消息转发到死信队列中
container.setDefaultRequeueRejected(false);
};
}
1.9. 定义数据库事务管理器
@Bean(name = "dataSourceTransactionManager")
@Primary
DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager(dataSource);
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(dataSourceTransactionManager));
return dataSourceTransactionManager;
}
1.10. 测试
@Component
@Slf4j
public class SpringRabbitTransactionConsumerDemo implements ApplicationRunner {
@Resource
private JdbcTemplate jdbcTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
rabbitTemplate.convertAndSend(EXCHANGE, null, "Jason");
}
//Spring 数据库事务,指定事务管理器为 DataSourceTransactionManager
@Transactional(rollbackFor = Throwable.class, transactionManager = "dataSourceTransactionManager")
@RabbitListener(queues = {QUEUE})
public void listen(Channel channel, Message<String> message) throws Throwable {
//由于 Channel 已配置为开启事务,因此这里发送出去的消息将会在出现异常时回滚
channel.basicPublish(ROLLBACK_EXCHANGE, "", null, "rollback".getBytes());
//往数据库表插入两条主键 id 一样的数据,引起主键 id 重复异常
jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, message.getPayload()));
jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, message.getPayload()));
}
}