首页 > 数据库 >JAVA入门基础_从零开始的培训_Redis

JAVA入门基础_从零开始的培训_Redis

时间:2022-09-20 15:56:38浏览次数:132  
标签:JAVA conf Redis redis k1 从零开始 jedis key

目录

Redis能够为我们解决什么问题

  • 减轻CPU和内存压力

  • 减轻IO压力

  • 访问redis数据库是直接从内存中读取数据,比直接进行IO读取速度要快的多

  • 适用场景

    • 对数据高并发的读写
    • 海量数据的读写
    • 对数据高扩展的
  • 不适用场景

    • 需要事物支持的
    • 处理复杂的关系需要即时查询的
  • 用不着SQL和用了SQL也解决不了的问题时可以考虑使用NOSQL的数据库,例如Redis

Redis的下载与安装

  • 下载地址

  • redis-6.2.1.tar.gz压缩包放在Linux系统的/opt目录下,并完成解压

  • 需要安装gcc环境
    yum install gcc

  • 进入到解压目录,使用make命令进行编译,编译完成后使用make install命令安装

    • 如果中途出现了 —Jemalloc/jemalloc.h:没有那个文件 问题,可以make distclean
    • 尝试安装gcc后再次尝试
  • 不出问题的话就已经安装好了,其安装目录在:/usr/local/bin

前台启动(不推荐)与后台启动

  • 前台启动,就是直接进入/usr/local/bin下执行redis-server即可(不推荐)

  • 复制一个/opt/redis解压目录/下的一个redis.conf文件到/etc目录

  • 修改/etc/redis.conf配置文件,将daemonize no 的参数设置为yes,大概250行左右

  • 进入到/usr/local/bin目录下,执行redis-server /etc/redis.conf即可完成后台启动

常用五大数据类型

Redis键常用命令(key)

命令 作用 示例
keys * 查看所有的key keys *
exists key 判断某个key是否存在 exists k1
type key 查看某个key的类型 type k1
del key 删除某个key del k1
unlink key 删除某个key,但是会调用异步线程 unlink k2
expire key second 设置某个key的过期时间 expire k1 10
ttl key 查看某个key的过期时间,-1用不过期,-2已过期 ttl k1

4个数据库操作命令

命令 作用 示例
select 数据库编号 切换到指定的数据库默认为0号数据库 select 10
dbsize 查看当前数据库有多少个key dbsize
flushdb 清空当前数据库 flushdb
flushall 清空所有数据库 flushall

String字符串命令

命令 作用 示例
set key value 设置键值 set k1 v1
get key 获取某个key get k1
setnx key value 设置键值、如果key已存在则设置失败 setnx k1 v1
mset key1 value1 key2 value2 批量设置键值 mset k3 v3 k4 v4 k5 v5
msetnx 批量设置键值,任意一个key已存在则全部设置失败 msetnx k8 v8 k1 v1
setex key second value 设置键值并指定过期时间,单位为秒 setex k1 10 v1
append key value 在原有字符串中追加value append k1 nihao
setrange key startIndex value 在字符串指定位置设置value,会覆盖原有字符串的范围内容 setrange k1 1 abc
getrange key startIndex endIndex 获取指定范围内的value(包含头和尾) getrange k1 1 3
getset key value 获取原有的值,并设置新的值 getset k1 zhangsan
strlen key 获取值得长度 strlen k1
incr key 将key中存储到数字+1 incr k1
incr by key 步长 将key中存储到数字加步长 incrby k1 10
decr key 将key中存储到数字-1 decr k2
decr by key 步长 将key中存储到数字减步长 decrby k1 10

String的内存结构

  • SDS(Simple Dynamic String)简单动态字符串,结构上类似于Jva的ArrayList,预分配一些内存空间,避免频繁扩容

List类型命令(单键多值)

命令 作用 示例
lpush key v1 v2 v3 从左边插入一个或多个值 lpush k1 a b c d e
rpush key v1 v2 v3 从右边插入一个或多个值 rpush k2 a b c d e
lpop key n 从左边取出n个值并删除 lpop k1 2
rpop key n 从右边取出n个值并删除 rpop k1 2
rpoplpush source dest 从一个列表的右边取出一个值添加到另一个列表的左边 rpoplpush k1 k2
lrange key 0 -1 按照索引下标获得元素,0,-1代表获取全部 lrange k2 0 -1
lindex key index 按照索引下标获取元素,索引从0开始 lindex k2 1
llen 获得列表长度 llen k2
linsert key before value newValue 在某个value值的前或后添加一个元素(若有多个,则插入到从左边开始找到的第一个) linsert k2 before c zhangsan
lrem key n value 从左边删除n个相同的元素value lrem k2 2 value3
lset key index value 将列表中索引位置的值替换成指定的值 lset k2 0 wangwu

List列表类型数据结构

image

Set集合命令(member指的是Set集合key中的元素)

命令 作用 示例
sadd key value1 value2 valve3 将一个或多个member值添加到集合当中 sadd k1 v1 v2 v3
smembers key 取出一个集合的所有值 smembers k1
sismember key value 判断集合key中是否还有某个value值,0为没有,1为有 sismember k1 v1
scard key 返回该集合的元素个数 scard k1
srem key value1 value2 删除集合中的某些元素 scard k1
spop key n 随机从该集合中取出n个值并删除 spop k1 2
srandmember key n 随机从集合中取出多个值但不删除 srandmember k1 2
smove key1 key2 value 把集合中一个值移动到另一个集合中 smove k1 k2 v3
sinter key1 key2 返回2个集合的交集 sinter k1 k2
sunion key1 key2 返回2个集合的并集 sunion k1 k2
sdiff key1 key2 返回2个集合的差集 sdiff k1 k2

Set集合数据结构

  • 底层其实是一个value为null(内部值)的hash表,所以增删改的时间复杂度为O(1)

hash数据类型常用命令

命令 作用 示例
hset key fieldname fieldValue 将一个或多个值添加到hash当中 hset k1 name zhangsan age 18
hsetnx key fieldname fieldValue 将一个值添加到hash当中,如果field已存在则添加失败 hsetnx k1 birthday 2000-10-10
hget key fieldname 取出hash中的一个元素,通过字段名 hget k1 name
hexist key fieldname 查看hash中,指定字段是否存在 hexists k1 age
hkeys key 列出hash中所有的fieldname hkeys k1
hvals key 列出hash中所有的fieldvalue hvals k1
hincrby key field increment 为hash中的某个字段进行数值增加或减少 hincrby k1 age -5

hash数据结构

  • field-name的长度较短并且字段较少时,使用ziplist

  • 否则使用hashtable,也就是哈希表

  • 其内存结构跟Java中的HashMap差不多

