首页 > 数据库 >Redisson实现分布式锁剖析

Redisson实现分布式锁剖析

时间:2022-12-20 18:05:22浏览次数:76  
标签:Redisson String Redis redis 剖析 import org redisTemplate 分布式

Redission实现分布式锁:

Redission是什么?

  Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid),它充分利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类,让使用Redis更加简单、便捷,从而让使用者能够将更多精力集中到业务逻辑处理上。

Redission能解决什么问题?

  解决分布式环境下锁的竞争问题,导致的比如说超卖等

基本环境搭建:

实现的需求就是,每访问一次controller:localhost:8080/deductStock,Redis存的数据就会-1

1、创建一个SpringBoot项目,添加pom依赖

<dependencies>
<!--Redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--springboot2.x以后用得是lettuce:lettuce默认连接池使用 common-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Redission实现分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!--SpringBoot依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2、application.properties

# 应用名称
spring.application.name=redission-demo
# 应用服务 WEB 访问端口
server.port=8080
##########################Redis配置###################################
# 连接的那个数据库(默认为0)
spring.redis.database=1
# redis服务的ip地址(默认是本机-127.0.0.1)
spring.redis.host=127.0.0.1
# redis端口号(默认)
spring.redis.port=6379
# redis的密码,没设置过密码,可为空
spring.redis.password=
# 连接超时时间
spring.redis.timeout=10s
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最大连接数
spring.redis.lettuce.pool.max-active=8
# 连接池中的最大等待时间(-1表示没有限制)
spring.redis.lettuce.pool.max-wait=-1ms

3、Redis配置文件

package com.zhixi.config.redis;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.io.File;

/**
* @ClassName RedisConfig
* @Author zhangzhixi
* @Description Redis配置类
* @Date 2022-4-29 10:23
* @Version 1.0
*/
@Configuration
public class RedisConfig {
/**
* @param redisConnectionFactory:配置不同的客户端,这里注入的redis连接工厂不同: JedisConnectionFactory、LettuceConnectionFactory
* @功能描述 :配置Redis序列化,原因如下:
* (1) StringRedisTemplate的序列化方式为字符串序列化,
* RedisTemplate的序列化方式默为jdk序列化(实现Serializable接口)
* (2) RedisTemplate的jdk序列化方式在Redis的客户端中为乱码,不方便查看,
* 因此一般修改RedisTemplate的序列化为方式为JSON方式【建议使用GenericJackson2JsonRedisSerializer】
*/
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = serializer();
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// key采用String的序列化方式
redisTemplate.setKeySerializer(StringRedisSerializer.UTF_8);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(StringRedisSerializer.UTF_8);
//hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}

/**
* 此方法不能用@Ben注解,避免替换Spring容器中的同类型对象
*/
public GenericJackson2JsonRedisSerializer serializer() {
return new GenericJackson2JsonRedisSerializer();
}

@Value("${spring.redis.host}")
private String redisHost;

@Value("${spring.redis.port}")
private String redisPort;

@Value("${spring.redis.database}")
private Integer redisDatabase;

/**
* 注册Redission对象
*
* @return Redission客户端对象
*/
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(redisDatabase);
return Redisson.create(config);
}
}

4、Controller

package com.zhixi.controller;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @ClassName RedissionController
* @Author zhangzhixi
* @Description
* @Date 2022-12-12 23:09
* @Version 1.0
*/
@Slf4j
@RestController
public class RedissionController {

/**
* 商品在Redis中的key名称
*/
private static final String KEY_STOCK = "stock";

@Resource
RedisTemplate<String, String> redisTemplate;

@Resource
RedissonClient redissonClient;


@RequestMapping("/deductStock")
public String deductStock() {
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
return "end";
}
}

5、向Redis中设置值

@SpringBootTest
class RedissionDemoApplicationTests {

@Resource
RedisTemplate<String, String > redisTemplate;

@Test
void contextLoads() {
redisTemplate.opsForValue().set("stock","100");
}
}

出现的问题以及解决方案

商品超卖(并发问题)

问题出现:

  比如有三个线程同时来访问这个请求,比如三个线程同时获得商品库存(100),然后-1,正常来说剩余库存是97,但是在并发环境下,可能会出现超卖问题。

下面准备三个线程,同时访问deductStock这个方法,看下是否会出现超卖问题?

@GetMapping("/test")
public void testThread() {
for (int i = 0; i < 3; i++) {
new Thread(()->{
deductStock();
}).start();
}
}

Redisson实现分布式锁剖析_spring

解决办法:

对代码进行加锁

