首页 > 其他分享 >基于RocketMQ实现分布式事务

基于RocketMQ实现分布式事务

时间:2024-03-12 09:00:30浏览次数:23  
标签:RocketMQ 事务 生产者 消息 回查 服务端 分布式

image

背景

在一个微服务架构的项目中,一个业务操作可能涉及到多个服务,这些服务往往是独立部署,构成一个个独立的系统。这种分布式的系统架构往往面临着分布式事务的问题。为了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败。通过使用RocketMQ实现分布式事务,我们可以协调这些服务的操作,保证数据的一致性。

功能原理

RocketMQ的分布式事务消息功能,在普通消息基础上,支持二阶段的提交。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

整个事务消息的详细交互流程如下图所示:

image

1、生产者将消息发送至RocketMQ服务端。

2、RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到生产者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者集群中任一生产者实例发起消息回查。

6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

注意问题

消息类型
事务消息仅支持在MessageType为Transaction的主题使用,即事务消息只能发送至类型为事务消息的主题中。

消息消费
RocketMQ事务消息保证生产者本地事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务自行保证消息正确处理,建议消费端做好消费重试。

中间状态
RocketMQ事务消息一致性为最终一致性,即在消息提交到下游消费端处理完成之前,下游和上游事务之间的状态会不一致。因此,事务消息仅适合能接受异步执行的场景。

事务超时
RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

示例代码

以下为RocketMQ 4.x版本事务消息示例代码,

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;

public class RocketMqTransactionDemo {
	public static void main(String[] args) throws Exception {
		// 创建事务消息生产者
		TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
		producer.setNamesrvAddr("127.0.0.1:9876");

		// 设置事务监听器
		TransactionListener transactionListener = new MyTransactionListener();
		producer.setTransactionListener(transactionListener);

		// 设置事务回查的线程池,可以不必设置,如果不设置也会默认生成一个
		ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable> (2000), new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("client-transaction-msg-check-thread");
				return thread;
			}
		});
		producer.setExecutorService(executorService);

		// 启动生产者
		producer.start();

		// 发送事务消息
		Message message = new Message("transaction_topic", "test_tag", "test_key", "Hello RocketMQ".getBytes());
		producer.sendMessageInTransaction(message, null);

		// 关闭生产者
		producer.shutdown();
	}
}

/**
 * 事务监听器
 */
class MyTransactionListener implements TransactionListener {
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// 执行本地事务操作
		System.out.println("执行本地事务操作,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE; // 提交事务,允许消费者消费该消息
		// return LocalTransactionState.ROLLBACK_MESSAGE;// 回滚事务,消息将被丢弃不允许消费。
		// return LocalTransactionState.UNKNOW;// 暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
	}

	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		// 检查本地事务状态
		System.out.println("检查本地事务状态,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}
}

代码解释:
1、事务消息的生产者使用TransactionMQProducer创建。
2、MyTransactionListener作为事务监听器,实现了接口TransactionListener,该接口有两个方法,分别是:

  • executeLocalTransaction
    半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
    LocalTransactionState.COMMIT_MESSAGE: 提交事务,允许消费者消费该消息。
    LocalTransactionState.ROLLBACK_MESSAGE: 回滚事务,消息将被丢弃不允许消费。
    LocalTransactionState.UNKNOW: 暂时无法判断状态,等待固定时间以后RocketMQ服务端根据回查规则向生产者进行消息回查。

  • checkLocalTransaction
    二次确认消息没有收到,RocketMQ服务端回查生产者端事务结果的方法。回查规则:本地事务执行完成后,若RocketMQ服务端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则RocketMQ服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

标签:RocketMQ,事务,生产者,消息,回查,服务端,分布式
From: https://www.cnblogs.com/ayic/p/18067431

相关文章

  • k04_分布式缓存
    1.redis持久化RDB持久化RDB全称ResdisDatabaseBackupfile(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件成为RDB文件,默认是保存在当前运行目录saveAOF持久化AO......
  • 11_redis事务
    redis事务数据库事务所有的数据库操作都必须一次性完成,要么成功,要么失败。redis事务可以一次执行多个命令,本质上是一组命令的集合。一个事务中的所有命令都会序列化,按顺序地串行化执行而不被其他命令插入,不许加载(不许被不属于该集合的命令插入)。开启:以MULTI开始一个事务入......
  • Spring多线程事务处理
    一、背景本文主要介绍了spring多线程事务的解决方案,心急的小伙伴可以跳过上面的理论介绍分析部分直接看最终解决方案。在我们日常的业务活动中,经常会出现大规模的修改插入操作,比如在3.0的活动赛事创建,涉及到十几张表的插入(一张表可能插入一行或者多行数据),由于单线程模型的关系,......
  • 基于redis做分布式锁
    1.setnx其实是setkeynx,做分布式锁的问题是担心获取到锁的那个线程还没执行del得时候挂了,key会永久存在,可以给key加上expire,其实就是setnxkeyexxxxnx但如何确定expire多大呢?没法确定,所以一般在expire之前做一个续期操作,用独立线程做2.另一种做法是还是用setnx,如果获取不......
  • spring-事务案例
    spring的案例场景同一个事务中使用并发操作导致更新获取锁失败@AutowiredServiceservice1;@TransactionalpublicvoidmethodA(){ List<Object>objs; service1.deleteByid(id1); objs.parallelStream().forEach(o->{ UserEntityusEntity=newUserEntity();......
  • 客户说|从4小时到15分钟,一次分布式数据库的丝滑体验
    文/识货运维总监瞿晟荣识货APP致力于为广大用户提供专业的网购决策指导,为喜欢追求性价比的网购朋友带来及时劲爆的运动、潮流、生活、时尚等网购优惠资讯,产品覆盖国内外主流购物商城。它提供了全球范围内的时尚品牌、潮流单品的信息,帮助用户发现和购买最新、最热、最具性价......
  • MySQL实现事务隔离的原理
    一、readview四个字段create_trx_id:创建该readview的事务的事务idm_ids:创建readview时,当前数据库中的活跃事务(指启动但还没提交的事务)min_trx_id:m_ids的最小值max_trx_id:创建readview后,下一个事务的id二、聚簇索引的隐藏列trx_id:最近一次改动该聚簇索引记录的事务idrol......
  • 分布式锁——JVM锁、MySQL锁解决多线程下并发争抢资源
    分布式锁——JVM锁、MySQL锁解决库存超卖问题引入库存扣案例需求背景电商项目中,用户购买商品后,会对商品的库存进行扣减。需求实现根据用户购买商品及商品数量,对商品的库存进行指定数量的扣减publicStringdeductStock(LonggoodsId,Integercount){//1.查询商品......
  • 分布式锁实现——Redis
    分布式锁分布式锁的视线方式Redis实现分布式锁Zookeeper实现分布式锁MySQL实现分布式锁Etcd实现分布式锁实现分布式锁注意的点互斥性可重入性锁超时,防死锁锁释放正确,防误删阻塞和非阻塞公平和非公平Redis实现分布式锁的特点Redis是高性能的内存数据库,满足高......
  • SpringMVC声明式事务
    事务并发、传播性、隔离级别(重难点)导读:本节重点在于多线程并发环境下的事务处理、和数据库在并发环境下的表锁和行锁。案例:在新增图书的时候,肯定需要先新增作者。SpringMVC声明式事务事务分两种:编程式事务、声明式事务Connectionconnconn.setAutoCommit(false)conn.comm......