Zset常用命令(带分数排序的Set集合)

命令 作用 示例
zadd key score1 value1 score2 value2 将一个或多个member值添加到zset集合中 zadd k1 100 java 200 c++ 300 c#
zrange key 0 -1 [withscores] 返回下标在start_end之间的元素(默认不包括分数) zrange k1 0 1 withscores
zrangebyscore key min max [withscores] 返回分数在min ~ max之间的元素 zrangebyscore k1 200 300 withscores
zrevrangebyscore key max min [withscores] 同上,但是需要降序排列,而且是max~min zrevrangebyscore k1 300 200 withscores
zincrby key increment value 为元素的score加分或减分 zincrby k1 -300 java
zrem key value 删除该集合下指定值的一个或多个元素 zrem k1 c++ c#
zcount key min max 统计该集合分数区间内的元素个数 zcount k1 100 300
zrand key value 返回一个元素在集合中的排名,从0开始 zrank k1 java
zrandmember key n 返回zset中指定个数的元素(按照排名) zrandmember k1 2

Zset数据结构

  • 底层首先是一个hash表(Map<String,Double>)

  • 并且还存在跳跃表

  • 生成的文件与命令中:运行命令的路径有关

Redis配置文件介绍

###Units###

  • 配置单位大小,开头定义了一些基本的度量单位,只支持bytes,并且大小写不敏感,不支持bit(一个字节8位)
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes

###INCLUDES###

  • 包含,类似于JSP或者Thymeleaf中的include。例如可以包含一些配置文件
# include /path/to/local.conf
# include /path/to/other.conf

###NETWORK###

  • 网络配置,其中常用的几个配置如下(如下配置均为默认,暂未修改)
# 标志当前可以访问的IP地址,如果想要IP都能访问,可以直接将其注释掉
bind 127.0.0.1 -::1

# 保护模式,如果开启了,那么在没有bind,redis连接也没有密码时,redis将会只接收本机的响应
protected-mode yes

# 服务的端口号
port 6379

# 是完成了TCP三次握手以及未完成TCP三次握手的连接队列
# Linux内核会将这个值减少到vim /proc/sys/net/core/somaxconn的值(128)
# 如果真的需要增加连接队列数量,则需要修改vim /proc/sys/net/core/somaxconn 
# 和 /proc/sys/net/ipv4/tcp_max_syn_backlog
tcp-backlog 511

# 超时时间,如果客户端连接到redis超过这个时间没有进行过任何操作(空闲时间),
# 则中断连接,0表示永不超时
timeout 0

# 对访问客户端的心跳检测,单位为秒,(判断客户端是否存活),建议设置为60
tcp-keepalive 300

###GENERAL###

  • 一些通用的配置
# 是否以后台运行redis
daemonize yes

# 记录当前redis启动的线程ID,只会存储当前运行redis服务的线程ID
# 如果redis服务关闭,会将该文件删除
pidfile /var/run/redis_6379.pid

# 日志级别,debug -> verbose -> notice -> warning
loglevel notice

# 日志文件的名称
logfile ""

# 数据库个数,从0开始
databases 16

###SECURITY###

  • 与安全相关的配置,设置密码(永久设置)
# 设置当前redis的密码
requirepass foobared
  • 设置密码后需要授权才能操作redis数据库
    image

  • 临时设置密码

# 查看当前密码
config get requirepass

# 设置密码
config set requirepass "123456"

# 授权
auth 123456

###CLIENTS###

  • 最大的客户端连接数量,如果超过了此数量,会返回:max number of clients reached(已达到最大连接数)
maxclients 10000

###MEMORY MANAGEMENT###

  • 内存管理常用配置
# 设置Redis可以使用的内存容量,建议**必须设置**,**否则内存占满后将会造成服务器宕机**
maxmemory <bytes>

# 达到最大内存容量的移除key策略。
# volatile-lru:使用LRU算法移除key,只对设置了过期时间的键;(最近最少使用)
# allkeys-lru:在所有集合key中,使用LRU算法移除key
# volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
# allkeys-random:在所有集合key中,移除随机的key
# volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
# noeviction:不进行移除。针对写操作,只是返回错误信息
maxmemory-policy noeviction

# 设置样本数量,LRU算法和最小TTL算法都并非是精确的数量
# 一般设置3到7的数字,数值越小样本越不精确,但性能消耗越小
maxmemory-samples 5

Redis的订阅与发布

什么是订阅与发布(频道)

  • 想想生活中的例子,我们订阅了一个频道,那么这个频道有消息的时候就会通知到我们

  • 其实程序中的订阅与发布也是如此。

  • 需要接收到消息的一方(订阅者)订阅某个通道,发送消息(发布者)的一方就通过这个通道来发送消息,因此订阅者就可以接收到消息

开启一个Redis客户端,成为订阅者订阅一个或多个频道

# 连接redis
redis-cli

# 订阅多个频道
subscribe channel1 channel2

开启一个Redis客户端,成为发布者,在某个频道发布消息

publish channel1 helloredis

订阅者收到消息

image

Redis的新数据类型

Bitmaps

  • 作用:统计用户活跃量

  • setbit

  • getbit

  • bitcount

  • bitop

HyperLogLog

  • 作用:基数统计,例如独立访客,不允许重复

  • pfadd

  • pfcount

  • pfmerge

Geospatial

  • 作用:统计经纬度,还能计算距离

  • geoadd 添加

  • geopos 获取指定地区的坐标值

  • geodist 获取直线距离

  • georadius 在距离范围内的

Redis_Jedis连接Redis进行操作

修改redis.conf并且开放Linux的端口号

  • 修改redis.conf
# 注释掉bind
#bind 127.0.0.1 -::1

# 关闭保护模式
protected-mode no
  • 开放端口6379
# 永久开放端口6379
firewall-cmd --permanent --add-port=6379/tcp

# 重启防火墙
systemctl restart firewalld.service
  • 可以使用telnet ip地址 端口 来测试是否可以连接上
    telnet 192.168.22.100 6379

创建一个Maven工程,引入Jedis的依赖

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.2.0</version>
    </dependency>

简单测试一下,其实方法跟命令行的差不多

public class JedisTest {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.22.100", 6379);

        // 1、操作string的常用方法
        jedis.set("k1","v1");
        jedis.get("k1");
        jedis.setnx("k1","v1");
        jedis.mset("k1","v1","k2","v2");
        jedis.msetnx("k1","v1","k2","v2");
        jedis.setex("k1",  3, "v1");
        jedis.append("k1", "111");
        jedis.setrange("k1", 1, "zhang");
        jedis.getrange("k1", 0 , 3);
        jedis.getSet("k1","v1");
        jedis.strlen("k1");
        jedis.incr("k1");
        jedis.incrBy("k1", 3);
        jedis.decr("k1");
        jedis.decrBy("k1", 3);
        
