《分布式事务系列教程-第四章-XA分布式事务解决方案》
一、 XA解决方案
XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器(TM)和资源管理器(RM)。其中资源管理器(RM)由数据库实现,比如Oracle、DB2这些数据库都实现了XA规范的接口,而TM作为全局的调度者,负责各个RM的提交和回滚。XA协议也分为2PC和3PC,本章讨论的是XA协议的2PC;
1.1 MySQL的XA实现
MySQL XA是基于DTP分布式事务处理模型标准实现的,支持多数据源的分布式事务。
命令如下:
- 开启一个全局事务:
xa start xid
xa start "001";
- 将全局事务置于空闲状态(此状态时所有的RM资源已经锁定):
xa end xid
xa start "001";
- 准备阶段:
xa prepare xid
xa prepare "001"
- 提交阶段:
xa commit xid
xa commit "001";
- 不进入两段式直接提交:
xa commit xid one phase;
xa commit "001" one phase;
XA执行流程:
创建一张测试表:
create table user(
id int,
username varchar(30)
);
insert into user values(1,'zs');
案例:
mysql> xa start '001';
Query OK, 0 rows affected (0.00 sec)
mysql> update user set username='ls';
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> xa end '001';
Query OK, 0 rows affected (0.00 sec)
mysql> xa prepare '001';
Query OK, 0 rows affected (0.00 sec)
mysql> xa commit '001'; # 释放锁资源
Query OK, 0 rows affected (0.00 sec)
mysql>
Tips:在MySQL控制台中演示不出一个全局事务中执行多个分支事务,我们可以在下一小结使用Java客户端在一个全局事务操作多个分支事务。
1.2 Atomikos实现分布式事务
JTA(Java Transaction API): Java事务API(编程接口),是XA在Java上的一种实现。JTA底层采用的是2PC(两阶段提交协议)
在JTA中存在以下三种角色:
- TransactionManager: 事务管理器
- XAResource: 资源管理器。代表每一个数据源(RM)
- XID: 事务ID,每一个独立的数据源都会分配一个事务ID(前面的xa start xid)
JTA只是Java提供的一个分布式多数据源的一套编程接口,来帮助我们解决分布式事务,我们具体的实现采用Atomikos,Atomikos是一个为Java平台提供增值服务的并且开源类事务管理器
搭建工程
创建订单数据库、库存数据库:
create database orders;
use orders;
CREATE TABLE `t_orders` (
`id` varchar(30) NOT NULL,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into `t_orders`(`id`,`count`) values ('1',101);
create database store;
use store;
CREATE TABLE `t_store` (
`id` varchar(30) NOT NULL,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into `t_store`(`id`,`count`) values ('1',100);
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.nc</groupId>
<artifactId>xa_transaction</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>xa_transaction</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.17</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
DataSourceConfig:
package com.lscl.config;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import java.util.Properties;
@Configuration
public class DataSourceConfig {
/**
* t_orders数据源
*
* @return
*/
@Bean(name = "ordersDS")
@Qualifier("ordersDS")
public AtomikosDataSourceBean ordersDS() {
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName("ordersDS");
atomikosDataSourceBean.setXaDataSourceClassName(
"com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
Properties properties = new Properties();
properties.put("URL","jdbc:mysql://localhost:3306/orders");
properties.put("user", "root");
properties.put("password", "admin");
atomikosDataSourceBean.setXaProperties(properties);
return atomikosDataSourceBean;
}
/**
* t_store数据源
*
* @return
*/
@Bean(name = "storeDS")
@Qualifier("storeDS")
public AtomikosDataSourceBean storeDS() {
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName("storeDS");
atomikosDataSourceBean.setXaDataSourceClassName(
"com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
Properties properties = new Properties();
properties.put("URL", "jdbc:mysql://localhost:3306/store");
properties.put("user", "root");
properties.put("password", "admin");
atomikosDataSourceBean.setXaProperties(properties);
return atomikosDataSourceBean;
}
/**
* transaction manager
*
* @return
*/
@Bean
public UserTransactionManager userTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(true);
return userTransactionManager;
}
/**
* jta transactionManager
*
* @return
*/
@Bean
public JtaTransactionManager transactionManager() {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setTransactionManager(userTransactionManager());
return jtaTransactionManager;
}
}
Controller:
package com.lscl.controller;
import com.lscl.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrdersController {
@Autowired
private OrdersService ordersService;
@RequestMapping("/addOrders/{flag}")
public String add(@PathVariable Integer flag) throws Exception{
ordersService.add(flag);;
return "ok";
}
}
Service:
package com.lscl.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@Service
public class OrdersService {
@Autowired
@Qualifier("ordersDS")
private AtomikosDataSourceBean ordersDS; //RM1
@Autowired
@Qualifier("storeDS")
private AtomikosDataSourceBean storeDS; //RM2
@Transactional
public void add(Integer flag) throws Exception {
Connection ordersConn = null;
Connection storeConn = null;
try {
ordersConn = ordersDS.getConnection();
storeConn = storeDS.getConnection();
// 订单+1
String ordersSQL = "update t_orders set count=count+1";
// 库存-1
String storeSQL = "update t_store set count=count-1";
// 执行订单+1
Statement ordersState = ordersConn.createStatement(); // prepare 阶段
ordersState.execute(ordersSQL);
if (flag == 500) {
int i = 1 / 0; // 模拟异常
}
// 执行库存-1
Statement storeState = storeConn.createStatement(); // prepare 阶段
storeState.execute(storeSQL);
// 代码没有问题准备发起全局提交 commit 阶段
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (ordersConn != null) {
ordersConn.close();
}
if (storeConn != null) {
storeConn.close();
}
}
}
}
调用图解:
1.3 总结
1)在执行分支事务时,会将RM资源锁住,需要等到所有的RM响应,等到第二阶段执行完毕时(提交/回滚),RM的锁才会释放,在高并发场所不适用。
2)XA方案依赖于本地数据库对XA协议的支持,如果本地数据库不支持XA协议那么第三方程序(Java)将操作不了。例如许多非关系型数据库并没有支持XA。
3)MySQL对XA方案支持的不太友好,MySQL的XA实现,没有记录prepare阶段日志。