背景
在最近的项目开发中,遇到了一个关于版本号递增出现重复数据的问题。我们使用了Redisson分布式锁来确保自定义版本号的唯一性。在创建版本号的方法中,我们使用了Redisson来锁住创建版本的代码,并在该方法上添加了Spring的声明式事务注解@Transactional。
然而,在使用JMeter进行并发测试时,我们发现了多条重复的版本号数据。通过调试,我们发现问题的原因在于,在我们释放锁之后,Spring才提交事务。这样一来,其他请求线程在获取到锁后,会查询到旧数据,从而导致产生重复版本号。
导致插入重复数据的代码实现逻辑如下:
通过业务ID的传入,我们尝试获取锁,等待时间10秒,超时时间1分钟。如果成功获取锁,执行以下操作:
- 创建一个
TestVersion
对象,设置其pid
属性为id
,并设置其ver
属性为getMaxVersion(id)
方法返回的最大版本号。 - 使用
testVersionMapper
将TestVersion
对象插入到数据库中。 - 通过
@Transactional
注解,确保在发生异常时能够回滚事务。
从实现逻辑来看,是没有问题的。
@GetMapping("/tttt/{id}")
@Transactional
public String index(@PathVariable("id") String id) {
RLock lock = redissonClient.getLock(id);
try {
// 尝试获取锁,等待时间10秒,超时时间1分钟
if (lock.tryLock(10, 60, TimeUnit.SECONDS)) {
TestVersion ttt = new TestVersion();
ttt.setPid(id);
ttt.setVer(getMaxVersion(id));
testVersionMapper.insert(ttt);
} else {
throw new RuntimeException("Could not obtain lock for version control");
}
} catch (InterruptedException e) {
throw new RuntimeException("Lock acquisition interrupted", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return "Hello Spring Boot 3.x!";
}
private Integer getMaxVersion(String id) {
try {
// 最新版本
QueryWrapper<TestVersion> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("pid", id);
TestVersion ttt = testVersionMapper.getMaxNum(queryWrapper);
if (Objects.nonNull(ttt)) {
ver = ttt.getVer() + 1;
}
} catch (Exception e) {
e.printStackTrace();
}
return ver;
}
但是,当我们通过Jmeter进行并发测试时候,神奇的一幕发生了。
数据库中插入了很多重复的版本号。
分析原因
首先我们在数据插入的节点处进行断点调试,发现不同的线程请到了相同的查询结果。我们推断造成此结果的原因:
- Lock没有锁住。
- 上一个线程的锁被释放,但是事务没有被提交,导致下一个线程进到此处时读到历史数据。
逐一定位
首先,我们针对Lock没有锁住的问题,经过测试,即可发现,Lock不会存在没有锁住的问题。
那么就是锁被释放,但是事务没有被提交。
锁被释放,但是事务为什么没有被提交呢?是什么原因导致的?
我们通过以下代码示例,分析锁释放的时间,和事务提交的时间先后顺序,发现@Transactional事务对Redisson分布式事务的锁会造成影响。下面通过代码我们就对此进行分析一下。
首先、注册事务同步回调,打印事务提交前后的状态。
public String index(@PathVariable("id") String id) {
RLock lock = redissonClient.getLock(id);
try {
// 尝试获取锁,等待时间10秒,超时时间1分钟
lock.tryLock(10, 60, TimeUnit.SECONDS);
// 注册事务同步回调,在事务提交后释放锁
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
System.err.println("abandon lock after commit");
lock.unlock();
}
@Override
public void afterCompletion(int status) {
if (status != STATUS_COMMITTED) {
System.err.println("abandon lock after completion");
lock.unlock();
}
}
});
TestVersion ttt = new TestVersion();
ttt.setPid(id);
ttt.setVer(getMaxVersion(id));
testVersionMapper.insert(ttt);
} catch (InterruptedException e) {
throw new RuntimeException("Lock acquisition interrupted", e);
} finally {
System.err.println("abandon lock finally");
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return "Hello Spring Boot 3.x!";
}
通过下面的执行结果,我们可以看出,Lock锁是在Spring事务提交前被释放的。这样就会造成,下面的其他线程会查询到历史的版本号数据,最终导致了重复版本号的出现。
解决方式
既然我们已经知道了,是@Transactional导致的问题产生的原因。那么最简单粗暴的方式,就是不使用它了。但是很多的业务场景终究需要保证数据的一致性,可以参照以下实现方式。
第一种方式(非常不推荐)
MySQL排他锁
MySQL中的排他锁(Exclusive Locks,也称为写锁)是用于保证数据一致性的一种锁类型。当一个事务对某个资源加上排他锁后,其他事务不能对这个资源加任何锁。排他锁通常用于数据的修改操作,比如INSERT、UPDATE、DELETE。
在MySQL中,可以使用SELECT ... FOR UPDATE
语句对数据行加排他锁。以下是一个例子:
在这个例子中,**FOR UPDATE子句会阻止其他事务获取该记录的任何锁,直到当前事务结束。**这确保了在同一时间只有一个事务能够修改这条记录。
-- 开启一个事务START TRANSACTION;
-- 选择某个特定的记录,并对其加排他锁
SELECT * FROM your_table WHERE condition LIMIT 1 FOR UPDATE;
-- 进行数据修改
-- UPDATE your_table SET column = value WHERE condition;
-- 提交事务
COMMIT;
即:
@Select("SELECT * FROM test_version WHERE pid = #{pid} ORDER BY ver DESC LIMIT 1 FOR UPDATE")
TestVersion getMaxNumForUpdate(@Param("pid") String pid);
第二种方式
手动控制事务和锁的顺序: 将事务的控制权从 Spring 的声明式事务中移出,手动管理事务和锁的释放顺序。这样可以确保事务在释放锁之前提交。
可以使用 TransactionTemplate
来手动控制事务的提交和锁的释放顺序。以下是一个示例代码:
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@Service
public class SequenceService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private PlatformTransactionManager transactionManager;
public void createSequence() {
RLock lock = redissonClient.getLock("sequenceLock");
lock.lock();
try {
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.execute(status -> {
// 你的业务逻辑
return null;
});
} finally {
lock.unlock();
}
}
}
第三种方式
使用 Spring 事务同步机制: 结合 Spring 的事务同步机制,可以在事务提交之前做一些操作,比如释放锁。
使用 Spring 的 TransactionSynchronizationManager
,在事务提交前注册一个回调来释放锁。以下是示例代码:
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@Service
public class SequenceService {
@Autowired
private RedissonClient redissonClient;
@Transactional
public void createSequence() {
RLock lock = redissonClient.getLock("sequenceLock");
lock.lock();
try {
// 注册事务同步回调,在事务提交后释放锁
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
lock.unlock();
}
@Override
public void afterCompletion(int status) {
if (status != STATUS_COMMITTED) {
lock.unlock();
}
}
});
// 你的业务逻辑
} catch (Exception e) {
lock.unlock();
throw e;
}
}
}
通过以上的方式,可以有效解决你在分布式环境中遇到的锁与事务提交顺序问题,确保生成的序列是唯一且不重复的。
第四种方式(强烈推荐)
这种业务会在很多这种场景中使用到,上述的方法,如果每次都这样写,未免太过繁琐,而且也会存在安全隐患,系统迭代过程中,漏改就会是一个0x01的事故了… ,虽然现在主张防御编程,但是最为从业者,还是对自己的代码要有些要求。所以针对这种类似的场景,我们需要一种简单的解决方式进行代码解耦。
为了简化这种场景的处理,我们可以定义一个自定义注解,该注解可以同时实现分布式锁和事务管理的功能。这样,开发者只需要在需要保证并发安全和事务管理的方法上使用这个注解,而不需要每次都手动编写分布式锁和事务管理的代码,大大简化了开发过程。
通过自定义注解的方式,将 Redisson 分布式锁和 Spring 事务结合起来,确保锁在事务提交后释放,可以实现一个更优雅和可复用的解决方案。以下是详细的步骤和示例代码。
步骤:
- 定义自定义注解。
- 创建注解处理器。
- 在服务方法上使用自定义注解。
锁的名称是动态的,可以将动态的锁名称作为注解参数传递,并在方法执行时解析锁名称。以下是详细的实现步骤:
定义自定义注解
首先,定义一个自定义注解 @DistributedLock
,并添加一个 lockName
参数,用于动态传递锁的名称。
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
String lockName();
}
创建注解处理器
使用 Spring AOP 来处理该注解,在方法执行前获取锁,在方法执行后释放锁。动态锁名称可以通过 SpEL(Spring Expression Language)来解析。
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.lang.reflect.Method;
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private RedissonClient redissonClient;
private final ExpressionParser parser = new SpelExpressionParser();
@Around("@annotation(distributedLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
// 解析动态锁名称
String lockNameSpel = distributedLock.lockName();
EvaluationContext context = new StandardEvaluationContext();
Object[] args = joinPoint.getArgs();
String[] paramNames = signature.getParameterNames();
for (int i = 0; i < args.length; i++) {
context.setVariable(paramNames[i], args[i]);
}
String lockName = parser.parseExpression(lockNameSpel).getValue(context, String.class);
RLock lock = redissonClient.getLock(lockName);
lock.lock();
try {
// 注册事务同步回调,在事务提交后释放锁
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
lock.unlock();
System.err.println("abandon lock after commit");
}
@Override
public void afterCompletion(int status) {
if (status != STATUS_COMMITTED) {
lock.unlock();
System.err.println("abandon lock after completion");
}
}
});
// 执行目标方法
return joinPoint.proceed();
} catch (Exception e) {
lock.unlock(); // 立即释放锁
throw e;
}
}
}
使用自定义注解
在需要使用分布式锁的服务方法上添加自定义注解 @DistributedLock
。使用 SpEL 表达式来动态传递锁的名称。
@GetMapping("/tttt/{id}")
@DistributedLock(lockName = "#id")
@Transactional
public String index(@PathVariable("id") String id) {
TestVersion ttt = new TestVersion();
ttt.setPid(id);
ttt.setVer(getMaxVersion(id));
testVersionMapper.insert(ttt);
return "Hello Spring Boot 3.x!";
}
自定义注解:
- 定义
@DistributedLock
注解,并添加lockName
参数,用于传递动态的锁名称。支持 SpEL 表达式。
注解处理器:
- 使用 Spring AOP 的
@Aspect
处理@DistributedLock
注解。 - 在方法执行前,通过 SpEL 解析锁名称。
- 在事务提交后 (
afterCommit
) 或事务完成 (afterCompletion
) 时释放锁。
服务方法:
- 在需要分布式锁的服务方法上添加
@DistributedLock
注解。 - 使用 SpEL 表达式来动态传递锁的名称,例如
"#id"
。
通过这种方式,可以根据业务需求动态生成锁的名称,同时确保在锁释放前事务已经提交,从而避免并发问题。