        // 2、操作List、Set、Hash、Zset的方式均与命令行敲命令时一致
        
        // 3、关闭连接
        jedis.close();
    }
}

练习使用Jedis完成一个手机验证码功能

  • 模拟如下功能
    1、输入手机号,点击发送后随机生成6位数字码,2分钟有效
    2、输入验证码,点击验证,返回成功或失败
    3、每个手机号每天只能输入3次

  • 思路:

    • 首先方法中都必须做的事情

      • (1)定义好手机号、验证码所对应的key名
    • 发送验证码的思路(接收手机号)

      • (1)判断当前手机号是否已经在redis中存在
        • (1.1)如果不存在,则进行存储,将其数量设置为1,并设置过期时间为次日凌晨,也就是24小时减当前时间
        • (1.2)如果已存在,判断是否小于3,如果满足则代表发送验证码没有超过3次,为其发送验证码,否则告知每日发送验证码的次数不能超过3次
      • (2)获取验证码
      • (3)将验证码存储到Redis当中并设置超时时间为2分钟
    • 检验验证码的思路(接收手机号、验证码)

      • (1)根据当前的手机号、验证码拼接到对应的key
      • (2)通过对应的key去Redis中获取数据并进行响应判断即可
    • 定义发送验证码的方法

      • (1)6次for循环
      • (2)每次都拼接一个1~9的随机数即可
      • (3)随机数由Random类生成
  • 实际编码

public class RedisCode {
    public static void main(String[] args) throws Exception{
        // 发送验证码
        sendCode("15577778888");

        // 校验验证码
        boolean b = verifyCode("15577778888", "735315");
        System.out.println(b);
    }


    public static boolean verifyCode(String phone, String code) {
        Jedis jedis = new Jedis("192.168.22.100", 6379);

        // 1、定义手机号对应的验证码的key
        String codeKey = "Verify:" + phone + ":code";

        // 2、从redis中获取验证码
        String redisCode = jedis.get(codeKey);

        // 3、校验验证码是否已经失效
        if (redisCode == null || "".equals(redisCode)) {
            System.out.println("当前验证码失效,请重新获取");
            jedis.close();
            return false;
        }

        // 4、进行验证码的校验
        if(redisCode.equals(code)) {
            System.out.println("验证码校验成功");
            jedis.close();
            return true;
        }

        // 5、都走到这了,说明没有校验成功
        System.out.println("验证码校验失败");
        jedis.close();
        return false;
    }

    public static void sendCode(String phone) {
        Jedis jedis = new Jedis("192.168.22.100", 6379);
        // 1、定义手机号对应的key
        String phoneKey = "Verify:" + phone + ":qt";
        String codeKey = "Verify:" + phone + ":code";

        // 2、去Redis中查询是否含有该key对应的value
        String phoneValue = jedis.get(phoneKey);

        // 3、校验是否为null
        if(phoneValue == null) {
            // 3.1 在redis中设置值,明天凌晨重置
                // 获取到当前的时间戳
            long nowTimeStamp = Instant.now().toEpochMilli();
                // 获取到明天凌晨的时间戳
            long tomorrowTimeStamp = LocalDateTime.now().plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0)
                    .toInstant(ZoneOffset.of("+8")).toEpochMilli();
            // 计算获得现在到明天凌晨的秒数
            int second = (int) ((tomorrowTimeStamp - nowTimeStamp) / 1000);

            // 往redis当中存储数据
            jedis.setex(phoneKey,second,"1");
        }else if (Integer.parseInt(phoneValue) < 3) {
            // 3.2 这个时候代表今天已经给他发送过验证码了,并且次数没有达到3次,次数加一
            jedis.incr(phoneKey);
        }else {
            // 3.3 说明次数已经达到3次了
            System.out.println("今日发送验证码的次数已经达到3次,明天再来吧");
            jedis.close();
            return;
        }

        // 4、发送验证码,假装已经发送了
        String code = getCode();
        System.out.println("当前验证码是:" + code);

        // 5、存储到redis中,设置过期时间为2分钟
        jedis.setex(codeKey, 60 * 2 ,code);

        jedis.close();
    }

    public static String getCode() {
        StringBuilder sb = new StringBuilder();
        Random random = new Random();

        for (int i = 0; i < 6; i++) {
            sb.append(random.nextInt(10));
        }

        return sb.toString();
    }
}

Redis整合SpringBoot

创建一个SpringBoot工程并引入redis启动器和所需的pool2

    <!-- 引入一个web模块,用于测试RedisTemplate -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Redis启动器 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- spring2.X集成redis所需common-pool2-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>

修改配置文件

#Redis服务器地址
spring.redis.host=192.168.22.100
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒),这里是30分钟
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

添加Redis的配置类(自动配置的不够我们用,所以需要自行扩展一下)

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }

}

使用示例

@RestController
public class RedisTestController {
    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/testRedis")
    public Object testRedis() {
        // 1、string的操作类
        ValueOperations strOperations = redisTemplate.opsForValue();

        // 2、list的操作类
        ListOperations listOperations = redisTemplate.opsForList();

        // 3、Set的操作类
        SetOperations boundSetOperations = redisTemplate.opsForSet();

        // 4、hash的操作类
        HashOperations hashOperations = redisTemplate.opsForHash();

        // 5、zset的操作类
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();

        return "hello spring-boot-starter-redis";
    }
}

Redis的事物

Redis事物的定义

  • Redis的事物并不支持ACID

  • Redis事物是一个单独的隔离操作:事物中所有的命令都会序列化、按顺序的执行。事物在执行的过程中,不会被其他客户端发送来的请求所打断。

  • Redis中事物的主要作用就是串联多个命令防止别的命令插队

Multi、Exec、Discard

  • 在执行了Multi命令之后,输入的命令都会被加入到一个队列(组)当中,但是不会执行。(组队阶段)

  • 当执行Exec命令后,队列中的命令将会按照顺序执行,执行的过程中不会被其他客户端的请求所打断。因为这是一个单独的隔离操作。(执行阶段)

  • 在Multi命令的中途如果想要放弃当前的队列,则可以执行discard命令。

组队阶段与执行阶段出现错误时

  • 组队阶段出现了错误时,那么只要执行了exec进入执行阶段时,所有组队的命令都不会被执行。

  • 执行阶段出现错误时:指令依然会按照顺序执行,成功的就成功,失败的就失败,并没有原子性

Redis对于事物冲突的解决方案(乐观锁)

  • Redis中使用watch来监视某个key,依次来达到乐观锁的效果,在修改被监视的key后,key的版本号会发生改变,因此当再次修改时,若没有获取到最新的数据,则会导致更新失败。

  • watch的监视需要执行在 multi之前

