目录
- Redis能够为我们解决什么问题
- 常用五大数据类型
- Redis的订阅与发布
- Redis的新数据类型
- Redis_Jedis连接Redis进行操作
- Redis整合SpringBoot
- Redis的事物
- Redis之持久化
- 主从复制
- 哨兵模式
- Redis集群
- Redis应用问题及解决方案
Redis能够为我们解决什么问题
-
减轻CPU和内存压力
-
减轻IO压力
-
访问redis数据库是直接从内存中读取数据,比直接进行IO读取速度要快的多
-
适用场景
- 对数据高并发的读写
- 海量数据的读写
- 对数据高扩展的
-
不适用场景
- 需要事物支持的
- 处理复杂的关系需要即时查询的
-
用不着SQL和用了SQL也解决不了的问题时可以考虑使用NOSQL的数据库,例如Redis
Redis的下载与安装
-
将
redis-6.2.1.tar.gz
压缩包放在Linux系统的/opt
目录下,并完成解压 -
需要安装gcc环境
yum install gcc
-
进入到解压目录,使用
make
命令进行编译,编译完成后使用make install
命令安装- 如果中途出现了 —Jemalloc/jemalloc.h:没有那个文件 问题,可以make distclean
- 尝试安装gcc后再次尝试
-
不出问题的话就已经安装好了,其安装目录在:
/usr/local/bin
前台启动(不推荐)与后台启动
-
前台启动,就是直接进入/usr/local/bin下执行redis-server即可(不推荐)
-
复制一个
/opt/redis解压目录/下的一个redis.conf
文件到/etc目录
下 -
修改/etc/redis.conf配置文件,将
daemonize no
的参数设置为yes
,大概250行左右 -
进入到
/usr/local/bin
目录下,执行redis-server /etc/redis.conf
即可完成后台启动
常用五大数据类型
Redis键常用命令(key)
命令 | 作用 | 示例 |
---|---|---|
keys * | 查看所有的key | keys * |
exists key | 判断某个key是否存在 | exists k1 |
type key | 查看某个key的类型 | type k1 |
del key | 删除某个key | del k1 |
unlink key | 删除某个key,但是会调用异步线程 | unlink k2 |
expire key second | 设置某个key的过期时间 | expire k1 10 |
ttl key | 查看某个key的过期时间,-1用不过期,-2已过期 | ttl k1 |
4个数据库操作命令
命令 | 作用 | 示例 |
---|---|---|
select 数据库编号 | 切换到指定的数据库默认为0号数据库 | select 10 |
dbsize | 查看当前数据库有多少个key | dbsize |
flushdb | 清空当前数据库 | flushdb |
flushall | 清空所有数据库 | flushall |
String字符串命令
命令 | 作用 | 示例 |
---|---|---|
set key value | 设置键值 | set k1 v1 |
get key | 获取某个key | get k1 |
setnx key value | 设置键值、如果key已存在则设置失败 | setnx k1 v1 |
mset key1 value1 key2 value2 | 批量设置键值 | mset k3 v3 k4 v4 k5 v5 |
msetnx | 批量设置键值,任意一个key已存在则全部设置失败 | msetnx k8 v8 k1 v1 |
setex key second value | 设置键值并指定过期时间,单位为秒 | setex k1 10 v1 |
append key value | 在原有字符串中追加value | append k1 nihao |
setrange key startIndex value | 在字符串指定位置设置value,会覆盖原有字符串的范围内容 | setrange k1 1 abc |
getrange key startIndex endIndex | 获取指定范围内的value(包含头和尾) | getrange k1 1 3 |
getset key value | 获取原有的值,并设置新的值 | getset k1 zhangsan |
strlen key | 获取值得长度 | strlen k1 |
incr key | 将key中存储到数字+1 | incr k1 |
incr by key 步长 | 将key中存储到数字加步长 | incrby k1 10 |
decr key | 将key中存储到数字-1 | decr k2 |
decr by key 步长 | 将key中存储到数字减步长 | decrby k1 10 |
String的内存结构
- SDS(Simple Dynamic String)简单动态字符串,结构上类似于Jva的ArrayList,预分配一些内存空间,避免频繁扩容
List类型命令(单键多值)
命令 | 作用 | 示例 |
---|---|---|
lpush key v1 v2 v3 | 从左边插入一个或多个值 | lpush k1 a b c d e |
rpush key v1 v2 v3 | 从右边插入一个或多个值 | rpush k2 a b c d e |
lpop key n | 从左边取出n个值并删除 | lpop k1 2 |
rpop key n | 从右边取出n个值并删除 | rpop k1 2 |
rpoplpush source dest | 从一个列表的右边取出一个值添加到另一个列表的左边 | rpoplpush k1 k2 |
lrange key 0 -1 | 按照索引下标获得元素,0,-1代表获取全部 | lrange k2 0 -1 |
lindex key index | 按照索引下标获取元素,索引从0开始 | lindex k2 1 |
llen | 获得列表长度 | llen k2 |
linsert key before value newValue | 在某个value值的前或后添加一个元素(若有多个,则插入到从左边开始找到的第一个) | linsert k2 before c zhangsan |
lrem key n value | 从左边删除n个相同的元素value | lrem k2 2 value3 |
lset key index value | 将列表中索引位置的值替换成指定的值 | lset k2 0 wangwu |
List列表类型数据结构
Set集合命令(member指的是Set集合key中的元素)
命令 | 作用 | 示例 |
---|---|---|
sadd key value1 value2 valve3 | 将一个或多个member值添加到集合当中 | sadd k1 v1 v2 v3 |
smembers key | 取出一个集合的所有值 | smembers k1 |
sismember key value | 判断集合key中是否还有某个value值,0为没有,1为有 | sismember k1 v1 |
scard key | 返回该集合的元素个数 | scard k1 |
srem key value1 value2 | 删除集合中的某些元素 | scard k1 |
spop key n | 随机从该集合中取出n个值并删除 | spop k1 2 |
srandmember key n | 随机从集合中取出多个值但不删除 | srandmember k1 2 |
smove key1 key2 value | 把集合中一个值移动到另一个集合中 | smove k1 k2 v3 |
sinter key1 key2 | 返回2个集合的交集 | sinter k1 k2 |
sunion key1 key2 | 返回2个集合的并集 | sunion k1 k2 |
sdiff key1 key2 | 返回2个集合的差集 | sdiff k1 k2 |
Set集合数据结构
- 底层其实是一个value为null(内部值)的hash表,所以增删改的时间复杂度为O(1)
hash数据类型常用命令
命令 | 作用 | 示例 |
---|---|---|
hset key fieldname fieldValue | 将一个或多个值添加到hash当中 | hset k1 name zhangsan age 18 |
hsetnx key fieldname fieldValue | 将一个值添加到hash当中,如果field已存在则添加失败 | hsetnx k1 birthday 2000-10-10 |
hget key fieldname | 取出hash中的一个元素,通过字段名 | hget k1 name |
hexist key fieldname | 查看hash中,指定字段是否存在 | hexists k1 age |
hkeys key | 列出hash中所有的fieldname | hkeys k1 |
hvals key | 列出hash中所有的fieldvalue | hvals k1 |
hincrby key field increment | 为hash中的某个字段进行数值增加或减少 | hincrby k1 age -5 |
hash数据结构
-
field-name的长度较短并且字段较少时,使用ziplist
-
否则使用hashtable,也就是哈希表
-
其内存结构跟Java中的HashMap差不多
Zset常用命令(带分数排序的Set集合)
命令 | 作用 | 示例 |
---|---|---|
zadd key score1 value1 score2 value2 | 将一个或多个member值添加到zset集合中 | zadd k1 100 java 200 c++ 300 c# |
zrange key 0 -1 [withscores] | 返回下标在start_end之间的元素(默认不包括分数) | zrange k1 0 1 withscores |
zrangebyscore key min max [withscores] | 返回分数在min ~ max之间的元素 | zrangebyscore k1 200 300 withscores |
zrevrangebyscore key max min [withscores] | 同上,但是需要降序排列,而且是max~min | zrevrangebyscore k1 300 200 withscores |
zincrby key increment value | 为元素的score加分或减分 | zincrby k1 -300 java |
zrem key value | 删除该集合下指定值的一个或多个元素 | zrem k1 c++ c# |
zcount key min max | 统计该集合分数区间内的元素个数 | zcount k1 100 300 |
zrand key value | 返回一个元素在集合中的排名,从0开始 | zrank k1 java |
zrandmember key n | 返回zset中指定个数的元素(按照排名) | zrandmember k1 2 |
Zset数据结构
-
底层首先是一个hash表(Map<String,Double>)
-
并且还存在跳跃表
-
生成的文件与命令中:运行命令的路径有关
Redis配置文件介绍
###Units###
- 配置单位大小,开头定义了一些基本的度量单位,只支持bytes,并且大小写不敏感,不支持bit(一个字节8位)
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
###INCLUDES###
- 包含,类似于JSP或者Thymeleaf中的include。例如可以包含一些配置文件
# include /path/to/local.conf
# include /path/to/other.conf
###NETWORK###
- 网络配置,其中常用的几个配置如下(如下配置均为默认,暂未修改)
# 标志当前可以访问的IP地址,如果想要IP都能访问,可以直接将其注释掉
bind 127.0.0.1 -::1
# 保护模式,如果开启了,那么在没有bind,redis连接也没有密码时,redis将会只接收本机的响应
protected-mode yes
# 服务的端口号
port 6379
# 是完成了TCP三次握手以及未完成TCP三次握手的连接队列
# Linux内核会将这个值减少到vim /proc/sys/net/core/somaxconn的值(128)
# 如果真的需要增加连接队列数量,则需要修改vim /proc/sys/net/core/somaxconn
# 和 /proc/sys/net/ipv4/tcp_max_syn_backlog
tcp-backlog 511
# 超时时间,如果客户端连接到redis超过这个时间没有进行过任何操作(空闲时间),
# 则中断连接,0表示永不超时
timeout 0
# 对访问客户端的心跳检测,单位为秒,(判断客户端是否存活),建议设置为60
tcp-keepalive 300
###GENERAL###
- 一些通用的配置
# 是否以后台运行redis
daemonize yes
# 记录当前redis启动的线程ID,只会存储当前运行redis服务的线程ID
# 如果redis服务关闭,会将该文件删除
pidfile /var/run/redis_6379.pid
# 日志级别,debug -> verbose -> notice -> warning
loglevel notice
# 日志文件的名称
logfile ""
# 数据库个数,从0开始
databases 16
###SECURITY###
- 与安全相关的配置,设置密码(永久设置)
# 设置当前redis的密码
requirepass foobared
-
设置密码后需要授权才能操作redis数据库
-
临时设置密码
# 查看当前密码
config get requirepass
# 设置密码
config set requirepass "123456"
# 授权
auth 123456
###CLIENTS###
- 最大的客户端连接数量,如果超过了此数量,会返回:max number of clients reached(已达到最大连接数)
maxclients 10000
###MEMORY MANAGEMENT###
- 内存管理常用配置
# 设置Redis可以使用的内存容量,建议**必须设置**,**否则内存占满后将会造成服务器宕机**
maxmemory <bytes>
# 达到最大内存容量的移除key策略。
# volatile-lru:使用LRU算法移除key,只对设置了过期时间的键;(最近最少使用)
# allkeys-lru:在所有集合key中,使用LRU算法移除key
# volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
# allkeys-random:在所有集合key中,移除随机的key
# volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
# noeviction:不进行移除。针对写操作,只是返回错误信息
maxmemory-policy noeviction
# 设置样本数量,LRU算法和最小TTL算法都并非是精确的数量
# 一般设置3到7的数字,数值越小样本越不精确,但性能消耗越小
maxmemory-samples 5
Redis的订阅与发布
什么是订阅与发布(频道)
-
想想生活中的例子,我们订阅了一个频道,那么这个频道有消息的时候就会通知到我们
-
其实程序中的订阅与发布也是如此。
-
需要接收到消息的一方(订阅者)订阅某个通道,发送消息(发布者)的一方就通过这个通道来发送消息,因此订阅者就可以接收到消息
开启一个Redis客户端,成为订阅者订阅一个或多个频道
# 连接redis
redis-cli
# 订阅多个频道
subscribe channel1 channel2
开启一个Redis客户端,成为发布者,在某个频道发布消息
publish channel1 helloredis
订阅者收到消息
Redis的新数据类型
Bitmaps
-
作用:统计用户活跃量
-
setbit
-
getbit
-
bitcount
-
bitop
HyperLogLog
-
作用:基数统计,例如独立访客,不允许重复
-
pfadd
-
pfcount
-
pfmerge
Geospatial
-
作用:统计经纬度,还能计算距离
-
geoadd 添加
-
geopos 获取指定地区的坐标值
-
geodist 获取直线距离
-
georadius 在距离范围内的
Redis_Jedis连接Redis进行操作
修改redis.conf并且开放Linux的端口号
- 修改redis.conf
# 注释掉bind
#bind 127.0.0.1 -::1
# 关闭保护模式
protected-mode no
- 开放端口6379
# 永久开放端口6379
firewall-cmd --permanent --add-port=6379/tcp
# 重启防火墙
systemctl restart firewalld.service
- 可以使用telnet ip地址 端口 来测试是否可以连接上
telnet 192.168.22.100 6379
创建一个Maven工程,引入Jedis的依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
简单测试一下,其实方法跟命令行的差不多
public class JedisTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.22.100", 6379);
// 1、操作string的常用方法
jedis.set("k1","v1");
jedis.get("k1");
jedis.setnx("k1","v1");
jedis.mset("k1","v1","k2","v2");
jedis.msetnx("k1","v1","k2","v2");
jedis.setex("k1", 3, "v1");
jedis.append("k1", "111");
jedis.setrange("k1", 1, "zhang");
jedis.getrange("k1", 0 , 3);
jedis.getSet("k1","v1");
jedis.strlen("k1");
jedis.incr("k1");
jedis.incrBy("k1", 3);
jedis.decr("k1");
jedis.decrBy("k1", 3);
// 2、操作List、Set、Hash、Zset的方式均与命令行敲命令时一致
// 3、关闭连接
jedis.close();
}
}
练习使用Jedis完成一个手机验证码功能
-
模拟如下功能
1、输入手机号,点击发送后随机生成6位数字码,2分钟有效
2、输入验证码,点击验证,返回成功或失败
3、每个手机号每天只能输入3次 -
思路:
-
首先方法中都必须做的事情
- (1)定义好手机号、验证码所对应的key名
-
发送验证码的思路(接收手机号)
- (1)判断当前手机号是否已经在redis中存在
- (1.1)如果不存在,则进行存储,将其数量设置为1,并设置过期时间为次日凌晨,也就是24小时减当前时间
- (1.2)如果已存在,判断是否小于3,如果满足则代表发送验证码没有超过3次,为其发送验证码,否则告知每日发送验证码的次数不能超过3次
- (2)获取验证码
- (3)将验证码存储到Redis当中并设置超时时间为2分钟
- (1)判断当前手机号是否已经在redis中存在
-
检验验证码的思路(接收手机号、验证码)
- (1)根据当前的手机号、验证码拼接到对应的key
- (2)通过对应的key去Redis中获取数据并进行响应判断即可
-
定义发送验证码的方法
- (1)6次for循环
- (2)每次都拼接一个1~9的随机数即可
- (3)随机数由Random类生成
-
-
实际编码
public class RedisCode {
public static void main(String[] args) throws Exception{
// 发送验证码
sendCode("15577778888");
// 校验验证码
boolean b = verifyCode("15577778888", "735315");
System.out.println(b);
}
public static boolean verifyCode(String phone, String code) {
Jedis jedis = new Jedis("192.168.22.100", 6379);
// 1、定义手机号对应的验证码的key
String codeKey = "Verify:" + phone + ":code";
// 2、从redis中获取验证码
String redisCode = jedis.get(codeKey);
// 3、校验验证码是否已经失效
if (redisCode == null || "".equals(redisCode)) {
System.out.println("当前验证码失效,请重新获取");
jedis.close();
return false;
}
// 4、进行验证码的校验
if(redisCode.equals(code)) {
System.out.println("验证码校验成功");
jedis.close();
return true;
}
// 5、都走到这了,说明没有校验成功
System.out.println("验证码校验失败");
jedis.close();
return false;
}
public static void sendCode(String phone) {
Jedis jedis = new Jedis("192.168.22.100", 6379);
// 1、定义手机号对应的key
String phoneKey = "Verify:" + phone + ":qt";
String codeKey = "Verify:" + phone + ":code";
// 2、去Redis中查询是否含有该key对应的value
String phoneValue = jedis.get(phoneKey);
// 3、校验是否为null
if(phoneValue == null) {
// 3.1 在redis中设置值,明天凌晨重置
// 获取到当前的时间戳
long nowTimeStamp = Instant.now().toEpochMilli();
// 获取到明天凌晨的时间戳
long tomorrowTimeStamp = LocalDateTime.now().plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0)
.toInstant(ZoneOffset.of("+8")).toEpochMilli();
// 计算获得现在到明天凌晨的秒数
int second = (int) ((tomorrowTimeStamp - nowTimeStamp) / 1000);
// 往redis当中存储数据
jedis.setex(phoneKey,second,"1");
}else if (Integer.parseInt(phoneValue) < 3) {
// 3.2 这个时候代表今天已经给他发送过验证码了,并且次数没有达到3次,次数加一
jedis.incr(phoneKey);
}else {
// 3.3 说明次数已经达到3次了
System.out.println("今日发送验证码的次数已经达到3次,明天再来吧");
jedis.close();
return;
}
// 4、发送验证码,假装已经发送了
String code = getCode();
System.out.println("当前验证码是:" + code);
// 5、存储到redis中,设置过期时间为2分钟
jedis.setex(codeKey, 60 * 2 ,code);
jedis.close();
}
public static String getCode() {
StringBuilder sb = new StringBuilder();
Random random = new Random();
for (int i = 0; i < 6; i++) {
sb.append(random.nextInt(10));
}
return sb.toString();
}
}
Redis整合SpringBoot
创建一个SpringBoot工程并引入redis启动器和所需的pool2
<!-- 引入一个web模块,用于测试RedisTemplate -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
修改配置文件
#Redis服务器地址
spring.redis.host=192.168.22.100
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒),这里是30分钟
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
添加Redis的配置类(自动配置的不够我们用,所以需要自行扩展一下)
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
使用示例
@RestController
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/testRedis")
public Object testRedis() {
// 1、string的操作类
ValueOperations strOperations = redisTemplate.opsForValue();
// 2、list的操作类
ListOperations listOperations = redisTemplate.opsForList();
// 3、Set的操作类
SetOperations boundSetOperations = redisTemplate.opsForSet();
// 4、hash的操作类
HashOperations hashOperations = redisTemplate.opsForHash();
// 5、zset的操作类
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
return "hello spring-boot-starter-redis";
}
}
Redis的事物
Redis事物的定义
-
Redis的事物并不支持ACID
-
Redis事物是一个单独的隔离操作:事物中所有的命令都会序列化、按顺序的执行。事物在执行的过程中,不会被其他客户端发送来的请求所打断。
-
Redis中事物的主要作用就是串联多个命令防止别的命令插队。
Multi、Exec、Discard
-
在执行了Multi命令之后,输入的命令都会被加入到一个队列(组)当中,但是不会执行。(组队阶段)
-
当执行Exec命令后,队列中的命令将会按照顺序执行,执行的过程中不会被其他客户端的请求所打断。因为这是一个单独的隔离操作。(执行阶段)
-
在Multi命令的中途如果想要放弃当前的队列,则可以执行discard命令。
组队阶段与执行阶段出现错误时
-
当组队阶段出现了错误时,那么只要执行了exec进入执行阶段时,所有组队的命令都不会被执行。
-
当执行阶段出现错误时:指令依然会按照顺序执行,成功的就成功,失败的就失败,并没有原子性。
Redis对于事物冲突的解决方案(乐观锁)
-
Redis中使用watch来监视某个key,依次来达到乐观锁的效果,在修改被监视的key后,key的版本号会发生改变,因此当再次修改时,若没有获取到最新的数据,则会导致更新失败。
-
watch的监视需要执行在 multi之前
使用示例(2台客户端)
- 客户端1号
# 开启乐观锁监视k1
watch k1
# 开启事物
multi
- 客户端2号
# 开启乐观锁监视k1
watch k1
# 开启事物
multi
- 客户端1号修改了k1的值并执行
# 修改k1的值
set k1 clientOne
# 执行
exec
- 客户端2号也尝试修改k1的值并执行
set k1 clientTwo
exec
# 返回的结果,表示修改失败
(nil)
Watch配合Redis事物的总结
-
当一个客户端使用wath开始了key的监视(可以监视1个或多个key)后
-
那么当前客户端开启的事物,只要在最后exec执行之前
-
unwatch: 取消对key的监控。(如果已经执行了exec或discard后则会自动取消)
-
发现watch监视的任何一个key发生了变化后,则会导致当前的事物失效(所有的指令全都无法执行)
Redis事物的三特性
-
单独的隔离操作
- 当执行了exec后,将会把队列中的指令序列化,按照顺序的执行这些指令,在此期间不会被其他客户端的请求打断。
-
没有隔离级别的概念
- 队列中的命令在没有进行exec之前都不会被执行,只是放在队列当中。
-
不保证原子性
- 事物中如果有一条命令执行失败,并不会导致其他命令回滚
Redis事物秒杀案例
-
要求
- 同一个用户最多只能秒杀成功一次
-
实现思路(库存使用string存,秒杀成功的人数使用set存储)
- (1)接收到客户端发来的用户名与商品ID
- (2)根据商品ID去Redis中查询是否为null
- (2.1)如果为null代表秒杀还没有开始
- (2.2)如果不为null,则判断当前的用户是否已经秒杀过了,若已经秒杀过则直接提示后结束当前方法
- (3)根据第二步查询到的商品库存判断当前库存容量
- (3.1)如果库存容量够,则秒杀成功的列表中加入当前用户ID,然后库存 -1 ,告知秒杀成功
- (3.2)如果库存不够,则提示秒杀已结束
-
编码实现
@PostMapping("/secKill")
public void secKill(String productId) {
Jedis jedis = new Jedis("192.168.22.100", 6379);
// 模拟不同的用户,随机4位用户ID
String userId = getRandomUserId();
// 1、拼接当前的key
String productCountKey = "sk:" + productId + ":qt";
String successSetKey = "sk:" + productId + ":user";
// 2、判断当前是否开始了秒杀
Integer productCount = Integer.parseInt(jedis.get(productCountKey));
// 2.1 判断当前秒杀是否开始
if (productCount == null) {
System.out.println(new Result(false, "秒杀还未开始"));
return;
}
// 3、判断当前用户是否已经秒杀过了
if (jedis.sismember(successSetKey, userId)) {
System.out.println(new Result(false, "您已经秒杀成功过了,不能再秒杀了"));
return;
}
// 4、判断当前库存是否已经没有了
if(productCount <= 0) {
System.out.println(new Result(false, "非常抱歉,秒杀已经结束了"));
return;
}
// 5、库存 - 1,秒杀成功的用户列表加上当前用户
jedis.decr(productCountKey);
jedis.sadd(successSetKey, userId);
jedis.close();
System.out.println(new Result(true, "恭喜你秒杀成功"));
}
private String getRandomUserId() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 4; i++) {
sb.append(new Random().nextInt(10));
}
return sb.toString();
}
Linux系统中安装压力测试工具httpd-tools
yum install httpd-tools
ab命令的使用示例
-
在任意目录创建一个需要传递的参数文件: vim /opt/postfile
-
修改其中的内容,放上需要传递的参数,以&结尾
productId=1010&
- 输入如下指令完成压力测试
# 2000个线程,存在200个并发。注意ip地址和端口号别写错了
ab -n 2000 -c 200 -k -p /opt/postfile -T application/x-www-form-urlencoded http://192.168.31.71:8080/secKill
如上编码出现的问题
连接超时问题
-
采用连接池,之后用连接池来获取Jedis
-
连接池编码
public class JedisPoolUtils {
private volatile static JedisPool jedisPool = null;
public static JedisPool getInstance() {
if(jedisPool == null) {
synchronized (JedisPoolUtils.class) {
if(jedisPool == null) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大连接
jedisPoolConfig.setMaxTotal(200);
// 最大空闲
jedisPoolConfig.setMaxIdle(32);
// 最大等待时间
jedisPoolConfig.setMaxWaitMillis(100 * 1000);
// 表示当pool中的jedis 实例都被分配完时,是否要进行阻塞
jedisPoolConfig.setBlockWhenExhausted(true);
// 每次获取连接时候都要到数据库验证连接有效性
jedisPoolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(jedisPoolConfig, "192.168.22.100");
}
}
}
return jedisPool;
}
}
超卖问题
-
原因:由于整个流程判断,以及对数据的操作都是多线程的,会导致多个线程同时判断到了库存大于0而往下执行了,这多个线程往往超出实际的库存量,因此出现超卖问题
-
解决思路:使用Redis的乐观锁机制来解决事物问题
-
修改后的代码
@PostMapping("/secKill2")
public void secKillNew(String productId) {
Jedis jedis = new Jedis("192.168.22.100", 6379);
// 模拟不同的用户,随机4位用户ID
String userId = getRandomUserId();
// 1、拼接当前的key
String productCountKey = "sk:" + productId + ":qt";
String successSetKey = "sk:" + productId + ":user";
// -- 解决超卖1: 监视库存是否发生变化
jedis.watch(productCountKey);
// 2、判断当前是否开始了秒杀
Integer productCount = Integer.parseInt(jedis.get(productCountKey));
// 2.1 判断当前秒杀是否开始
if (productCount == null) {
System.out.println(new Result(false, "秒杀还未开始"));
return;
}
// 3、判断当前用户是否已经秒杀过了
if (jedis.sismember(successSetKey, userId)) {
System.out.println(new Result(false, "您已经秒杀成功过了,不能再秒杀了"));
return;
}
// 4、判断当前库存是否已经没有了
if(productCount <= 0) {
System.out.println(new Result(false, "非常抱歉,秒杀已经结束了"));
return;
}
// 解决超卖2: 开启事物,使如下命令编程串行执行
Transaction multi = jedis.multi();
// 5、库存 - 1,秒杀成功的用户列表加上当前用户
multi.decr(productCountKey);
multi.sadd(successSetKey, userId);
// 解决超卖3: 执行组队好了的指令
List<Object> execResult = multi.exec();
if (execResult == null || execResult.size() == 0) {
System.out.println("秒杀失败了");
jedis.close();
return;
}
jedis.close();
System.out.println(new Result(true, "恭喜你秒杀成功"));
}
如上虽解决了超卖问题,但是又出现了库存遗留问题
-
明明已经显示秒杀结束了,但是却还有库存
-
出现的原因:因为乐观锁问题,导致大部分线程在抢到了商品而进行库存减少时,都失败了。
-
使用lua脚本来实现事务的控制,当使用lua脚本时,redis执行单个脚本是无法被其他客户端的请求所中断的。
-
脚本如下
local productId=KEYS[1];
local userId=KEYS[2];
local productCountKey="sk:"..productId..":qt";
local successSetKey="sk:"..productId..":user";
local userExists=redis.call("sismember",successSetKey,userId);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,productCountKey);
if tonumber(num)<=0 then
return 0;
else
redis.call("decr",productCountKey);
redis.call("sadd",successSetKey,userId);
end
return 1;
- 修改后的代码如下
static String str = "local productId=KEYS[1];\n" +
"local userId=KEYS[2]; \n" +
"local productCountKey=\"sk:\"..productId..\":qt\";\n" +
"local successSetKey=\"sk:\"..productId..\":user\"; \n" +
"local userExists=redis.call(\"sismember\",successSetKey,userId);\n" +
"if tonumber(userExists)==1 then \n" +
" return 2;\n" +
"end\n" +
"local num= redis.call(\"get\" ,productCountKey);\n" +
"if tonumber(num)<=0 then \n" +
" return 0; \n" +
"else \n" +
" redis.call(\"decr\",productCountKey);\n" +
" redis.call(\"sadd\",successSetKey,userId);\n" +
"end\n" +
"return 1;\n";
@PostMapping("/secKill3")
public void secKillNew3(String productId) {
Jedis jedis = JedisPoolUtils.getInstance().getResource();
// 模拟不同的用户,随机4位用户ID
String userId = getRandomUserId();
// 加载Lua脚本并执行
String sha1 = jedis.scriptLoad(str);
Object obj = jedis.evalsha(sha1, 2, productId, userId);
String result = String.valueOf(obj);
if("2".equals(result)) {
System.out.println("您已经秒杀过了,不能再次秒杀");
}else if("0".equals(result)) {
System.out.println("库存已经没有了,秒杀结束了~");
}else {
System.out.println("秒杀成功~");
}
// 一定要关闭连接
jedis.close();
}
Redis之持久化
RDB持久化
RDB文件备份的流程
-
Redis在进行RDB文件备份时,会单独创建一个Fork进程来进行数据的持久化
-
Fork进程会先将数据写到一个临时的文件中,当持久化结束后,再将该临时文件替换掉原本的dump.rdb文件(又称为写时复制技术),这是出于数据的完整性考虑,不然如果直接往磁盘上备份,突然宕机将会导致数据的不完整性。(一般情况父进程和子进程会共用同一段物理内存)
-
在这个过程中,redis的主进程是不会进行任何IO操作的,因此如果进行大规模的数据恢复,RDB是很不错的选择
-
不过也有个弊端,RDB最后一次持久化后的数据可能会丢失。
配置文件中关于RDB的问题、如何触发RDB快照(保持策略)
# rdb备份的文件名
dbfilename dump.rdb
# 备份文件的路径,与AOF共享。 该路径指的是启动redis-server 的路径
# 如果在当前路径直接执行redis-server,则是当前路径
# 如果是 /local/usr/bin/redis-server的话,则为/local/usr/bin目录下
dir ./
# 默认当 3600秒之内有一个key改变了,则进行一次持久化
# 当300秒之内有100个key改变了,则进行一次持久化
# 当60秒之内有10000个key改变了,则进行一次持久化,按照间隔时间来
# Redis会在后台异步进行快照工作,快照的同时还能响应客户端请求
# 当周期的间隔时间到了时会自动触发bgsave自动保存
save 3600 1
save 300 100
save 60 10000
# 当redis无法写入磁盘的时候,直接关闭redis的写操作,推荐yes
stop-writes-on-bgsave-error yes
# redis会采用LZF算法进行压缩
rdbcompression yes
# 检查快照的完整性,推荐yes
rdbchecksum yes
RDB是如何完成数据恢复的 ###SNAPSHOTTING###
- 当redis服务启动时,则会按照配置文件中所设置的备份文件的路径、文件名自动完成数据恢复的工作
2个命令(停止和查看最后一次备份时间)
-
通过lastsave命令可以查看最后一次快照时间
-
redis-cli config set save "",禁用保存策略
RDB的优势与劣势
-
优势
- 恢复数据快
- 节省磁盘空间
- 适合大规模的数据恢复
- 对数据完整性要求不高更适合使用
-
劣势
- 每次fork都会写一次临时文件,导致2倍的膨胀性
- 虽然redis在fork时使用了写时复制技术,但是如果数据库庞大还是比较消耗性能
- RDB是在备份周期的间隔时间做一次备份,如果redis意外的down掉的话,将会损失最后一次持久化后的数据。
AOF持久化
AOF持久化的流程
-
客户端只要执行了写的命令,那么该命令就会被追加到AOF缓冲区中
-
AOF的缓冲区根据AOF的持久化策略来决定何时写入到磁盘的AOF备份文件当中
-
当AOF文件的大小超过重写策略或手动重写时,会对AOF文件进行rewrite重写,压缩AOF文件容量
-
Redis启动时,会自动的加载AOF文件,执行AOF文件中的写指令,以达到数据恢复的目的
配置文件中的AOF文件 ###APPEND ONLY MODE###
# 是否开启AOF功能
appendonly no
# AOF文件的名称
appendfilename "appendonly.aof"
# AOF文件的路径跟RDB文件的路径一致
dir ./
# AOF在缓冲区时的同步策略, always 每次写入时都直接同步到磁盘。everysec 每秒。
# no 不主动进行同步操作,由操作系统决定何时同步
appendfsync everysec
与重写相关的配置
# 如果设置为yes,重写时数据将只写入缓存,不写入aof文件,性能更高但可能导致数据丢失
# 设置为no,则会把数据往磁盘里刷,将会导致主线程处于阻塞状态
no-appendfsync-on-rewrite=yes
# 重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)
auto-aof-rewrite-percentage 100
# 设置重写的基准值,当文件大小达到该值后开始重写
auto-aof-rewrite-min-size 64mb
例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,
如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。
当AOF与RDB同时存在时,优先选用谁?
- 当AOF与RDB同时存在时,Redis会使用AOF作为数据恢复的文件
AOF文件修复命令
redis-check-aof--fix appendonly.aof
AOF的优势和劣势
-
优势
- 备份机制更加文件,丢失数据的概率更低
- 可读的日志文本,通过操作AOF文件,可以处理误操作
-
劣势
- 比起RDB更加耗费磁盘空间
- 恢复、备份的速度较慢
- 每次读写都进行同步的话,有一定的性能压力,因为不断的IO
- 存在个别bug将会导致恢复不能
使用建议
-
当更追求速度且对数据完整性要求不高,可以考虑使用RDB
-
对数据完整性要求高时,可以考虑AOF
-
官方推荐是2个都开启
-
不建议单独使用AOF,可能会出现bug
主从复制
- 主机负责读,从机负责写,一般都是一主多从
能够为我们解决的问题
-
容灾快速恢复
-
读写分离,性能扩展
采用模拟的方式完成主从复制,实现一主多从
在/opt文件下创建一个myredis文件夹,将redis.conf文件复制到这修改成公共的配置文件
-
mkdir /opt/myredis
-
cp /etc/redis.conf /opt/myredis/
-
修改配置文件
- 1、bind ip地址绑定注释掉
- 2、protected no 关闭保护模式
- 3、daemonize yes 开启后台进程
创建3个配置文件,分别为redis6379.conf、redis6380.conf、redis6381.conf
- redis6379.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6379.pid
port 6379
dbfilename dump6379.rdb
- redis6380.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6380.pid
port 6380
dbfilename dump6380.rdb
- redis6381.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6381.pid
port 6381
dbfilename dump6381.rdb
启动3个redis并查看当前进程
# 执行这些命令的时候,都是在/opt/myredis下进行的
redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf
使用3个xshell分别连接到不同的redis服务,并查看当前的服务器状态
redis-cli -p 6379
redis-cli -p 6380
redis-cli -p 6381
# 查看服务器的主从复制信息
info replication
操作2个从机,使用命令使其连接上主机后再查看3台从机的主从信息
- 在2个从机分别执行如下命令,由于我3台都在本机,所以是127.0.0.1
slaveof 127.0.0.1 6379
-
主机6379
-
从机6380
-
从机6381
主从复制的几个特点
一主二仆
-
主机可以进行读写的操作
-
从机只能进行读的操作
- 当从机与主机建立上关系后,会向主机发送一个sync命令,让主机同步数据文件过来,完成数据的同步
- 之后就是主机自动发请求过来,从机接收。
- 只有第一次连接时,是从机主动发请求让主机发送数据
-
综上所述,主机只要进行了写的操作,从机也可以拿到数据
薪火相传
-
由于每次主机写数据的时候,都需要向所有的从机发送数据进行同步,那么主机的压力就会不断增大
-
因此可以这样:主机底下永远只有几个从机,但是从机又是其他服务器的主机
-
类似于领导下有2个直系管理的人员,而这2个人员又是其他人的管理者
-
这里演示一下,6381从机将其主机设置为6380
127.0.0.1:6381> slaveof localhost 6380
-
此时我们看看6380的主从复制信息
-
此时当6379再进行写操作时,就只会向6380发送数据了。而6380再向6381发送数据来进行同步
-
有一个问题需要注意:就是当6380宕机了,竟会导致6381也无法同步到数据。
反客为主
-
在一主一从、或者一主多从的时候,如果主机宕机了,那么从机可以反客为主晋升为主机
- 从机手动执行:
slaveof no one
命令,晋升为主机
- 从机手动执行:
-
如果从机不反客为主,那么当挂掉的主机重启时,他们的关系依然是主从关系。
-
而当之前的主机再次开机时,会发现自己还是主机,但是那台反客为主的从机已经没了。
复制原理
-
当从机连接上主机后,会向主机发送一个sync命令,然后主机将会被整个的数据文件发送给从机以此来完成数据同步
-
而仅仅是第一次连接时,是从机向主机发送同步命令,之后就全都由主机来主动完成同步
-
全量复制: 每次重新连接上主机时,都会进行一次全量复制
-
增量复制: 已经连接上了主机后,由主机主动发送过来的同步数据为增量复制
哨兵模式
-
可以理解为反客为主的升级版
-
可以监视主从服务器的状态,当主机宕机时,会根据策略选取一名从机来充当主机
-
而当主机再次上线时,会发现自己变成了从机。
创建哨兵启动时所需的配置文件
-
在/opt/myredis/下创建一个sentinel.conf配置文件
-
修改其中的内容
# mymaster为监控对象起的服务器名称,1 为至少有多少个哨兵同意迁移的数量
# 啥意思呢?意思是如果需要迁移主机,只需要一个及一个以上哨兵同意即可
# 开启了哨兵模式,那么所有的服务器都成了哨兵
sentinel monitor mymaster 127.0.0.1 6379 1
启动哨兵,开启后的默认端口号为26379
redis-sentinel /opt/myredis/sentinel.conf
-
此时当主机宕机时,比如关闭掉6379这台主机
-
此时发现我们的哨兵进程已经监控到,并且完成了容灾的处理
哨兵选举新主机的策略
-
(1)根据每台服务器配置文件中的
slave-priority
配置来决定,数值越小优先级越高 -
(2)选择偏移量最大的(指的是哪个从机复制的数据最多最全的)
-
(3)选择runid最小的从机,每个redis服务在运行时都会随机生成一个40位的runid
哨兵模式小总结
-
当监控线程发现了主机宕机后,会选取一个从机晋升为主机
-
并且将之前主机之下的从机切换成新晋升的主机
-
当之前的主机再次上线时,发现自己也变成了新主机的从机
在Java程序中使用哨兵模式
- 修改一下之前配置的线程池,改成JedisSetinelPool
public class JedisPoolUtils {
private volatile static JedisSentinelPool jedisPool = null;
public static JedisSentinelPool getInstance() {
if(jedisPool == null) {
synchronized (JedisPoolUtils.class) {
if(jedisPool == null) {
// 创建一个set集合,保存哨兵线程的 ip和端口号
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("192.168.22.100:26379");
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大连接
jedisPoolConfig.setMaxTotal(200);
// 最大空闲
jedisPoolConfig.setMaxIdle(32);
// 最大等待时间
jedisPoolConfig.setMaxWaitMillis(100 * 1000);
// 表示当pool中的jedis 实例都被分配完时,是否要进行阻塞
jedisPoolConfig.setBlockWhenExhausted(true);
// 每次获取连接时候都要到数据库验证连接有效性
jedisPoolConfig.setTestOnBorrow(true);
jedisPool = new JedisSentinelPool("mymaster", sentinelSet,jedisPoolConfig);
}
}
}
return jedisPool;
}
}
Redis集群
集群提供了什么好处
-
Redis集群实现了对Redis的水平扩容,实现多台服务器分担客户端请求压力
-
Redis中的集群将会有一个slots插槽值,集群中的每个节点都刚好能存储数据的 1/N,这个N指的是节点的数量。而插槽slots则规定了整个集群能够存储数据的插槽范围,每个节点都会有对应的那一段范围。 在存储一个key时,会计算该key对应的插槽位置,存储到指定的节点。
-
并且Redis为我们提供了 无中心化集群 配置,使得集群中的任何一个节点都能够访问到其他的节点,也就是客户端请求服务时,即便请求的服务是节点B提供的,访问节点A也可以获取到节点B的提供的服务。
制作6个实例,分别实现刚好能让他们一主一从,3个节点
-
(1)复制原先/conf/redis.conf 文件到 /opt/myredis/ 目录下
-
(2)修改该配置文件
- (2.1)开启后台启动
- (2.2)关闭ip绑定
- (2.3)关闭保护模式 -
(3)创建6个配置文件,分别为redis6379.conf、redis6380.conf、redis6381.conf、redis6389.conf、redis6390.conf、redis6391.conf
- (3.1)配置基本信息(pidfile、port、dump.rdb、log日志文件名、aof功能)
- (3.2)配置集群信息(打开集群模式、设置节点配置文件名称、设置节点失联时间,超时自动进行主从切换)
- 提示:可以使用
%s/6379/6380
这样的命令来一次性替换
# 引入redis配置文件
include /opt/myredis/redis.conf
# 当前服务的进程id存放地址
pidfile "/opt/myredis/redis6379.pid"
port 6379
# rdb备份文件名称
dbfilename "dump6379.rdb"
# 后台启动
daemonize yes
# 关闭保护模式
protected-mode no
# 开启集群功能
cluster-enabled yes
# 设置集群的节点配置文件名
cluster-config-file nodes-6379.conf
# 配置节点失联时间,这里是毫秒,换算成秒就是15秒
cluster-node-timeout 15000
启动6个redis服务并查看进程
redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf
redis-server redis6389.conf
redis-server redis6390.conf
redis-server redis6391.conf
将6个节点合并成一个集群
-
进入到之前解压redis的目录:
cd /opt/redis-6.2.1/src/
-
执行如下命令
# --replicas 1采用最简单的方式配置集群
redis-cli --cluster create --cluster-replicas 1 192.168.22.100:6379 192.168.22.100:6380 192.168.22.100:6381 192.168.22.100:6389 192.168.22.100:6390 192.168.22.100:6391
中途会有点提示,询问是否按照默认分配的主机与从机进行集群配置。
- 最后还能看到插槽的数量是16384个,下面最大是16383是因为,插槽是从0开始算的
- 6379这个节点的插槽是:0~5460
- 6380节点:5461~10922
- 6381节点:10923~16383
连接客户端开始使用(采用集群方式连接)
-
如果不采用集群方式连接,存储数据时,如果key计算的插槽不在当前节点,则会导致出错
-
因此应该采用集群的方式连接客户端
redis-cli -c -p 6379
-
此时由于无中心化集群配置的原因,设置key和获取key对应的值时,都会根据key所对应的插槽使当前连接的客户端连接到指定节点
集群的故障恢复以及相关配置
-
综上的配置,当前的集群一共有3个主机,3个从机
-
如果一个主机宕机,那么15秒内从机将会晋升为主机,当主机再次上线时,就变成了从机
-
redis的配置文件中有如下配置
# 如果该配置设置为yes,那么如果某个节点的主机和从机全部挂掉时,整个集群都会直接挂掉
# 如果设置为no,那么剩下的主从节点将会继续提供它们插槽范围内的服务
cluster-require-full-coverage yes
Jedis的集群开发
- 使用JedisCluster工具来完成集群操作(别忘了关闭Linux的防火墙,或者开放端口)
public class JedisClusterTest {
public static void main(String[] args) {
// 注意:访问任何一个节点都是可以的,因为是无中心化集群
JedisCluster jedisCluster = new JedisCluster(
new HostAndPort("192.168.22.100", 6391));
jedisCluster.set("k3", "wangming");
String result = jedisCluster.get("k3");
System.out.println(result);
}
}
集群的好处和不足
-
好处
- 减轻了单台服务器的压力
- 实现扩容
- 无中心化配置比较简单
-
不足
- 多键操作是不被支持的,虽然可以靠着分组完成,但是并不方便
- 多键的Redis事务是不被支持的,lua脚本不被支持
Redis应用问题及解决方案
缓存穿透
-
问题描述
- (1)服务器的压力突然剧增
- (2)访问的接口全部都无法靠缓存处理,key对应的数据源并不存在
- (3)访问的请求甚至是无效请求
- (4)一般遇到这种情况都是黑客攻击,可以考虑直接报网警
-
解决方案
- (1)对空值进行缓存。如果查询返回的数据为null,将其缓存到redis数据库中,设置极短的过期时间,一般不超过5分钟(应急方案)
- (2)设置可以访问的白名单。使用bitMaps定义一个可以访问的名单,名单id作为bitMaps的偏移量,每次访问时都通过bitMaps来查询是否在白名单范围内
- (3)采用布隆过滤器
- (4)进行实时监控,当发现Redis的缓存命中率急剧下降时,需要排查访问对象,与运维人员配合。
- (5)直接报警。
缓存击穿
-
问题描述
- (1)服务器的压力突然增加
- (2)此时缓存服务器中的某个热门key突然过期了
- (3)而大量的请求都在访问这个热门key,最终导致大量请求都在一瞬间访问服务器,导致服务器压力过大直接宕机。
-
解决方案
- (1)预先设置好一些热门数据
- (2)实时调整:现场监控哪些热门数据,实时调整key的过期时间
- (3)使用锁的方式。
- 当缓存失效时,不是立即去访问服务器
- 使用某些操作成功会带返回值的操作,例如redis的setnx,来设置一个互斥的key
- 当操作返回成功的时候,再进行访问服务器的操作,并回头设置缓存,最后删除互斥key
- 当操作返回失败,证明已经有线程正在load db,可以当线程睡眠一会后直接get整个缓存
缓存雪崩
-
问题描述
- (1)服务器的压力突然增加
- (2)缓存服务器中的多个key在同一时间过期
- (3)而刚好在这时有大量的客户端请求发送过来,并且大部分都没有办法命中缓存
- (4)因此全都会去访问服务器,最终服务器不堪重负宕机
-
解决方案
- (1)构建多级缓存结构: nginx缓存 + redis缓存 + 其他缓存(ehcache等)
- (2)使用锁或队列:不适用大量并发情况
- (3)设置过期标识更新缓存
- 记录缓存是否过期(设置提前量),如果过期会触发通知另外的线程完成对缓存的更新。
- (4)将缓存的过期时间分散开来。
- 比如我们可以在原有缓存的失效时间基础上添加一个随机值,比如1~5分钟,这样就可以减少大量的缓存在同一时间过期的概率。
分布式锁
-
指的是在不同的机器提供的服务,需要实现一把锁对所有机器提供服务的控制,为了解决这个问题,就需要一种技术来实现跨JVM的互斥机制来控制共享资源的访问。上锁之后对所有机器都有效,这就叫做分布式锁
-
解决方案
- (1)基于数据库实现分布式锁
- (2)基于缓存(Redis等)
- (3)给予Zookeeper
-
优缺点
- (1)性能: redis性能最高
- (2)可靠性: Zookeeper可靠性最高
使用Redis实现分布式锁
redis命令:# set sku:1:info "OK" NX PX 10000
-
EX second:
设置键的过期时间为秒 -
PX millisecond
:设置键的过期时间为毫秒 -
NX
:只有当键不存在时,才能对键进行操作 -
XX
:只有当键已经存在时,才对键进行操作
实现redis中num数字的增加,靠setnx完成
@RestController
public class NumRedisLockController {
@Autowired
private RedisTemplate redisTemplate;
// 1、压力测试前: 先在redis当中设置string类型的 num = 0
@RequestMapping("numLockTest")
public void numLockTest() {
// 2、上锁
if (redisTemplate.opsForValue().setIfAbsent("numLock", "ok")){
// 执行num + 1
redisTemplate.opsForValue().increment("num");
// 3、解锁
redisTemplate.delete("numLock");
}else {
try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
// 重试
numLockTest();
}
}
}
优化之添加锁的过期时间
-
为什么要这么做?
- 因为如果一个线程上了锁之后,在里面发生了异常,可能最终导致都无法解锁,那么其他线程将会一直等待。
-
实现代码
@RestController
public class NumRedisLockController {
@Autowired
private RedisTemplate redisTemplate;
// 1、压力测试前: 先在redis当中设置string类型的 num = 0
@RequestMapping("numLockTest")
public void numLockTest() {
// 2、上锁 ,设置过期时间
if (redisTemplate.opsForValue().setIfAbsent("numLock", "ok", 1, TimeUnit.SECONDS)){
// 执行num + 1
redisTemplate.opsForValue().increment("num");
// 3、解锁
redisTemplate.delete("numLock");
}else {
try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
// 重试
numLockTest();
}
}
}
优化之添加UUID防止误删除
-
为什么会出现这种问题
- 线程A刚要执行解锁操作时,锁的过期时间到了,导致锁过期了。
- 这个时候如果线程B拿到锁进入了方法体,将会导致A释放了线程B的锁。
-
实现方案
@RestController
public class NumRedisLockController {
@Autowired
private RedisTemplate redisTemplate;
// 1、压力测试前: 先在redis当中设置string类型的 num = 0
@RequestMapping("numLockTest")
public void numLockTest() {
// 获取UUID
String uuid = UUID.randomUUID().toString();
// 2、上锁 ,设置过期时间
if (redisTemplate.opsForValue().setIfAbsent("numLock", uuid, 1, TimeUnit.SECONDS)){
// 执行num + 1
redisTemplate.opsForValue().increment("num");
// 3、解锁
if (uuid.equals((String)redisTemplate.opsForValue().get("numLock"))) {
redisTemplate.delete("numLock");
}
}else {
try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
// 重试
numLockTest();
}
}
}
优化之使用lua脚本保证的原子性
-
问题分析
- 线程A判断成功了当前锁对应的value值确实是自己的uuid
- 刚准备执行删除操作的时候,锁过期时间到了
- 与此同时,线程B拿到了锁
- 此时线程A将会把线程B的锁给释放。
- 因此得出结论,需要让进行删除标识判断以及删除的操作保持原子性
-
创建一个lua脚本
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
- 使用示例
@RestController
public class NumRedisLockController {
@Autowired
private RedisTemplate redisTemplate;
// 1、压力测试前: 先在redis当中设置string类型的 num = 0
@RequestMapping("numLockTest")
public void numLockTest() {
// 获取UUID
String uuid = UUID.randomUUID().toString();
String lockKey = "numLock";
// 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, 1, TimeUnit.SECONDS);
if (lock) {
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().increment("num");
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 创建一个Redis脚本文件
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long ,指的是redisTemplate.execute返回的实际类型
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
Long execute = (Long)redisTemplate.execute(redisScript, Arrays.asList(lockKey), uuid);
System.out.println(execute == 1 ? "执行成功": "执行失败");
}else {
try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
// 重试
numLockTest();
}
}
}
标签:JAVA,conf,Redis,redis,k1,从零开始,jedis,key
From: https://www.cnblogs.com/itdqx/p/16688133.html