随着业务体量的上升,QPS 升高带来的并发问题越来越明显,这时候就需要一种能让代码在特定情况下串行执行的工具:锁
1.业务场景代码
/**
 * Creates the user when no row exists for its batch; otherwise overwrites the
 * username and update time of every row in that batch.
 *
 * <p>The lookup and the following write are separate steps and nothing in this
 * method serialises concurrent callers, so the check-then-act sequence is not
 * atomic.
 *
 * @param user the user carrying the batch and the username to persist
 * @return the {@code user} argument that was passed in
 */
@Override
@Transactional(rollbackFor = Exception.class)
public Object testBatch(User user) {
    // Fetch every row that already belongs to this batch.
    List<User> sameBatch = list(Wrappers.<User>lambdaQuery()
            .eq(User::getBatch, user.getBatch()));
    boolean batchExists = !CollUtil.isEmpty(sameBatch);
    if (batchExists) {
        // Batch already present: refresh username and update time in place.
        this.lambdaUpdate()
                .eq(User::getBatch, user.getBatch())
                .set(User::getUsername, user.getUsername())
                .set(User::getUpdateTime, LocalDateTime.now())
                .update();
    } else {
        // First record for this batch.
        save(user);
    }
    // NOTE(review): `key` is not declared in this method; presumably a field of the
    // enclosing class or a leftover from the locked variant - confirm.
    redisUtil.delete(key);
    return user;
}
备注:上述代码在串行执行时没有任何问题。但是假如同时有两个线程进来:两个线程同时查询到当前batch对应的user列表为空,那么这两个线程都会执行insert语句,导致当前batch本该只有1条user记录,数据库里却出现了2条。这就是并发问题
2.解决方案
此刻我能想到的解决方案有以下三种,此处只讲redis锁
2.1 代码同步执行
2.1.1 redis分布式锁
/** Lock time-to-live in seconds; also the longest time a caller waits for the lock. */
private int maxCostSeconds = 5;

/**
 * Creates or updates the user for a batch while holding a per-batch Redis lock.
 *
 * <p>The lock key is {@code "userBatch::" + batch}. Acquisition is retried every
 * 500 ms until it succeeds or {@code maxCostSeconds} have elapsed. Once held, the
 * lock is always released in a {@code finally} block, so a failing database call
 * does not leave it held until the TTL expires.
 *
 * <p>NOTE(review): the lock is released inside this {@code @Transactional} method,
 * so the transaction presumably commits only after the lock is gone - confirm, and
 * consider taking the lock in a caller outside the transaction.
 * <br>NOTE(review): release is an unconditional delete; if the work outlives the
 * TTL, this may remove a lock that another caller has since acquired.
 *
 * @param user the user carrying the batch and the username to persist
 * @return the {@code user} argument that was passed in
 * @throws RuntimeException if the lock cannot be obtained in time, or the thread is
 *         interrupted while waiting
 */