使用示例(2台客户端)

  • 客户端1号
# 开启乐观锁监视k1
watch k1

# 开启事物
multi
  • 客户端2号
# 开启乐观锁监视k1
watch k1

# 开启事物
multi
  • 客户端1号修改了k1的值并执行
# 修改k1的值
set k1 clientOne

# 执行
exec
  • 客户端2号也尝试修改k1的值并执行
set k1 clientTwo
exec

# 返回的结果,表示修改失败
(nil)

Watch配合Redis事物的总结

  • 当一个客户端使用wath开始了key的监视(可以监视1个或多个key)后

  • 那么当前客户端开启的事物,只要在最后exec执行之前

  • unwatch: 取消对key的监控。(如果已经执行了exec或discard后则会自动取消)

  • 发现watch监视的任何一个key发生了变化后,则会导致当前的事物失效所有的指令全都无法执行

Redis事物的三特性

  • 单独的隔离操作

    • 当执行了exec后,将会把队列中的指令序列化,按照顺序的执行这些指令,在此期间不会被其他客户端的请求打断。
  • 没有隔离级别的概念

    • 队列中的命令在没有进行exec之前都不会被执行,只是放在队列当中。
  • 不保证原子性

    • 事物中如果有一条命令执行失败,并不会导致其他命令回滚

Redis事物秒杀案例

  • 要求

    • 同一个用户最多只能秒杀成功一次
  • 实现思路(库存使用string存,秒杀成功的人数使用set存储

    • (1)接收到客户端发来的用户名与商品ID
    • (2)根据商品ID去Redis中查询是否为null
      • (2.1)如果为null代表秒杀还没有开始
      • (2.2)如果不为null,则判断当前的用户是否已经秒杀过了,若已经秒杀过则直接提示后结束当前方法
    • (3)根据第二步查询到的商品库存判断当前库存容量
      • (3.1)如果库存容量够,则秒杀成功的列表中加入当前用户ID,然后库存 -1 ,告知秒杀成功
      • (3.2)如果库存不够,则提示秒杀已结束
  • 编码实现

    @PostMapping("/secKill")
    public void secKill(String productId) {
        Jedis jedis = new Jedis("192.168.22.100", 6379);

        // 模拟不同的用户,随机4位用户ID
        String userId = getRandomUserId();
        // 1、拼接当前的key
        String productCountKey = "sk:" + productId + ":qt";
        String successSetKey = "sk:" + productId + ":user";

        // 2、判断当前是否开始了秒杀
        Integer productCount = Integer.parseInt(jedis.get(productCountKey));

        // 2.1 判断当前秒杀是否开始
        if (productCount == null) {
            System.out.println(new Result(false, "秒杀还未开始"));
            return;
        }

        // 3、判断当前用户是否已经秒杀过了
        if (jedis.sismember(successSetKey, userId)) {
            System.out.println(new Result(false, "您已经秒杀成功过了,不能再秒杀了"));
            return;
        }

        // 4、判断当前库存是否已经没有了
        if(productCount <= 0) {
            System.out.println(new Result(false, "非常抱歉,秒杀已经结束了"));
            return;
        }

        // 5、库存 - 1,秒杀成功的用户列表加上当前用户
        jedis.decr(productCountKey);
        jedis.sadd(successSetKey, userId);

        jedis.close();
        System.out.println(new Result(true, "恭喜你秒杀成功"));
    }

    private String getRandomUserId() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            sb.append(new Random().nextInt(10));
        }
        return sb.toString();
    }

Linux系统中安装压力测试工具httpd-tools

yum install httpd-tools

ab命令的使用示例

  • 在任意目录创建一个需要传递的参数文件: vim /opt/postfile

  • 修改其中的内容,放上需要传递的参数,以&结尾

productId=1010&
  • 输入如下指令完成压力测试
# 2000个线程,存在200个并发。注意ip地址和端口号别写错了
ab -n 2000 -c 200 -k -p /opt/postfile -T application/x-www-form-urlencoded http://192.168.31.71:8080/secKill

如上编码出现的问题

连接超时问题

  • 采用连接池,之后用连接池来获取Jedis

  • 连接池编码

public class JedisPoolUtils {
    private volatile static JedisPool jedisPool = null;

    public static JedisPool getInstance() {
        if(jedisPool == null) {
            synchronized (JedisPoolUtils.class) {
                if(jedisPool == null) {
                    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
                    // 最大连接
                    jedisPoolConfig.setMaxTotal(200);
                    // 最大空闲
                    jedisPoolConfig.setMaxIdle(32);
                    // 最大等待时间
                    jedisPoolConfig.setMaxWaitMillis(100 * 1000);
                    // 表示当pool中的jedis 实例都被分配完时,是否要进行阻塞
                    jedisPoolConfig.setBlockWhenExhausted(true);
                    // 每次获取连接时候都要到数据库验证连接有效性
                    jedisPoolConfig.setTestOnBorrow(true);
                    
                    jedisPool = new JedisPool(jedisPoolConfig, "192.168.22.100");
                }
            }
        }

        return jedisPool;
    }
}

超卖问题

  • 原因:由于整个流程判断,以及对数据的操作都是多线程的,会导致多个线程同时判断到了库存大于0而往下执行了,这多个线程往往超出实际的库存量,因此出现超卖问题

  • 解决思路:使用Redis的乐观锁机制来解决事物问题

  • 修改后的代码

    @PostMapping("/secKill2")
    public void secKillNew(String productId) {
        Jedis jedis = new Jedis("192.168.22.100", 6379);

        // 模拟不同的用户,随机4位用户ID
        String userId = getRandomUserId();

        // 1、拼接当前的key
        String productCountKey = "sk:" + productId + ":qt";
        String successSetKey = "sk:" + productId + ":user";

        // -- 解决超卖1: 监视库存是否发生变化
        jedis.watch(productCountKey);

        // 2、判断当前是否开始了秒杀
        Integer productCount = Integer.parseInt(jedis.get(productCountKey));

            // 2.1 判断当前秒杀是否开始
        if (productCount == null) {
            System.out.println(new Result(false, "秒杀还未开始"));
            return;
        }

        // 3、判断当前用户是否已经秒杀过了
        if (jedis.sismember(successSetKey, userId)) {
            System.out.println(new Result(false, "您已经秒杀成功过了,不能再秒杀了"));
            return;
        }

        // 4、判断当前库存是否已经没有了
        if(productCount <= 0) {
            System.out.println(new Result(false, "非常抱歉,秒杀已经结束了"));
            return;
        }

        // 解决超卖2: 开启事物,使如下命令编程串行执行
        Transaction multi = jedis.multi();

        // 5、库存 - 1,秒杀成功的用户列表加上当前用户
        multi.decr(productCountKey);
        multi.sadd(successSetKey, userId);

        // 解决超卖3: 执行组队好了的指令
        List<Object> execResult = multi.exec();

        if (execResult == null || execResult.size() == 0) {
            System.out.println("秒杀失败了");
            jedis.close();
            return;
        }

        jedis.close();
        System.out.println(new Result(true, "恭喜你秒杀成功"));
    }