@RequestMapping("/deductStock")
public String deductStock() {
// 在单机情况下,有且仅有一个线程能够访问
synchronized (this) {
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
return "end";
}
}

通过下图可以看到,解决了问题

Redisson实现分布式锁剖析_Redis_02

模拟高并发下商品超卖问题

  在上个例子中,我们通过了Synchronized锁,成功解决了多个线程争抢导致的超卖问题,但是有个问题,假设后期公司为了保证服务可用性。

将单击的应用,升级称为了集群的模式,那么是否会有超卖问题呢?

模拟两台应用服务器:使用Nginx做负载均衡

1、修改application.properties配置文件,将端口分别修改为8080、8090,并启动应用。

2、下载Nginx:​​http://nginx.org/download/nginx-1.18.0.zip​

3、编辑Nginx配置文件:config/nginx.config

upstream redislock{
server localhost:8080 weight=1;
server localhost:8090 weight=1;
}
server {
listen 80;
server_name localhost;

location / {
root html;
index index.html index.htm;
proxy_pass http://redislock;
}

Redisson实现分布式锁剖析_Redis_03

4、启动nginx

  双击nginx.exe文件即可

5、访问应用:http://localhost/deductStock

就可以看到了IDEA控制台打印了日志。

模拟高并发:使用Jmeter

1、添加线程组

2、添加HTTP请求

3、添加聚合报告

4、执行压测

Redisson实现分布式锁剖析_Redis_04

Redisson实现分布式锁剖析_redis_05

Redisson实现分布式锁剖析_spring_06

Redisson实现分布式锁剖析_Redis_07

 IDEA控制台输出内容:

  可以发现,通过Synchronized没有锁住,还是出现了超卖的问题,Synchronized只在JVM进程内部有效,也就是一个Tomcat有效。

如果在一个分布式环境下,我们要控制一段资源的争抢问题,应该怎么做呢?

Redisson实现分布式锁剖析_Redis_08

使用Redis命令解决并发环境下超卖问题

  在上个例子中,模拟了在高并发环境下的商品超卖问题,那么应该如何通过Redis来解决这个问题呢?

setnx命令:

将key的值设置为value,当且仅当key不存在的情况下。

若给定的key已经存在,则setnx不做任何动作。

Redis简单命令式分布式锁

@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// redis实现基础版的分布式锁
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
// 加锁不成功,返回给前端错误码,前端给用户友好提示
if (Boolean.FALSE.equals(lockResult)) {
log.info("系统繁忙,请稍后再试!");
return "系统繁忙,请稍后再试!";
}
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
// 业务执行完成,删除这个锁
redisTemplate.delete(lockKey);
return "end";
}

修改Jmeter参数  

最开始设置的Jmeter参数Ramp-up时间是0,为了明显看到效果,这里修改成2,表示2s发送完毕这些800个请求

Redisson实现分布式锁剖析_redis_09

测试并发访问 

Redisson实现分布式锁剖析_Redis_10

使用Redis做分布式锁时候出现的问题?

锁永久失效问题

  假如我们在setnx分布式锁的时候,正常执行,但是如果代码执行到业务逻辑地方的时候。

这时候8080程序挂掉了,那么这时候这个key就无法删除,另外一个8090服务器发现这个分布式锁的key还在,就在那边排队等着

给用户提示“系统繁忙,请稍后再试!”。显然这是不合理的。

  如果我们在程序挂了,但是这个key是一直存在Redis中的,其他的用户访问这个请求也进不到后面的业务处理逻辑代码中去,应该怎么办呢?那么这时候就需要使用到了redis原生API:

即使程序挂掉了,没有删除key,10S钟后,这个key也会自动被删除。

// redis实现基础版的分布式锁
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey,30, TimeUnit.SECONDS);

  但是这一个代码,虽然比上个分布式超卖的问题优化了一点点,但是还是有问题的,比如说,

  • 问题1:如果程序的执行时间,大于了锁的释放时间。这就会导致程序在最后删除的key为空,显然这个失效时间确实不容易写的比较“合理”。  
  • 问题2:8090给8080程序的锁释放了,如果9090这个程序进来了,拿到了锁,但是8090的锁无法释放,导致了锁一直失效,这也是导致了超卖问题。

如何解决线程之间锁释放错了?

  上个问题,问题2锁释放错了应该如何解决呢?通过UUID解决,每次释放锁的时候,判断是不是当前请求的UUID,如果是则可以正常释放锁。如果不是,则释放锁失败!

