一、背景
有个业务系统(订单系统),通过后台日志和监控观察,系统偶尔会出现重复唯一索引问题,例如:后台日志片段
Duplicate entry 'service_no' for key 'idx_service_no' ....
也就是说写入数据与数据库已有数据发生重复。
下面我们分析一下问题出现在哪里:
这个字段就是业务编码 :service_no
这个取号规则是:要固定长度13位数,由首写大写字母 F+年月日(201201)+6 位有序数字组成。
6位有序数字,是当天数据库增量已有数据最大编码序列依次累加。
首先说明一下:每天6位数字(最高是999999)一天最多99万个序列号绝对够当前业务使用的,(实际上一天最多也就几万个单号),所以号量是满足业务需求的。
业务规则没有问题,那说明是程序代码逻辑有问题了,我们来看一下代码情况:
二、分析代码逻辑结构
1、首先取号
获取生成下一个序列号取号方法如下:
/**
* 创建编码内部自带加锁
* @param rule 编码规则参数
* @param params 参数数组
* @return 取号结果字符串
*/
@Transactional
public String create(String rule, String... params) {
String code = "";
String lockName = rule;
if (params.length > 0 && StringUtils.isNotEmpty(params[0])) {
lockName = rule + ":" + params[0];
}
if (params.length > 1) {
lockName = rule + ":" + params[0] + ":" + params[1];
}
try {
//加jedis客户端工具分布式锁
RedisLocker.lock(lockName, 10);
code = getNext(rule, params);
} finally {
//释放锁
RedisLocker.unlock(lockName);
}
return code;
}
/**
* 创建序列编码,无事务控制,需要依赖外层加redis锁!
* @param rule 生成规则
* @param params 参数列表
* @return 取号结果字符串
*/
private String getNext(String rule, String... params) {
String[] rules = rule.split("-");
CodeCondition condition = new CodeCondition();
Code code = new Code();
String prefix = "";
String num = "";
String digit = "";
int i = 0;
for (String str : rules) {
if (str.equals("r")) {
// 类型
code.setType(params[i]);
condition.setType(params[i]);
prefix += params[i];
i++;
} else if (str.equals("c")) {
// 商家编码
code.setShopCode(params[i]);
condition.setShopCode(params[i]);
prefix += params[i];
i++;
} else if (str.contains("yy") || str.contains("MM")) {
// 日期
//SimpleDateFormat formatter = new SimpleDateFormat(str);
String t = DateUtils.getCurrentDate(str);
code.setTime(t);
prefix += t;
} else if (str.contains("N")) {
// 数字
digit = str.substring(1);
} else {
// 其它
prefix += str;
}
}
code.setPrefix(prefix);
condition.setPrefix(prefix);
if (digit.length() > 0) {
int n = 1;
Integer serialNumber = codeDao.findNewOneByManual(condition);
if (serialNumber != null) {
n = serialNumber + 1;
}
code.setSerialNumber(n);
num = String.format("%0" + digit + "d", n);
}
code.setCode(prefix + num);
// 添加3次重试机制
boolean success = codeDao.getNextCode(code);
if (success) {
return prefix + num;
}
return null;
}
2、数据操作层事务
提交到DAO层准备交给执行JDBC去执行提交到数据库,主要使用了手动控制事务提交代码如下:
/**
* 手动控制事务
* @param param 提交新的对象值更新
* @return
*/
public boolean getNextCode(final Code param){
//手动控制事务
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
return transactionTemplate.execute(new TransactionCallback<Boolean>() {
public Boolean doInTransaction(final TransactionStatus status) {
try {
int i = 0;
CodeCondition con = new CodeCondition();
con.setType(param.getType());
con.setTime(param.getTime());
con.setShopCode(param.getShopCode());
//根据入参条件查询是否存在编码对象数据
Code c = findNewOne(con);
if(c == null){
i = jdbcTemplate.update("INSERT INTO xx_code(id,code,type,time,shop_code,serial_number,prefix,remarks) VALUES(?,?,?,?,?,?,?,?)",
UUID.randomUUID().toString().replaceAll("-", ""),
param.getCode(),
param.getType(),
param.getTime(),
param.getShopCode(),
param.getSerialNumber(),
param.getPrefix(),
param.getRemarks());
} else {
List<Object> params = new ArrayList<Object>();
params.add(param.getCode());
params.add(param.getPrefix());
params.add(param.getSerialNumber());
params.add(c.getTime());
params.add(c.getId());
//使用旧值做匹配条件
String sql = "UPDATE xx_code SET code=?, prefix=?,serial_number=?,time=? WHERE id=? ";
i = jdbcTemplate.update(sql, params.toArray());
}
return i > 0;
} catch (Exception ex) {
status.setRollbackOnly();
logger.error(ex.getMessage(),ex);
}
return false;
}
});
}
这里备注说明一下几个事务属于参数:
PROPAGATION_REQUIRED-- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
假如当前正要执行的事务不在另外一个事务里,那么就起一个新的事务 。
ServiceA {
void methodA() {
ServiceB.methodB();
}
}
ServiceB {
void methodB() {
}
}
比如说,ServiceB.methodB的事务级别定义为PROPAGATION_REQUIRED, 那么由于执行ServiceA.methodA的时候
1、如果ServiceA.methodA已经起了事务,这时调用ServiceB.methodB,ServiceB.methodB看到自己已经运行在ServiceA.methodA的事务内部,就不再起新的事务。这时只有外部事务并且他们是共用的,所以这时ServiceA.methodA或者ServiceB.methodB无论哪个发生异常methodA和methodB作为一个整体都将一起回滚。
2、如果ServiceA.methodA没有事务,ServiceB.methodB就会为自己分配一个事务。这样,在ServiceA.methodA中是没有事务控制的。只是在ServiceB.methodB内的任何地方出现异常,ServiceB.methodB将会被回滚,不会引起ServiceA.methodA的回滚。
在 spring的 TransactionDefinition接口中一共定义了六种事务传播属性:
PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY -- 支持当前事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW -- 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED -- 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER -- 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED -- 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
前六个策略类似于EJB CMT,第七个(PROPAGATION_NESTED)是Spring所提供的一个特殊变量。
它要求事务管理器或者使用JDBC 3.0 Savepoint API提供嵌套事务行为(如Spring的DataSourceTransactionManager)。
3、分布式锁工具
使用jedis工具包作为分布式锁代码如下:
/**
* 在指定时间内等待获取锁
* @param waitTime 等待锁的时间
*
*/
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
// 系统当前时间,以毫微秒为单位。
long nano = System.nanoTime();
do {
if(tryLock()){
logger.debug(this.lockValue + "获取锁");
return Boolean.TRUE;
}
Thread.sleep(new Random().nextInt(100) + 1);
} while ((System.nanoTime() - nano) < unit.toNanos(waitTime));
return Boolean.FALSE;
}
/**
* 阻塞式加锁
*/
public void lock() {
while(!tryLock()){
try {
// 睡眠,降低抢锁频率,缓解redis压力
Thread.sleep(new Random().nextInt(100) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取锁的线程解锁
* 不足:当时阿里云redis集群版本暂不支持执行lua脚本eval函数。
* 参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN
*
*/
public void unlock() {
// 检查当前线程是否持有锁
if (this.lockValue.equals(jedis.get(this.lockName))) {
logger.debug(this.lockValue + "释放锁");
try {
jedis.del(this.lockName);
} finally {
// Jedis 客户端版本是使用 Jedis-2.7.2版本;如果是2.9以上本的版注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
if (jedis != null) {
jedis.close();
}
}
} else {
logger.debug(Thread.currentThread().getName() + "并非持有锁的线程,未能解锁");
}
}
上面展示代码展示分布式销和事务一些使用,但请注意这里有坑!!!
4、存在哪些坑?
坑1:Jedis客户端版本要注意一下,如果是3.0.1及以上的话 jedis.close();已经被重写,请看客方源代码。
2.9 版本以前连接池使用有returnResource接口方法,3.0.1之后版本被去掉了。
官方重写了close方法,jedis.close不能直接调用。
try {
jedis = pool.getResource();
} finally {
if (jedis != null) {
jedis.close();
}
}
某一次升级了jedis client版本还导致生产环境redis服务被打暴,引发重大事故,所以如果使用jedis client建议使用2.9以下版本还靠谱一些。
其实这种写法,还有一些问题的,需要进一步改进。如何改进?我们往下一步分析。
改进版本1(使用lua脚本替代):
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryLock(Jedis jedis, String lockKey, String requestId, long expireTime) {
//jedis 3.0之后写法
/* SetParams params = new SetParams();
params.px(expireTime);
params.nx();*/
String result = jedis.set(lockKey, requestId,"NX", "PX", expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁(LUA脚本实现)
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return
*/
public static boolean unLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return " +
"0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
if (jedis != null) {
jedis.close();
}
return false;
}
当时阿里云redis集群版本暂不支持执行lua脚本eval函数。所以当时lua脚本方式的无发挥之地!
参考:https://help.aliyun.com/document_detail/26356.html?spm=5176.11065259.1996646101.searchclickresult.3dd24026cWCgRN
改进版本2(使用redisson替代):
/**
* 利用redisson client 实现分布式锁
* @author cgli
*/
public final class RedisLocker {
private final static String LOCKER_PREFIX = "lock:";
private static final Logger logger = LoggerFactory.getLogger(RedisLocker.class);
private RedisLocker() {
}
/**
* 获取连接配置实例
* @return
*/
private static RedissonClient getClient() {
return SingletonHolder.client;
}
/**
* 根据name对进行上锁操作,redissonLock 阻塞式的,采用的机制发布/订阅
* timeout结束强制解锁,防止死锁 :1分钟
* @param lockName 锁名称
*/
public static void lock(String lockName){
lock(lockName,60);
}
/**
* 根据name对进行上锁操作采用redisson RLOCK加锁方式
* @param lockName 锁名称
* @param leaseTime 结束强制解锁,防止死锁 :单位秒
*/
public static void lock(String lockName,long leaseTime){
String key = LOCKER_PREFIX + lockName;
RLock lock = getClient().getLock(key);
//lock提供带timeout参数,timeout结束强制解锁,防止死锁 :1分钟
lock.lock(leaseTime, TimeUnit.SECONDS);
}
/**
* 根据name对进行解锁操作
* @param lockName
*/
public static void unlock(String lockName){
String key = LOCKER_PREFIX + lockName;
RLock lock = getClient().getLock(key);
if(lock.isLocked()){
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
/**
*
* @param resourceName 锁的KEY名称
* @param worker 回调外部工作任务
* @param lockTime 锁定超时时间,默认acquireTimeout=100second 获取锁的超时时间
* @param <T>
* @return
* @throws Exception
*/
public static <T> T lock(String resourceName, AquiredLockWorker<T> worker,
long lockTime) throws Exception {
return lock(resourceName, worker, 100, lockTime);
}
/**
*
* @param resourceName 锁的KEY名称
* @param worker 回调外部工作任务
* @param acquireTimeout 获取锁的超时时间
* @param lockTime 锁定超时时间
* @param <T>
* @return
* @throws Exception
*/
public static <T> T lock(String resourceName, AquiredLockWorker<T> worker, long acquireTimeout,
long lockTime) throws Exception {
RLock lock = getClient().getLock(LOCKER_PREFIX + resourceName);
// Acquire lock and release it automatically after 10 seconds
// if unlock method hasn't been invoked
//lock.lock(10, TimeUnit.SECONDS);
try {
// Wait for acquireTimeout seconds and automatically unlock it after lockTime seconds
boolean res = lock.tryLock(acquireTimeout, lockTime, TimeUnit.SECONDS);
if (res) {
return worker.execute();
}
} finally {
if (lock != null) {
lock.unlock();
}
}
return null;
}
/**
* 内部类实现单例模式
*/
static class SingletonHolder {
private static RedissonClient client = init();
private static RedissonClient init() {
RedisProperties properties = ApplicationContextHolder.getApplicationContext().getBean(RedisProperties.class);
String host = properties.getRedisHost();
int port = Integer.parseInt(properties.getRedisPort());
String password = properties.getRedisPassword();
if (StringUtils.isEmpty(password)) {
password = null;
}
int database = Integer.parseInt(properties.getRedisDataBase());
try {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password)
.setDatabase(database)
//同任何节点建立连接时的等待超时。时间单位是毫秒。默认:10000
.setConnectTimeout(30000)
//当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。默认:3000
//等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认:3000
.setTimeout(10000)
//如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时)
// 计时。默认值:3
.setRetryAttempts(5)
//在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 默认值:1500
.setRetryInterval(3000);
return Redisson.create(config);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
}
}
坑2:使用@Transactional 注解嵌套事务的问题,会导致一些事务混乱,达不到最终的数据效果。
回去最开始代码处
@Transactional //这里加了事务注解
public String create(String rule, String... params) {
try {
//加jedis客户端工具分布式锁
RedisLocker.lock(lockName, 10);
code = getNext(rule, params);
} finally {
//释放锁
RedisLocker.unlock(lockName);
}
}
这种结果达不到预期的结果,就是会出现有时会产生重复的编码,如本文提出的问题。这是为什么呢?
由于spring的AOP机制,会在update/save方法之前开启事务,在这之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁代码块执行是在事务之内执行的。
可以推断在代码块执行完时,事务还未提交,这时如果其他线程进入锁代码块后,读取的库存数据就不是最新的,就可能产生了不是你想要的结果数据。
这个问题我们验证测试一下,写一个测试用例
用jmeter压力测试跑一下,取序列号的结果
在日志分析中,经常是两个线程查到上一个相同的序列号,而不是更新之后的数据结果,如下图:
产生重复编码,说明jedis分布式锁并没有真实锁住。问题就是出现在最外层的 @Transactional注解上,最外层调用代码完全没有必要加了@Transactional。
因为最内层已经加了手动控制事务的控制。拿外层的@Transactional再跑压力测试一切正常。
三、分析代码逻辑结构
1、Redis java客户端不推荐使用jedis,特别是2.9以上的版本,代码没有处理好很容易搞把redis服务搞死(连接数打满),推荐使用redisson代替,性能高且内置实现连接池。
如果真的一定要用jedis使用2.9以下版本并使用 lua脚本来控制,才能实现真正原子性操作。
2、事务注解@Transactional事务控制,嵌套使用时要注意,尽量控制在最小单元的最内层使用,在最外层(大方法)使用有风险,特别是跟锁一起使用时要注意控制两者顺序。
另外@Transactional在分布式环境下,远程调用无效的,并不能当作分布式事务来对待,这个业内有成熟其他方案替代。
3、其实取号生成序列号服务,没有必要使用数据库当作序列计数,完全可以使用redis计数器做实现(内部版本2年前就用redis版本来取号,连续运行两2年多没有发现有啥问题)。
示例代码如下:
private String generateByRedis(BusinessCodeCondition condition) {
String time = "";
String prefix = "";
String type = condition.getType();
StringBuilder sb = new StringBuilder();
sb.append(type);
if (StringUtils.isNotEmpty(condition.getShopCode())) {
sb.append(condition.getShopCode());
}
if (StringUtils.isNotEmpty(condition.getDateFormat())) {
time = DateFormatUtil.formatDate(condition.getDateFormat(), new Date());
sb.append(time);
}
prefix = sb.toString();
sb.setLength(0);
String key = KEY_PREFIX + prefix;
//long n = redisTemplate.opsForValue().increment(key, 1L);
//redisTemplate.expire(key, EXPIRE_TIME, TimeUnit.DAYS);
long n = getIncrementNum(key, condition.getDateFormat());
return String.format("%s%0" + condition.getDigitCount() + "d", prefix, n);
}
//使用redis计数器取序列,每天过期前一天的key值回收。
private Long getIncrementNum(String key, String dateFormat) {
RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
Long counter = entityIdCounter.incrementAndGet();
if ((null == counter || counter.longValue() == 1)) {
if (StringUtils.isNotEmpty(dateFormat) && dateFormat.indexOf("yyMMdd") > -1) {
entityIdCounter.expire(EXPIRE_TIME, TimeUnit.DAYS);
}
}
return counter;
}
标签:编码,事务,return,String,param,并发,params,序列,jedis From: https://www.cnblogs.com/cgli/p/17213403.html