如上虽解决了超卖问题,但是又出现了库存遗留问题

  • 明明已经显示秒杀结束了,但是却还有库存

  • 出现的原因:因为乐观锁问题,导致大部分线程在抢到了商品而进行库存减少时,都失败了。

  • 使用lua脚本来实现事务的控制,当使用lua脚本时,redis执行单个脚本是无法被其他客户端的请求所中断的。

  • 脚本如下

local productId=KEYS[1];
local userId=KEYS[2]; 
local productCountKey="sk:"..productId..":qt";
local successSetKey="sk:"..productId..":user"; 
local userExists=redis.call("sismember",successSetKey,userId);
if tonumber(userExists)==1 then 
  return 2;
end
local num= redis.call("get" ,productCountKey);
if tonumber(num)<=0 then 
  return 0; 
else 
  redis.call("decr",productCountKey);
  redis.call("sadd",successSetKey,userId);
end
return 1;
  • 修改后的代码如下
    static String str = "local productId=KEYS[1];\n" +
            "local userId=KEYS[2]; \n" +
            "local productCountKey=\"sk:\"..productId..\":qt\";\n" +
            "local successSetKey=\"sk:\"..productId..\":user\"; \n" +
            "local userExists=redis.call(\"sismember\",successSetKey,userId);\n" +
            "if tonumber(userExists)==1 then \n" +
            "  return 2;\n" +
            "end\n" +
            "local num= redis.call(\"get\" ,productCountKey);\n" +
            "if tonumber(num)<=0 then \n" +
            "  return 0; \n" +
            "else \n" +
            "  redis.call(\"decr\",productCountKey);\n" +
            "  redis.call(\"sadd\",successSetKey,userId);\n" +
            "end\n" +
            "return 1;\n";

    @PostMapping("/secKill3")
    public void secKillNew3(String productId) {
        Jedis jedis = JedisPoolUtils.getInstance().getResource();

        // 模拟不同的用户,随机4位用户ID
        String userId = getRandomUserId();

        // 加载Lua脚本并执行
        String sha1 = jedis.scriptLoad(str);
        Object obj = jedis.evalsha(sha1, 2, productId, userId);

        String result = String.valueOf(obj);

        if("2".equals(result)) {
            System.out.println("您已经秒杀过了,不能再次秒杀");
        }else if("0".equals(result)) {
            System.out.println("库存已经没有了,秒杀结束了~");
        }else {
            System.out.println("秒杀成功~");
        }
		// 一定要关闭连接
        jedis.close();
    }

Redis之持久化

RDB持久化