@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// 分布式锁的值
String lockValue = UUID.randomUUID().toString().replaceAll("-", "");
try {
// redis实现基础版的分布式锁,设置了key的过期时间
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
// 加锁不成功,返回给前端错误码,前端给用户友好提示
if (Boolean.FALSE.equals(lockResult)) {
log.info("系统繁忙,请稍后再试!");
return "系统繁忙,请稍后再试!";
}
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
} finally {
// 判断是不是当前请求的UUID,如果是则可以正常释放锁。如果不是,则释放锁失败!
if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
}
}
return "end";
}

-------------------------------------------------使用Redis实现分布式锁有两个问题:----------------------------------------------------------------------------------------------

分别是:

  • 1、锁超时问题
  • 2、锁被误释放问题

在上个代码中我们通过UUID解决了锁被误释放的问题,还有一个锁超时的问题应该如何解决呢?

  写一个定时任务,分线程每隔十秒去查看一次主线程是否持有这把锁,如果这个锁存在,重新将这个锁的超时时间设置为30S,对锁续命~

当然自己写上面的代码肯定无比麻烦的,当前市面上有很多开源框架已经帮助我们封装好了这些逻辑!

使用redisson解决超卖问题

  Redission来帮我们实现锁续命的分布式锁。

@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// 获取锁对象
RLock redissonLock = redissonClient.getLock(lockKey);
try {
// 加锁
// 类似于Redis的一条setnx命令:Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
redissonLock.lock();
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
} finally {
// 释放锁
redissonLock.unlock();
}
return "end";
}

Redisson分布式锁实现原理:

 

Redisson实现分布式锁剖析_Redis_11

 Redisson分布式锁底层

 Lua脚本

if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);

Redisson实现分布式锁剖析_Redis_12

 



标签:Redisson,String,Redis,redis,剖析,import,org,redisTemplate,分布式
From: https://blog.51cto.com/zhangzhixi/5956383

相关文章

  • 深入剖析ThreadLocal
    想必很多朋友对ThreadLocal并不陌生,今天我们就来一起探讨下ThreadLocal的使用方法和实现原理。首先,本文先谈一下对ThreadLocal的理解,然后根据ThreadLocal类的源码分析了其实......
  • .NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构一)--学习笔记
    目录为什么我们用OrleansDaprVSOrleansActor模型Orleans的核心概念为什么我们用Orleans分布式系统开发、测试的难度(服务发现、通信)运维的复杂度(伸缩性与可靠性的保障)a......
  • 【分布式系列】- 分布式id生成有几种选择
    什么是分布式系统ID在复杂分布式系统中,我们系统是分布式部署,往往需要在分布式环境中对大量的数据和消息进行唯一标识,这里的唯一就得要求ID不能重复。当我们对数据......
  • 使用zookeeper实现分布式锁时的注意点
    一.指定一个空的Watcher实现,可以消除控制台的错误1Zookeeperzk=newZookeeper("ip:port",timeout,newWatcher(){2@Override3process(WatcherEventwatc......
  • 分布式 | dble 沿用 jumpstringhash,移除 Mycat 一致性 hash 原因解析
    作者:爱可生开源社区背景MyCat对于字符串类型为分片字段的数据,有三种分片模式,分别是:模值hash(求模法),jumpstringhash(跳跃法),一致性hash(环割法)dble对于hash算法选取方面,除......
  • 分布式 | DBLE 之 SQL 解析
    作者:路路热爱技术、乐于分享的技术人,目前主要从事数据库相关技术的研究。数据库中间件与数据库有什么区别?个人认为除了没做数据存储,其他的功能数据库中间件几乎一样不少,比......
  • 分布式 | 从 dble 日志分析到 MySQL 源码学习
    作者:袁琳铸爱可生DBLE团队开发成员,主要负责DBLE需求开发,故障排查和社区问题解答。背景在客户的生产环境中,dble.log时常出现nohandler日志。虽然没有影响客户业务的......
  • 分布式 | DBLE 新全局表检查实现浅析
    作者:孙正方爱可生DBLE核心研发成员,拥有丰富的分布式数据库中间件开发、咨询以及调优经验,擅长数据库中间件问题排查和处理,对线上中间件部分排错有深入的实践与认知。背景......
  • 分布式 | DBLE 是如何实现视图的?
    作者:苏仕祥浩鲸科技PaaS组件团队成员,长期从事分库分表中间件的相关解决方案工作,热爱技术,乐于分享。本文来源:原创投稿*爱可生开源社区出品,原创内容未经授权不得随意使用,转......
  • 分布式id的方案
    1简介在分布式系统架构中,通常会涉及到分布式全局唯一ID的生成在复杂分布式系统中,往往需要对大量的数据和消息进行唯一标识。如在金融、电商、支付、等产品......