1、常见的分布式事务锁
1、数据库级别的锁
- 乐观锁,给予加入版本号实现
- 悲观锁,基于数据库的for update实现
2、Redis,基于SETNX、EXPIRE实现
3、Zookeeper,基于InterProcessMutex实现
4、Redisson的lock、tryLock(背后原理也是Redis)
2、redis搭建模式
单机: 只有一台,挂了就无法工作。主从:备份关系,数据会同步到从库,可以读写分离。哨兵:master挂了,哨兵就进行选举,选出新的master,作用是监控主从,主从切换。集群:高可用,分散请求,目的是将数据分片存储,节省内内存。
分布式事务: 按照传统的系统架构、下单、扣库存等,这一系列的操作都是在一个应用一个数据库中完成的,也就是要保证了事务的ACID特性。如果在分布式应用中就会涉及到跨应用、跨库。这样就涉及到了分布式事务,就要考虑怎么保证这一列操作要么都成功要么都失败。保证数据的一致性
3、redis分布式锁的原理
互斥性:保证同一时间只有一个客户端可以拿到锁
安全性:只有加锁的服务才有解锁权限,也就是不能让客户端A加的锁,客户端B、C都可以解锁
避免死锁:保证加锁与解锁操作是原子操作,这个其实属于是实现分布式锁的问题,假设a用redis实现分布式锁,假设加锁操作,操作步骤分为两步:1,设置key set(key,value) 2,给key设置过期时间。
Redis实现分布式锁的核心就是:
加锁
SET key value NX EX timeOut
参数说明:
NX:只有这个key不存才的时候才会进行操作,即 if not exists;
EX:设置key的过期时间为秒,具体时间由第5个参数决定
timeOut:设置过期时间保证不会出现死锁【避免宕机死锁】
代码实现:
public Boolean lock(String key,String value,Long timeOut){
String var1 = jedis.set(key,value,"NX","EX",timeOut); //加锁,设置超时时间 原子性操作
if(LOCK_SUCCESS.equals(var1)){
return true;
}
return false;
}
总的来说,执行上面的set()方法就只会导致两种结果:
当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。已有锁存在,不做任何操作。
注:从2.6.12版本后, 就可以使用set来获取锁、Lua 脚本来释放锁。setnx是以前刚开始的实现方式,set命令nx、xx等参数,,就是为了实现 setnx 的功能。
解锁:
代码实现:
public Boolean redisUnLock(String key, String value) {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object var2 = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value));
if (UNLOCK_SUCCESS == var2) {
return true;
}
return false;
}
这段lua代码的意思:首先获取锁对应的value值,检查是否与输入的value相等,如果相等则删除锁(解锁)。
4、Redisson 分布式锁原理(重要)
Redisson是一个在Redis的基础上实现的Java驻内存数据网格。
加锁流程
redisson的lock()、tryLock()方法 底层 其实是发送一段lua脚本到一台服务器:
if (redis.call('exists' KEYS[1]) == 0) then + -- exists 判断key是否存在
redis.call('hset' KEYS[1] ARGV[2] 1); + --如果不存在,hset存哈希表
redis.call('pexpire' KEYS[1] ARGV[1]); + --设置过期时间
return nil; + -- 返回null 就是加锁成功
end; +
if (redis.call('hexists' KEYS[1] ARGV[2]) == 1) then + -- 如果key存在,查看哈希表中是否存在(当前线程)
redis.call('hincrby' KEYS[1] ARGV[2] 1); + -- 给哈希中的key加1,代表重入1次,以此类推
redis.call('pexpire' KEYS[1] ARGV[1]); + -- 重设过期时间
return nil; +
end; +
return redis.call('pttl' KEYS[1]); --如果前面的if都没进去,说明ARGV[2]的值不同,也就是不是同一线程的锁,这时候直接返回该锁的过期时间
参数说明:
KEYS[1]:即加锁的key,RLock lock = redisson.getLock("myLock"); 中的myLock
ARGV[1]:即 TimeOut 锁key的默认生存时间,默认30秒
ARGV[2]:代表的是加锁的客户端的ID,类似于这样的:99ead457-bd16-4ec0-81b6-9b7c73546469:1
其中lock()默认是30秒的生存时间。
锁互斥
假如客户端A已经拿到了 myLock,现在 有一客户端(未知) 想进入:
1、第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。
2、第二个if判断,判断一下,myLock锁key的hash数据结构中, 如果是客户端A重新请求,证明当前是同一个客户端同一个线程重新进入,所以可从入标志+1,重新刷新生存时间(可重入); 否则进入下一个if。
3、第三个if判断,客户端B 会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。
此时客户端B会进入一个while循环,不停的尝试加锁。
watch dog 看门狗自动延期机制
lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
默认值:30000
监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。(如果设置了leaseTimeout那就会自动失效了呀~)
看门狗的时间可以自定义设置:
config.setLockWatchdogTimeout(30000);
看门狗有什么用呢?
假如客户端A在超时时间内还没执行完毕怎么办呢? redisson于是提供了这个看门狗,如果还没执行完毕,监听到这个客户端A的线程还持有锁,就去续期,默认是 LockWatchdogTimeout/ 3 即 10 秒监听一次,如果还持有,就不断的延长锁的有效期(重新给锁设置过期时间,30s)
可以在lock的参数里面指定:
lock.lock(); //如果不设置,默认的生存时间是30s,启动看门狗
lock.lock(10, TimeUnit.SECONDS);//10秒以后自动解锁,不启动看门狗,锁到期不续
如果是使用了可重入锁( leaseTimeout):
lock.tryLock(); //如果不设置,默认的生存时间是30s,启动看门狗
lock.tryLock(100, 10, TimeUnit.SECONDS);//尝试加锁最多等待100秒,上锁以后10秒自动解锁,不启动看门狗
这里的第二个参数leaseTimeout 设置为 10 就会覆盖 看门狗的设置(看门狗无效),在10秒后锁就自动失效,不会去续期;如果是 -1 ,就表示 使用看门狗的默认值。
释放锁机制
lock.unlock(),就可以释放分布式锁。就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key。
- 为了安全,会先校验是否持有锁再释放,防止业务执行还没执行完,锁到期了。(此时没占用锁,再unlock就会报错)
- 主线程异常退出、或者假死
finally {
if (rLock.isLocked()) {
if (rLock.isHeldByCurrentThread()) {
rLock.unlock();
}
}
}
可能存在的问题
如果是 主从、哨兵模式,当客户端A 把 myLock这个锁 key 的value写入了 master,此时会异步复制给slave实例。
万一在这个主从复制的过程中 master 宕机了,主备切换,slave 变成了master。
那么这个时候 slave还没来得及加锁,此时 客户端A的myLock的 值是没有的,客户端B在请求时,myLock却成功为自己加了锁。这时候分布式锁就失效了,就会导致数据有问题。
所以说Redis分布式说最大的缺点就是宕机导致多个客户端加锁,导致脏数据,不过这种几率还是很小的。
5、实际应用(重要),模拟秒杀任务
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.3</version>
</dependency>
yml文件redis配置
spring:
redis:
database: 0
host: 127.0.0.1
port: 6124
配置redisson
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// config.useSingleServer();//单机
// config.useMasterSlaveServers();//集群
// config.useSentinelServers();//哨兵
// config.useClusterServers();//集群
// config.setLockWatchdogTimeout(30000);
//使用的Redis主从模式
config.useMasterSlaveServers()
.setPassword("redis")
.setMasterAddress("redis://82.71.16.139:6379")
.addSlaveAddress("redis://82.71.16.139:6380","redis://82.71.16.139:6381");
return Redisson.create(config);
}
新建两个实体
/**
* @author 公众号:HelloCoder,每天分享Java技术和面试题
* @date 2020/10/16
* @Description
*/
@Builder
@Data
@TableName("t_book")
@AllArgsConstructor
@NoArgsConstructor
public class Book {
@TableId(value = "book_id", type = IdType.AUTO)
private long bookId;
private String name;
private int count;
}
@Builder
@Data
@TableName("t_book_order")
@AllArgsConstructor
@NoArgsConstructor
public class Order {
@TableId(value = "id", type = IdType.AUTO)
private int id;
private String orderId;
private long bookId;
private int status;
private long userId;
private int count;
private String billTime;
}
@RestController
@Slf4j
@RequestMapping("Order/")
public class OrderController {
@Autowired
BookOrderService bookOrderService;
@RequestMapping("/seckill")
public RetResult seckill(@RequestParam(value = "bookId") Long bookId, @RequestParam(value = "userId", required = false) Long userId) {
if (userId == null) {
//模拟userId,随机生成,这里应该有前端传入
userId = (long) (Math.random() * 1000);
}
String result = bookOrderService.seckill(bookId, userId);
return RetResponse.makeOKRsp(result);
}
}
这里模拟了两种情况:
一种是不加锁,第二种是加redis锁
@Slf4j
@Service
public class BookOrderService {
@Autowired
BookMapper bookMapper;
@Autowired
OrderMapper orderMapper;
@Autowired
RedissonClient redissonClient;
public String seckill(Long bookId, Long userId) {
return notLockDemo(bookId, userId);
// return lockDemo(bookId, userId);
}
String lockDemo(Long bookId, Long userId) {
final String lockKey = bookId + ":" + "seckill" + ":RedissonLock";
RLock rLock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待20秒,上锁以后10秒自动解锁
Boolean flag = rLock.tryLock(20, 10, TimeUnit.SECONDS);
if (flag) {
//1、判断这个用户id 是否已经秒杀过
List<Order> list = orderMapper.selectList(new QueryWrapper<Order>().lambda().eq(Order::getUserId, userId).eq(Order::getStatus, 1).eq(Order::getBookId, bookId));
if (list.size() >= 1) {
log.info("你已经抢过了");
return "你已经抢过了,一人只能抢一次";
}
//2、查库存
Book book = bookMapper.selectOne(new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
if (book != null && book.getCount() > 0) {
//生成订单
String orderId = UUID.randomUUID().toString();
Order newOrder = Order.builder().
orderId(orderId).
status(1).
bookId(bookId).
userId(userId).
count(1).
billTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).build();
orderMapper.insert(newOrder);
//更新库存
Book newBook = Book.builder().count(book.getCount() - 1).build();
bookMapper.update(newBook, new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
log.info("userId:{} 秒杀成功", userId);
return "秒杀成功" + "";
} else {
log.info("秒杀失败,被抢完了");
}
} else {
log.info("请勿重复点击,userid:{} ", userId);
return "你已经抢过了";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (rLock.isLocked()) {
if (rLock.isHeldByCurrentThread()) {
rLock.unlock();
}
}
}
return "很遗憾,没货了...";
}
String notLockDemo(Long bookId, Long userId) {
//1、判断这个用户id 是否已经秒杀过
List<Order> list = orderMapper.selectList(new QueryWrapper<Order>().lambda().eq(Order::getUserId, userId).eq(Order::getStatus, 1).eq(Order::getBookId, bookId));
if (list.size() >= 1) {
log.info("你已经抢过了");
return "你已经抢过了,一人只能抢一次";
}
//2、查库存
Book book = bookMapper.selectOne(new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
if (book != null && book.getCount() > 0) {
//生成订单
String orderId = UUID.randomUUID().toString();
Order newOrder = Order.builder().
orderId(orderId).
status(1).
bookId(bookId).
userId(userId).
count(1).
billTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).build();
orderMapper.insert(newOrder);
//更新库存
Book newBook = Book.builder().count(book.getCount() - 1).build();
bookMapper.update(newBook, new QueryWrapper<Book>().lambda().eq(Book::getBookId, bookId));
log.info("userId:{} 秒杀成功", userId);
return "秒杀成功" + "";
} else {
log.info("秒杀失败,被抢完了");
return "很遗憾,没货了...";
}
}
}
新建两个表
DROP TABLE IF EXISTS `t_book` ;
CREATE TABLE `t_book` (
`book_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(400) DEFAULT NULL COMMENT '名称',
`count` int DEFAULT 0 COMMENT '数量',
PRIMARY KEY (`book_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';
DROP TABLE IF EXISTS `t_book_order` ;
CREATE TABLE `t_book_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` varchar(100) NOT NULL COMMENT '订单号',
`book_id` bigint(20) NOT NULL COMMENT '商品id',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`status` int DEFAULT 1 COMMENT '状态',
`count` int DEFAULT 0 COMMENT '购买数量',
`bill_time` datetime DEFAULT NULL COMMENT '下单时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
INSERT INTO `seckill`.`t_book`(`book_id`, `name`, `count`) VALUES (1, '《HaC的自传》', 5);
测试
1、配置Nginx
配置Nginx,分流进入两个服务。
修改nginx.conf
upstream mysite {
server 127.0.0.1:8090 weight=1;
server 127.0.0.1:8091 weight=1;
}
server {
listen 80;
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location / {
proxy_pass http://mysite;
}
}
说明:当访问localhost:80 端口会分流到8090和8091端口
启动服务,启动两个端口的服务,模拟分布式部署。
(1)不加锁的情况下
使用jmeter 模拟并发。不加锁的情况模拟10个请求在1s发出 共2次,方便查看:
查看一下日志:
8090这台服务器:
8091这台服务器:
同一时间进入请求。
查询一下订单:
库存为0之后,但是初始化只有 5 本书,最后竟然出现了18个订单,显然是有问题的。
这就是不加锁的结果。
(2)加锁的情况下
8090服务器:
8091服务器:
看一下数据库:
刚好生成 5 个订单,没有超卖的现象。