RDB文件备份的流程

  • Redis在进行RDB文件备份时,会单独创建一个Fork进程来进行数据的持久化

  • Fork进程会先将数据写到一个临时的文件中,当持久化结束后,再将该临时文件替换掉原本的dump.rdb文件(又称为写时复制技术),这是出于数据的完整性考虑,不然如果直接往磁盘上备份,突然宕机将会导致数据的不完整性。(一般情况父进程和子进程会共用同一段物理内存

  • 在这个过程中,redis的主进程是不会进行任何IO操作的,因此如果进行大规模的数据恢复,RDB是很不错的选择

  • 不过也有个弊端,RDB最后一次持久化后的数据可能会丢失。

配置文件中关于RDB的问题、如何触发RDB快照(保持策略)

# rdb备份的文件名
dbfilename dump.rdb

# 备份文件的路径,与AOF共享。 该路径指的是启动redis-server 的路径
# 如果在当前路径直接执行redis-server,则是当前路径
# 如果是 /local/usr/bin/redis-server的话,则为/local/usr/bin目录下
dir ./

# 默认当 3600秒之内有一个key改变了,则进行一次持久化
# 当300秒之内有100个key改变了,则进行一次持久化
# 当60秒之内有10000个key改变了,则进行一次持久化,按照间隔时间来
# Redis会在后台异步进行快照工作,快照的同时还能响应客户端请求
# 当周期的间隔时间到了时会自动触发bgsave自动保存
save 3600 1
save 300 100
save 60 10000

# 当redis无法写入磁盘的时候,直接关闭redis的写操作,推荐yes
stop-writes-on-bgsave-error yes

# redis会采用LZF算法进行压缩
rdbcompression yes

# 检查快照的完整性,推荐yes
rdbchecksum yes

RDB是如何完成数据恢复的 ###SNAPSHOTTING###

  • 当redis服务启动时,则会按照配置文件中所设置的备份文件的路径、文件名自动完成数据恢复的工作

2个命令(停止和查看最后一次备份时间)

  • 通过lastsave命令可以查看最后一次快照时间

  • redis-cli config set save "",禁用保存策略

RDB的优势与劣势

  • 优势

    • 恢复数据快
    • 节省磁盘空间
    • 适合大规模的数据恢复
    • 对数据完整性要求不高更适合使用
  • 劣势

    • 每次fork都会写一次临时文件,导致2倍的膨胀性
    • 虽然redis在fork时使用了写时复制技术,但是如果数据库庞大还是比较消耗性能
    • RDB是在备份周期的间隔时间做一次备份,如果redis意外的down掉的话,将会损失最后一次持久化后的数据。

AOF持久化

AOF持久化的流程

  • 客户端只要执行了写的命令,那么该命令就会被追加到AOF缓冲区中

  • AOF的缓冲区根据AOF的持久化策略来决定何时写入到磁盘的AOF备份文件当中

  • 当AOF文件的大小超过重写策略或手动重写时,会对AOF文件进行rewrite重写,压缩AOF文件容量

  • Redis启动时,会自动的加载AOF文件,执行AOF文件中的写指令,以达到数据恢复的目的

配置文件中的AOF文件 ###APPEND ONLY MODE###

# 是否开启AOF功能
appendonly no

# AOF文件的名称
appendfilename "appendonly.aof"

# AOF文件的路径跟RDB文件的路径一致
dir ./

# AOF在缓冲区时的同步策略, always 每次写入时都直接同步到磁盘。everysec 每秒。 
# no 不主动进行同步操作,由操作系统决定何时同步
appendfsync everysec

与重写相关的配置

# 如果设置为yes,重写时数据将只写入缓存,不写入aof文件,性能更高但可能导致数据丢失
# 设置为no,则会把数据往磁盘里刷,将会导致主线程处于阻塞状态
no-appendfsync-on-rewrite=yes

# 重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)
auto-aof-rewrite-percentage 100

# 设置重写的基准值,当文件大小达到该值后开始重写
auto-aof-rewrite-min-size 64mb

例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,
如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。

当AOF与RDB同时存在时,优先选用谁?

  • 当AOF与RDB同时存在时,Redis会使用AOF作为数据恢复的文件

AOF文件修复命令

redis-check-aof--fix appendonly.aof

AOF的优势和劣势

  • 优势

    • 备份机制更加文件,丢失数据的概率更低
    • 可读的日志文本,通过操作AOF文件,可以处理误操作
  • 劣势

    • 比起RDB更加耗费磁盘空间
    • 恢复、备份的速度较慢
    • 每次读写都进行同步的话,有一定的性能压力,因为不断的IO
    • 存在个别bug将会导致恢复不能

使用建议

  • 当更追求速度且对数据完整性要求不高,可以考虑使用RDB

  • 对数据完整性要求高时,可以考虑AOF

  • 官方推荐是2个都开启

  • 不建议单独使用AOF,可能会出现bug

主从复制

  • 主机负责读,从机负责写,一般都是一主多从

能够为我们解决的问题

  • 容灾快速恢复

  • 读写分离,性能扩展

采用模拟的方式完成主从复制,实现一主多从

在/opt文件下创建一个myredis文件夹,将redis.conf文件复制到这修改成公共的配置文件

  • mkdir /opt/myredis

  • cp /etc/redis.conf /opt/myredis/

  • 修改配置文件

    • 1、bind ip地址绑定注释掉
    • 2、protected no 关闭保护模式
    • 3、daemonize yes 开启后台进程

创建3个配置文件,分别为redis6379.conf、redis6380.conf、redis6381.conf

  • redis6379.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6379.pid
port 6379
dbfilename dump6379.rdb
  • redis6380.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6380.pid
port 6380
dbfilename dump6380.rdb
  • redis6381.conf
include /opt/myredis/redis.conf
pidfile /opt/myredis/redis6381.pid
port 6381
dbfilename dump6381.rdb

启动3个redis并查看当前进程

# 执行这些命令的时候,都是在/opt/myredis下进行的
redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf

image

使用3个xshell分别连接到不同的redis服务,并查看当前的服务器状态

redis-cli -p 6379
redis-cli -p 6380
redis-cli -p 6381

# 查看服务器的主从复制信息
info replication

image

操作2个从机,使用命令使其连接上主机后再查看3台从机的主从信息

  • 在2个从机分别执行如下命令,由于我3台都在本机,所以是127.0.0.1
slaveof 127.0.0.1 6379
  • 主机6379
    image

  • 从机6380
    image

  • 从机6381
    image

主从复制的几个特点

一主二仆

  • 主机可以进行读写的操作

  • 从机只能进行读的操作

    • 当从机与主机建立上关系后,会向主机发送一个sync命令,让主机同步数据文件过来,完成数据的同步
    • 之后就是主机自动发请求过来,从机接收。
    • 只有第一次连接时,是从机主动发请求让主机发送数据
  • 综上所述,主机只要进行了写的操作,从机也可以拿到数据

薪火相传

  • 由于每次主机写数据的时候,都需要向所有的从机发送数据进行同步,那么主机的压力就会不断增大

  • 因此可以这样:主机底下永远只有几个从机,但是从机又是其他服务器的主机

  • 类似于领导下有2个直系管理的人员,而这2个人员又是其他人的管理者

  • 这里演示一下,6381从机将其主机设置为6380

    • 127.0.0.1:6381> slaveof localhost 6380
  • 此时我们看看6380的主从复制信息
    image

  • 此时当6379再进行写操作时,就只会向6380发送数据了。而6380再向6381发送数据来进行同步

  • 有一个问题需要注意:就是当6380宕机了,竟会导致6381也无法同步到数据。

反客为主

  • 在一主一从、或者一主多从的时候,如果主机宕机了,那么从机可以反客为主晋升为主机

    • 从机手动执行:slaveof no one 命令,晋升为主机
  • 如果从机不反客为主,那么当挂掉的主机重启时,他们的关系依然是主从关系。

  • 而当之前的主机再次开机时,会发现自己还是主机,但是那台反客为主的从机已经没了。

复制原理

  • 当从机连接上主机后,会向主机发送一个sync命令,然后主机将会被整个的数据文件发送给从机以此来完成数据同步

  • 而仅仅是第一次连接时,是从机向主机发送同步命令,之后就全都由主机来主动完成同步

  • 全量复制: 每次重新连接上主机时,都会进行一次全量复制

  • 增量复制: 已经连接上了主机后,由主机主动发送过来的同步数据为增量复制

哨兵模式

  • 可以理解为反客为主的升级版

  • 可以监视主从服务器的状态,当主机宕机时,会根据策略选取一名从机来充当主机

  • 而当主机再次上线时,会发现自己变成了从机。

创建哨兵启动时所需的配置文件

  • 在/opt/myredis/下创建一个sentinel.conf配置文件

  • 修改其中的内容

# mymaster为监控对象起的服务器名称,1 为至少有多少个哨兵同意迁移的数量
# 啥意思呢?意思是如果需要迁移主机,只需要一个及一个以上哨兵同意即可
# 开启了哨兵模式,那么所有的服务器都成了哨兵
sentinel monitor mymaster 127.0.0.1 6379 1

启动哨兵,开启后的默认端口号为26379

redis-sentinel /opt/myredis/sentinel.conf

image

  • 此时当主机宕机时,比如关闭掉6379这台主机

  • 此时发现我们的哨兵进程已经监控到,并且完成了容灾的处理
    image

哨兵选举新主机的策略

  • (1)根据每台服务器配置文件中的slave-priority配置来决定,数值越小优先级越高

  • (2)选择偏移量最大的(指的是哪个从机复制的数据最多最全的)

  • (3)选择runid最小的从机,每个redis服务在运行时都会随机生成一个40位的runid

哨兵模式小总结

  • 当监控线程发现了主机宕机后,会选取一个从机晋升为主机

  • 并且将之前主机之下的从机切换成新晋升的主机

  • 当之前的主机再次上线时,发现自己也变成了新主机的从机

在Java程序中使用哨兵模式

  • 修改一下之前配置的线程池,改成JedisSetinelPool
public class JedisPoolUtils {
    private volatile static JedisSentinelPool jedisPool = null;

    public static JedisSentinelPool getInstance() {
        if(jedisPool == null) {
            synchronized (JedisPoolUtils.class) {
                if(jedisPool == null) {
                    // 创建一个set集合,保存哨兵线程的 ip和端口号
                    Set<String> sentinelSet=new HashSet<>();
                    sentinelSet.add("192.168.22.100:26379");

                    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
                    // 最大连接
                    jedisPoolConfig.setMaxTotal(200);
                    // 最大空闲
                    jedisPoolConfig.setMaxIdle(32);
                    // 最大等待时间
                    jedisPoolConfig.setMaxWaitMillis(100 * 1000);
                    // 表示当pool中的jedis 实例都被分配完时,是否要进行阻塞
                    jedisPoolConfig.setBlockWhenExhausted(true);
                    // 每次获取连接时候都要到数据库验证连接有效性
                    jedisPoolConfig.setTestOnBorrow(true);

                    jedisPool = new JedisSentinelPool("mymaster", sentinelSet,jedisPoolConfig);
                }
            }
        }

        return jedisPool;
    }
}

Redis集群

集群提供了什么好处

  • Redis集群实现了对Redis的水平扩容,实现多台服务器分担客户端请求压力

  • Redis中的集群将会有一个slots插槽值,集群中的每个节点都刚好能存储数据的 1/N,这个N指的是节点的数量。而插槽slots则规定了整个集群能够存储数据的插槽范围,每个节点都会有对应的那一段范围。 在存储一个key时,会计算该key对应的插槽位置,存储到指定的节点。

  • 并且Redis为我们提供了 无中心化集群 配置,使得集群中的任何一个节点都能够访问到其他的节点,也就是客户端请求服务时,即便请求的服务是节点B提供的,访问节点A也可以获取到节点B的提供的服务。

制作6个实例,分别实现刚好能让他们一主一从,3个节点

  • (1)复制原先/conf/redis.conf 文件到 /opt/myredis/ 目录下

  • (2)修改该配置文件
    - (2.1)开启后台启动
    - (2.2)关闭ip绑定
    - (2.3)关闭保护模式

  • (3)创建6个配置文件,分别为redis6379.conf、redis6380.conf、redis6381.conf、redis6389.conf、redis6390.conf、redis6391.conf

    • (3.1)配置基本信息(pidfile、port、dump.rdb、log日志文件名、aof功能)
    • (3.2)配置集群信息(打开集群模式、设置节点配置文件名称、设置节点失联时间,超时自动进行主从切换)
    • 提示:可以使用%s/6379/6380这样的命令来一次性替换
# 引入redis配置文件
include /opt/myredis/redis.conf
# 当前服务的进程id存放地址
pidfile "/opt/myredis/redis6379.pid"
port 6379
# rdb备份文件名称
dbfilename "dump6379.rdb"
# 后台启动
daemonize yes
# 关闭保护模式
protected-mode no
# 开启集群功能
cluster-enabled yes
# 设置集群的节点配置文件名
cluster-config-file nodes-6379.conf
# 配置节点失联时间,这里是毫秒,换算成秒就是15秒
cluster-node-timeout 15000

启动6个redis服务并查看进程

redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf
redis-server redis6389.conf
redis-server redis6390.conf
redis-server redis6391.conf

image

将6个节点合并成一个集群

  • 进入到之前解压redis的目录:cd /opt/redis-6.2.1/src/

  • 执行如下命令

# --replicas 1采用最简单的方式配置集群
redis-cli --cluster create --cluster-replicas 1 192.168.22.100:6379 192.168.22.100:6380 192.168.22.100:6381 192.168.22.100:6389 192.168.22.100:6390 192.168.22.100:6391

中途会有点提示,询问是否按照默认分配的主机与从机进行集群配置。
image
image

  • 最后还能看到插槽的数量是16384个,下面最大是16383是因为,插槽是从0开始算的
    • 6379这个节点的插槽是:0~5460
    • 6380节点:5461~10922
    • 6381节点:10923~16383

连接客户端开始使用(采用集群方式连接)

  • 如果不采用集群方式连接,存储数据时,如果key计算的插槽不在当前节点,则会导致出错
    image

  • 因此应该采用集群的方式连接客户端
    redis-cli -c -p 6379

  • 此时由于无中心化集群配置的原因,设置key和获取key对应的值时,都会根据key所对应的插槽使当前连接的客户端连接到指定节点

集群的故障恢复以及相关配置

  • 综上的配置,当前的集群一共有3个主机,3个从机

  • 如果一个主机宕机,那么15秒内从机将会晋升为主机,当主机再次上线时,就变成了从机

  • redis的配置文件中有如下配置

# 如果该配置设置为yes,那么如果某个节点的主机和从机全部挂掉时,整个集群都会直接挂掉
# 如果设置为no,那么剩下的主从节点将会继续提供它们插槽范围内的服务
cluster-require-full-coverage yes

Jedis的集群开发

  • 使用JedisCluster工具来完成集群操作(别忘了关闭Linux的防火墙,或者开放端口)
public class JedisClusterTest {
    public static void main(String[] args) {
        //  注意:访问任何一个节点都是可以的,因为是无中心化集群
        JedisCluster jedisCluster = new JedisCluster(
                new HostAndPort("192.168.22.100", 6391));

        jedisCluster.set("k3", "wangming");

        String result = jedisCluster.get("k3");

        System.out.println(result);
    }
}

集群的好处和不足

  • 好处

    • 减轻了单台服务器的压力
    • 实现扩容
    • 无中心化配置比较简单
  • 不足

    • 多键操作是不被支持的,虽然可以靠着分组完成,但是并不方便
    • 多键的Redis事务是不被支持的,lua脚本不被支持

Redis应用问题及解决方案

缓存穿透

  • 问题描述

    • (1)服务器的压力突然剧增
    • (2)访问的接口全部都无法靠缓存处理,key对应的数据源并不存在
    • (3)访问的请求甚至是无效请求
    • (4)一般遇到这种情况都是黑客攻击,可以考虑直接报网警
  • 解决方案

    • (1)对空值进行缓存。如果查询返回的数据为null,将其缓存到redis数据库中,设置极短的过期时间,一般不超过5分钟(应急方案)
    • (2)设置可以访问的白名单。使用bitMaps定义一个可以访问的名单,名单id作为bitMaps的偏移量,每次访问时都通过bitMaps来查询是否在白名单范围内
    • (3)采用布隆过滤器
    • (4)进行实时监控,当发现Redis的缓存命中率急剧下降时,需要排查访问对象,与运维人员配合。
    • (5)直接报警。

缓存击穿

  • 问题描述

    • (1)服务器的压力突然增加
    • (2)此时缓存服务器中的某个热门key突然过期了
    • (3)而大量的请求都在访问这个热门key,最终导致大量请求都在一瞬间访问服务器,导致服务器压力过大直接宕机。
  • 解决方案

    • (1)预先设置好一些热门数据
    • (2)实时调整:现场监控哪些热门数据,实时调整key的过期时间
    • (3)使用锁的方式。
      • 当缓存失效时,不是立即去访问服务器
      • 使用某些操作成功会带返回值的操作,例如redis的setnx,来设置一个互斥的key
      • 当操作返回成功的时候,再进行访问服务器的操作,并回头设置缓存,最后删除互斥key
      • 当操作返回失败,证明已经有线程正在load db,可以当线程睡眠一会后直接get整个缓存

缓存雪崩

  • 问题描述

    • (1)服务器的压力突然增加
    • (2)缓存服务器中的多个key在同一时间过期
    • (3)而刚好在这时有大量的客户端请求发送过来,并且大部分都没有办法命中缓存
    • (4)因此全都会去访问服务器,最终服务器不堪重负宕机
  • 解决方案

    • (1)构建多级缓存结构: nginx缓存 + redis缓存 + 其他缓存(ehcache等)
    • (2)使用锁或队列:不适用大量并发情况
    • (3)设置过期标识更新缓存
      • 记录缓存是否过期(设置提前量),如果过期会触发通知另外的线程完成对缓存的更新。
    • (4)将缓存的过期时间分散开来。
      • 比如我们可以在原有缓存的失效时间基础上添加一个随机值,比如1~5分钟,这样就可以减少大量的缓存在同一时间过期的概率。

分布式锁

  • 指的是在不同的机器提供的服务,需要实现一把锁对所有机器提供服务的控制,为了解决这个问题,就需要一种技术来实现跨JVM的互斥机制来控制共享资源的访问。上锁之后对所有机器都有效,这就叫做分布式锁

  • 解决方案

    • (1)基于数据库实现分布式锁
    • (2)基于缓存(Redis等)
    • (3)给予Zookeeper
  • 优缺点

    • (1)性能: redis性能最高
    • (2)可靠性: Zookeeper可靠性最高

使用Redis实现分布式锁

redis命令:# set sku:1:info "OK" NX PX 10000

  • EX second:设置键的过期时间为秒

  • PX millisecond:设置键的过期时间为毫秒

  • NX:只有当键不存在时,才能对键进行操作

  • XX :只有当键已经存在时,才对键进行操作

实现redis中num数字的增加,靠setnx完成

@RestController
public class NumRedisLockController {
    @Autowired
    private RedisTemplate redisTemplate;

    // 1、压力测试前: 先在redis当中设置string类型的 num = 0
    @RequestMapping("numLockTest")
    public void numLockTest() {
        // 2、上锁
        if (redisTemplate.opsForValue().setIfAbsent("numLock", "ok")){
            // 执行num + 1
            redisTemplate.opsForValue().increment("num");
            // 3、解锁
            redisTemplate.delete("numLock");
        }else {
            try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
            // 重试
            numLockTest();
        }
    }
}
优化之添加锁的过期时间
  • 为什么要这么做?

    • 因为如果一个线程上了锁之后,在里面发生了异常,可能最终导致都无法解锁,那么其他线程将会一直等待。
  • 实现代码

@RestController
public class NumRedisLockController {
    @Autowired
    private RedisTemplate redisTemplate;

    // 1、压力测试前: 先在redis当中设置string类型的 num = 0
    @RequestMapping("numLockTest")
    public void numLockTest() {
        // 2、上锁 ,设置过期时间
        if (redisTemplate.opsForValue().setIfAbsent("numLock", "ok", 1, TimeUnit.SECONDS)){
            // 执行num + 1
            redisTemplate.opsForValue().increment("num");
            // 3、解锁
            redisTemplate.delete("numLock");
        }else {
            try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
            // 重试
            numLockTest();
        }
    }
}
优化之添加UUID防止误删除
  • 为什么会出现这种问题

    • 线程A刚要执行解锁操作时,锁的过期时间到了,导致锁过期了。
    • 这个时候如果线程B拿到锁进入了方法体,将会导致A释放了线程B的锁。
  • 实现方案

@RestController
public class NumRedisLockController {
    @Autowired
    private RedisTemplate redisTemplate;

    // 1、压力测试前: 先在redis当中设置string类型的 num = 0
    @RequestMapping("numLockTest")
    public void numLockTest() {
        // 获取UUID
        String uuid = UUID.randomUUID().toString();
        // 2、上锁 ,设置过期时间
        if (redisTemplate.opsForValue().setIfAbsent("numLock", uuid, 1, TimeUnit.SECONDS)){
            // 执行num + 1
            redisTemplate.opsForValue().increment("num");
            // 3、解锁
            if (uuid.equals((String)redisTemplate.opsForValue().get("numLock"))) {
                redisTemplate.delete("numLock");
            }
        }else {
            try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
            // 重试
            numLockTest();
        }
    }
}
优化之使用lua脚本保证的原子性
  • 问题分析

    • 线程A判断成功了当前锁对应的value值确实是自己的uuid
    • 刚准备执行删除操作的时候,锁过期时间到了
    • 与此同时,线程B拿到了锁
    • 此时线程A将会把线程B的锁给释放。
    • 因此得出结论,需要让进行删除标识判断以及删除的操作保持原子性
  • 创建一个lua脚本

if redis.call('get', KEYS[1]) == ARGV[1] 
then
return redis.call('del', KEYS[1]) 
else 
return 0 
end
  • 使用示例
@RestController
public class NumRedisLockController {
    @Autowired
    private RedisTemplate redisTemplate;

    // 1、压力测试前: 先在redis当中设置string类型的 num = 0
    @RequestMapping("numLockTest")
    public void numLockTest() {
        // 获取UUID
        String uuid = UUID.randomUUID().toString();

        String lockKey = "numLock";

        // 获取锁
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, 1, TimeUnit.SECONDS);

        if (lock) {
            // 使num 每次+1 放入缓存
            redisTemplate.opsForValue().increment("num");
            /*使用lua脚本来锁*/
            // 定义lua 脚本
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

            // 创建一个Redis脚本文件
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            // 设置一下返回值类型 为Long ,指的是redisTemplate.execute返回的实际类型
            redisScript.setResultType(Long.class);
            
            // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
            Long execute = (Long)redisTemplate.execute(redisScript, Arrays.asList(lockKey), uuid);
            System.out.println(execute == 1 ? "执行成功": "执行失败");

        }else {
            try { TimeUnit.NANOSECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace(); }
            // 重试
            numLockTest();
        }
    }
}

标签:JAVA,conf,Redis,redis,k1,从零开始,jedis,key
From: https://www.cnblogs.com/itdqx/p/16688133.html

相关文章