- 微服务系列:分布式事务 Spring Cloud Alibaba 之 Seata 入门篇
在上一篇入门篇中,我们已经对 Seata
有了大致的了解,并搭建好了 seata-server
服务端也就是 TC 协调者
,同时,我们还集成了 Nacos
。
这篇中我们就要项目实战中来使用 Seata
了,毕竟学习它就是为了实战中使用的。
其实 Seata
使用起来很简单,主要就是使用 @GlobalTransactional
注解,但是搭建过程却还是稍微有点复杂的。
话不多说,开始今天的学习。
一、前言
- 本案例使用的是
Seata
的 AT 模式 - 由于篇幅有限,本文只贴出核心代码和配置,完整代码地址:cloud-seata: seata demo - Gitee.com
- 本案例使用 Nacos 作为注册中心,使用 Nacos 作为配置中心
1. 版本说明
- MySQL 8.0.27
- Nacos Server 2.0.3
- Seata Server 1.4.2
- Spring Boot 2.3.7.RELEASE
- Spring Cloud Hoxton.SR9
- Spring Cloud Alibaba 2.2.5.RELEASE
2. 案例目标
本案例将会创建三个服务,分别是订单服务、库存服务、账户服务,各服务之间的调用流程如下:
- 1)当用户下单时,调用订单服务创建一个订单,然后通过远程调用(OpenFeign)让库存服务扣减下单商品的库存
- 2)订单服务再通过远程调用(OpenFeign)让账户服务来扣减用户账户里面的余额
- 3)最后在订单服务中修改订单状态为已完成
上述操作跨越了三个数据库,有两次远程调用,很明显会有分布式事务的问题,项目的整体结构如下:
cloud-seata
├── seata-account # 账户模块,端口:9201
├── seata-order # 订单模块,端口:9211
└── seata-product # 库存模块,端口:9221
二、代码
1. 账户服务搭建
创建 seata-account
服务模块
1.1、创建数据库
# 账户数据库信息 seata_account
DROP DATABASE IF EXISTS seata_account;
CREATE DATABASE seata_account;
DROP TABLE IF EXISTS seata_account.account;
CREATE TABLE seata_account.account
(
id INT(11) NOT NULL AUTO_INCREMENT,
balance DOUBLE DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_account.undo_log;
CREATE TABLE seata_account.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_account.account (id, balance)
VALUES (1, 50);
其中,库中的undo_log
表,是Seata AT
模式必须创建的表,主要用于分支事务的回滚。
另外,考虑到测试方便,我们插入了一条id = 1
的account
记录。
1.2、添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
由于使用的SpringCloud Alibaba
依赖版本是2.2.5.RELEASE
,其中自带的 seata 版本是1.3.0
,但是我们 Seata 服务端使用的版本是 1.4.2,因此需要排除原有的依赖,重新添加 1.4.2 的依赖。
如果依赖版本不一致,启动后会报如下错误
no available service ‘null’ found, please make sure registry config correct
注意:seata 客户端的依赖版本必须要和服务端一致。
1.3、服务配置文件
server:
port: 9201
# spring配置
spring:
application:
name: seata-account
datasource:
druid:
username: root
password: root
url: jdbc:mysql://localhost:3306/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
# seata配置
seata:
# 是否开启seata,默认true
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名, 一定要和 config.tx(nacos) 中配置的相同
tx-service-group: ${spring.application.name}-group
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-system-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
nacos:
# 需要 server 端(registry.config)中配置保持一致
namespace: c18b9158-bcf3-4d5a-b78b-f02bc8a19353
server-addr: localhost:8848
group: SEATA_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
# 这里的名字一定要和 seata 服务端的名称相同,默认是 seata-server
application: seata-server
# 需要与 server 端(registry.config)中配置保持一致
group: SEATA_GROUP
namespace: c18b9158-bcf3-4d5a-b78b-f02bc8a19353
server-addr: localhost:8848
username: nacos
password: nacos
# mybatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.ezhang.account.mapper
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/xml/*.xml
注意:
- 客户端 seata 中的 nacos 相关配置要和服务端相同,比如地址、命名空间、组…
-
tx-service-group
:这个属性一定要注意,这个一定要和服务端的配置一致,否则不生效;比如上述配置中,就要在 nacos 中新增一个配置service.vgroupMapping.seata-account-group=default
,如下图:
值是 default
注意:记得添加配置的时候要加上前缀
service.vgroupMapping.
1. 4、核心代码
@Service
public class AccountServiceImpl implements AccountService
{
private static final Logger log = LoggerFactory.getLogger(AccountServiceImpl.class);
@Resource
private AccountMapper accountMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void reduceBalance(Long userId, Double price)
{
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", RootContext.getXID());
Account account = accountMapper.selectById(userId);
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);
if (balance < price)
{
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountMapper.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}
}
注意
@Transactional(propagation = Propagation.REQUIRES_NEW)
注解
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/reduceBalance")
public Map<String, Object> reduceBalance(Long userId, Double price){
accountService.reduceBalance(userId, price);
Map<String, Object> map = new HashMap<>();
map.put("code","success");
return map;
}
}
这个主要用来给 Feign
远程调用,其他的代码就不贴了,文末会放上完整代码地址。
2. 仓储服务搭建
仓储服务和账户服务类比着搭建就好了,创建一个 seata-product
服务模块,不要纠结名字(懒得改了)
2.1、创建数据库
# 产品库存数据库信息 seata_product
DROP DATABASE IF EXISTS seata_product;
CREATE DATABASE seata_product;
DROP TABLE IF EXISTS seata_product.product;
CREATE TABLE seata_product.product
(
id INT(11) NOT NULL AUTO_INCREMENT,
price DOUBLE DEFAULT NULL,
stock INT(11) DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_product.undo_log;
CREATE TABLE seata_product.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_product.product (id, price, stock)
VALUES (1, 10, 20);
同样有一个undo_log
表,并且插入了一条测试数据
2.2、添加依赖
这个和账户服务相同,就不贴了
2.3、服务配置文件
和账户服务基本还是相同的,除了端口号,spring.application.name
、数据库连接 等几个个别的
server:
port: 9221
# spring配置
spring:
application:
name: seata-product
同样还是要注意 tx-service-group
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
在 Nacos
控制台中添加一个 service.vgroupMapping.seata-product-group
2.4、核心代码
@Service
public class ProductServiceImpl implements ProductService
{
private static final Logger log = LoggerFactory.getLogger(ProductServiceImpl.class);
@Resource
private ProductMapper productMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Double reduceStock(Long productId, Integer amount)
{
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
Product product = productMapper.selectById(productId);
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount)
{
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productMapper.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
}
@RestController
@RequestMapping("/product")
public class ProductController {
@Autowired
private ProductService productService;
@PostMapping("/reduceStock")
public Map<String, Object> reduceStock(Long productId, Integer amount){
Double totalPrice = productService.reduceStock(productId, amount);
Map<String, Object> map = new HashMap<>();
map.put("code", "success");
map.put("totalPrice", totalPrice);
return map;
}
}
3. 订单服务搭建
3.1、创建数据库
# 订单数据库信息 seata_order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
DROP TABLE IF EXISTS seata_order.p_order;
CREATE TABLE seata_order.p_order
(
id INT(11) NOT NULL AUTO_INCREMENT,
user_id INT(11) DEFAULT NULL,
product_id INT(11) DEFAULT NULL,
amount INT(11) DEFAULT NULL,
total_price DOUBLE DEFAULT NULL,
status VARCHAR(100) DEFAULT NULL,
add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_order.undo_log;
CREATE TABLE seata_order.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
3.2、添加依赖
除了和上面两个服务一样的依赖之外,订单服务还需要一个 openfeign
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
3.3、服务配置文件
server:
port: 9211
# spring配置
spring:
application:
name: seata-order
datasource:
druid:
username: root
password: root
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
feign:
hystrix:
enabled: true
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-system-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
nacos:
namespace: c18b9158-bcf3-4d5a-b78b-f02bc8a19353
server-addr: localhost:8848
group: SEATA_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
application: seata-server
group: SEATA_GROUP
namespace: c18b9158-bcf3-4d5a-b78b-f02bc8a19353
server-addr: localhost:8848
username: nacos
password: nacos
# mybatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.ezhang.order.mapper
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/**/*.xml
3.4、核心代码
测试接口
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/placeOrder")
public String placeOrder(@Validated @RequestBody PlaceOrderRequest request) {
orderService.placeOrder(request);
return "下单成功";
}
}
具体实现是通过 Feign
远程调用另外两个服务的扣减库存、扣减余额的接口
@Service
public class OrderServiceImpl implements OrderService
{
private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class);
@Resource
private OrderMapper orderMapper;
@Autowired
private RemoteAccountService accountService;
@Autowired
private RemoteProductService productService;
@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request)
{
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = new Order(userId, productId, 0, amount);
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Map<String, Object> reduceStockMap = productService.reduceStock(productId, amount);
Double totalPrice = Double.valueOf(reduceStockMap.get("totalPrice").toString());
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(1);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}
注意:这里有一个 @GlobalTransactional
注解,@GlobalTransactional
是Seata提供的,用于开启全局事务。
其中的 RemoteAccountService
和 RemoteProductService
@FeignClient(contextId = "remoteAccountService", value = "seata-account")
public interface RemoteAccountService {
@PostMapping(value = "/account/reduceBalance")
Map<String, Object> reduceBalance(@RequestParam("userId") Long userId, @RequestParam("price") Double price);
}
@FeignClient(contextId = "remoteProductService", value = "seata-product")
public interface RemoteProductService {
@PostMapping(value = "/product/reduceStock")
Map<String, Object> reduceStock(@RequestParam("productId") Long productId, @RequestParam("amount") Integer amount);
}
因为只是测试 seata
, 所以降级的 fallbackFactory 并没有添加。
不要忘记启动类上的 @EnableFeignClients
注解。
至此,我们的代码基本写好了,接下来就让我们来测试一下。
三、测试
测试前提是,我们测试用的 Nacos
,Seata
、MySQL
都成功启动,然后成功启动上面搭建的三个服务
seata_product
仓储库中 product
表里 id 为 1 的产品价格是 10 库存是 20 个
seata_account
账户库中 account
表里 id 为 1 的用户余额是 50
1. 正常下单
模拟正常下单,买一个商品 http://localhost:9211/order/placeOrder
参数:
{
"userId": 1,
"productId": 1,
"amount": 1
}
查看控制台日志:
2. 库存不足
模拟库存不足,事务回滚 http://localhost:9211/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 21
}
请求异常,控制台日志:
订单表 p_order
里新增的订单记录被回滚
3. 用户余额不足
模拟用户余额不足,事务回滚 http://localhost:9211/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 6
}
请求异常,控制台日志:
p_order
、product
、account
表里记录全部被回滚。
至此,测试完成,本文也就结束了。
完整代码地址:cloud-seata: seata demo - Gitee.com
点个赞吧,彦祖!