@Override
@Transactional(rollbackFor = Exception.class)
public Object testBatch(User user) {
    String key = "userBatch::" + user.getBatch();
    // Monotonic deadline: immune to wall-clock changes and to the minute wrap-around
    // that comparing second-of-minute values suffers from.
    long deadlineNanos = System.nanoTime() + maxCostSeconds * 1_000_000_000L;
    boolean lock = redisUtil.setNxEx(key, key, maxCostSeconds);
    int count = 0;
    // Spin-wait for the lock; give up once the deadline has passed.
    while (!lock && System.nanoTime() - deadlineNanos < 0) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting.
            Thread.currentThread().interrupt();
            throw new RuntimeException("系统繁忙,请稍后重试", e);
        }
        System.out.println("获取次数:" + count++);
        lock = redisUtil.setNxEx(key, key, maxCostSeconds);
    }
    if (!lock) {
        throw new RuntimeException("系统繁忙,请稍后重试");
    }
    // Only reached when the lock is really held.
    System.out.println("当前线程获取到了 redis锁,线程名" + Thread.currentThread().getName());
    try {
        LambdaQueryWrapper<User> eq = Wrappers.<User>lambdaQuery()
                .eq(User::getBatch, user.getBatch());
        List<User> userList = list(eq);
        if (CollUtil.isEmpty(userList)) {
            save(user);
        } else {
            this.lambdaUpdate().eq(User::getBatch, user.getBatch())
                    .set(User::getUsername, user.getUsername())
                    .set(User::getUpdateTime, LocalDateTime.now())
                    .update();
        }
        return user;
    } finally {
        // Release on success and on failure alike.
        redisUtil.delete(key);
    }
}
redis工具类
package com.lzq.learn.utils;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
 * Small helper around {@link RedisTemplate} offering key deletion and two ways of
 * taking a lock: Lua-script based ({@link #acquireLock}/{@link #releaseLock}) and a
 * raw {@code SET key value NX EX seconds} ({@link #setNxEx}).
 */
@Component
public class RedisUtil {

    /** Status reply of a successful {@code SET}. */
    private static final String OK = "OK";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /** Sets the key only if absent, with a millisecond TTL; returns the value on success, nil otherwise. */
    private final RedisScript<String> lockScript = new DefaultRedisScript<>("if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then return ARGV[1] else return nil end", String.class);

    /** Deletes the key only if it still holds the given value; returns the number of keys deleted. */
    private final RedisScript<Long> unlockScript = new DefaultRedisScript<>("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end", Long.class);

    /**
     * Tries to take the lock through the lock script.
     *
     * <p>NOTE(review): the arguments are passed through the template's script
     * execution as-is; whether {@code expireTime} reaches Redis as a plain integer
     * depends on the configured value serializer - confirm.
     *
     * @param lockKey    key that represents the lock
     * @param lockValue  value stored under the key; pass the same value to {@link #releaseLock}
     * @param expireTime time-to-live in milliseconds (handed to {@code PX})
     * @return {@code true} if the key was set, {@code false} if it already existed
     */
    public boolean acquireLock(String lockKey, String lockValue, long expireTime) {
        String result = redisTemplate.execute(lockScript, Collections.singletonList(lockKey), lockValue, expireTime);
        // The script answers with the lock value on success and nil on failure, never
        // with "OK", so success is signalled by a non-null result.
        return result != null;
    }

    /**
     * Releases the lock through the unlock script, which deletes the key only when
     * it still holds {@code lockValue}.
     *
     * @param lockKey   key that represents the lock
     * @param lockValue value that was supplied when the lock was acquired
     */
    public void releaseLock(String lockKey, String lockValue) {
        redisTemplate.execute(unlockScript, Collections.singletonList(lockKey), lockValue);
    }

    /**
     * Deletes a single key.
     *
     * @param key key to delete
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * Deletes several keys at once.
     *
     * @param keys keys to delete
     */
    public void delete(Collection<String> keys) {
        redisTemplate.delete(keys);
    }

    /**
     * Issues {@code SET key value NX EX seconds}: stores the value only if the key
     * does not exist yet and lets it expire after the given number of seconds.
     *
     * <p>The key is encoded with the template's key serializer; the value is written
     * as its raw UTF-8 bytes.
     *
     * @param key     key to set
     * @param value   value to store
     * @param seconds expiry in seconds
     * @return {@code true} if the key was set; {@code false} if it already existed or
     *         the command failed for any reason
     */
    public boolean setNxEx(String key, String value, int seconds) {
        try {
            Boolean acquired = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
                @SuppressWarnings("unchecked") // template is declared with String keys
                RedisSerializer<String> keySerializer =
                        (RedisSerializer<String>) redisTemplate.getKeySerializer();
                Object reply = connection.execute("set",
                        keySerializer.serialize(key),
                        value.getBytes(StandardCharsets.UTF_8),
                        "NX".getBytes(StandardCharsets.UTF_8),
                        "EX".getBytes(StandardCharsets.UTF_8),
                        String.valueOf(seconds).getBytes(StandardCharsets.UTF_8));
                // The reply is typed as Object; decode raw bytes so that a byte[]
                // status does not stringify to "[B@..." and get read as a failure.
                if (reply instanceof byte[]) {
                    reply = new String((byte[]) reply, StandardCharsets.UTF_8);
                }
                return OK.equals(String.valueOf(reply));
            });
            // A null callback result counts as "not acquired" instead of failing on unboxing.
            return Boolean.TRUE.equals(acquired);
        } catch (Exception e) {
            // Best effort: any Redis failure is reported as "lock not acquired".
            e.printStackTrace();
            return false;
        }
    }
}
2.1.2 java锁(synchronized、lock)
2.2 数据库唯一索引校验
2.3 数据库锁(select for update行锁,version乐观锁)