首页 > 数据库 >Redis高级

Redis高级

时间:2024-08-02 21:51:24浏览次数:12  
标签:文件 缓存 AOF Redis redis 高级 key

Redis高级

事务

什么是事务

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队。

Multi 、 Exec 、 discard

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

组队的过程中可以通过discard来放弃组队。

组队过程中 如果存在语法错误 exec 执行不了

组队过程中 如果存在数据错误 exec会按顺序执行 报错退出

事务的错误处理

image-20220616210255890

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

image-20220616210314170

事务冲突的问题

  • 案例:

    image-20220616210436677

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁。

image-20220616210635805

乐观锁

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

image-20220616210716350

WATCH key [key ...]

在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

unwatch

取消 WATCH 命令对所有 key 的监视。

如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。

Redis事务三特性

  • 单独的隔离操作

    事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

  • 没有隔离级别的概念

    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

  • 不保证原子性

    事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis持久化之RDB

Redis 提供了2个不同形式的持久化方式

RDB(Redis DataBase)

AOF(Append Of File)

RDB

  • 什么是RDB

    在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里

  • 备份是如何执行的

    Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

  • Fork

    Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程

    l在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术

    一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

    持久化流程:

    image-20220616215755972

  • 配置

    rdb文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下

    dir ./

    dump.rdb文件

    在redis.conf中配置文件名称,默认为dump.rdb

    配置文件中默认的快照配置

    ​ 格式:save 秒钟 写操作次数

    RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合的快照触发条件,

    默认是1分钟内改了1万次,或5分钟内改了10次,或60分钟内改了1次。

    image-20220616220418567

    stop-writes-on-bgsave-error

    当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes

    rdbcompression压缩文件

    对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。

    image-20220616220915311

    rdbchecksum 检查完整性

    在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能

    image-20220616221040696

  • save VS bgsave命令

    save :save时只管保存,其它不管,全部阻塞。手动保存。不建议。

    bgsaveRedis会在后台异步进行快照操作, 快照同时还可以响应客户端请求。

    可以通过lastsave 命令获取最后一次成功执行快照的时间

  • RDB恢复

    先通过config get dir 查询rdb文件的目录

    将*.rdb的文件拷贝到别的地方

    rdb的恢复

    ​ 关闭Redis

    ​ 先把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb

    ​ 启动Redis, 备份数据会直接加载

  • 优势

    • 适合大规模的数据恢复
    • 对数据完整性和一致性要求不高更适合使用
    • 节省磁盘空间
    • 恢复速度快
  • 劣势

    • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
    • 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
    • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
  • 动态停止RDB

    redis-cli config set save ""#save后给空值,表示禁用保存策略

    Redis持久化之AOF

什么是AOF

日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

AOF 持久化流程

(1)客户端的请求写命令会被append追加到AOF缓冲区内;

(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;

(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;

(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

image-20220616223506193

配置

  • AOF默认不开启

可以在redis.conf中配置文件名称,默认为 appendonly.aof

AOF文件的保存路径,同RDB的路径一致。

  • AOF和RDB同时开启,redis听谁的?

AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)

  • AOF启动/修复/恢复

    AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。

    正常恢复

    修改默认的appendonly no,改为yes

    将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)

    恢复:重启redis然后重新加载

    异常恢复

    修改默认的appendonly no,改为yes

    如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof--fix appendonly.aof 进行恢复

    备份被写坏的AOF文件

    恢复:重启redis,然后重新加载

  • AOF同步频率设置

    • appendfsync always

      始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较

    • appendfsync everysec

      每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

    • appendfsync no

      redis不主动进行同步,把同步时机交给操作系统。

  • Rewrite压缩

    1、是什么:

    AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof

    2、重写原理,如何实现重写

    AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,是指上就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。

    no-appendfsync-on-rewrite:

    如果 no-appendfsync-on-rewrite=yes ,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)

    如果 no-appendfsync-on-rewrite=no, 还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)

    触发机制,何时重写

    Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发

    重写虽然可以节约大量磁盘空间,减少恢复时间。但是每次重写还是有一定的负担的,因此设定Redis要满足一定条件才会进行重写。

    auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)

    auto-aof-rewrite-min-size:设置重写的基准值,最小文件64MB。达到这个值开始重写。

    例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB

    系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,

    如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。

    3、重写流程

    (1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。

    (2)主进程的fork子进程执行重写操作,保证主进程不会阻塞。

    (3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。

    (4)1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。

    (5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。

    image-20220616230026736

如何选择

如果对数据不敏感,可以选单独用RDB。不建议单独用 AOF,因为可能会出现Bug。如果只是做纯内存缓存,可以都不用。

官方建议

l RDB持久化方式能够在指定的时间间隔能对你的数据进行快照存储

l AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾.

l Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大

l 只做缓存:如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式.

l 同时开启两种持久化方式

l 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据, 因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整.

l RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件。那要不要只使用AOF呢?

l 建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份), 快速重启,而且不会有AOF可能潜在的bug,留着作为一个万一的手段。

主从复制

主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主

优点:

1.读写分离,性能扩展

2.容灾快速恢复

image-20220616232925560

配置

  • 准备

    在Redis安装目录中创建文件夹 master_slave_conf 和 logs

    拷贝多个redis.conf文件

    include(写绝对路径)

    开启daemonize yes

    Pid文件名字pidfile

    指定端口port

    Log 文件名字

    dump.rdb名字dbfilename

    Appendonly 关掉或者换名字

  • 新建redis6380.conf,填写以下内容

    include /usr/local/soft/redis-6.2.1/redis.conf

    pidfile /var/run/redis_6380.pid

    port 6380

    logfile /usr/local/soft/redis-6.2.1/logs/redis_6380.log

    dbfilename dump6380.rdb

  • 新建redis6381.conf

    include /usr/local/soft/redis-6.2.1/redis.conf

    pidfile /var/run/redis_6381.pid

    port 6381

    logfile /usr/local/soft/redis-6.2.1/logs/redis_6381.log

    dbfilename dump6381.rdb

  • 新建redis6382.conf

    include /usr/local/soft/redis-6.2.1/redis.conf

    pidfile /var/run/redis_6382.pid

    port 6382

    logfile /usr/local/soft/redis-6.2.1/logs/redis_6382.log

    dbfilename dump6382.rdb

  • info replication

    打印主从复制的相关信息

  • 配从(库)不配主()

    slaveof

    成为某个实例的从服务器

    1、在6381和6382上执行: slaveof 127.0.0.1 6380

  • 测试

    在主机上写,在从机上可以读取数据 在从机上写数据报错

    注意:

    ​ 主机挂掉,重启就行,一切如初

    ​ 从机重启需重设:slaveof 127.0.0.1 6379

一主二仆

切入点问题?slave1、slave2是从头开始复制还是从切入点开始复制?比如从k4进来,那之前的k1,k2,k3是否也可以复制?

从机是否可以写?set可否?

主机shutdown后情况如何?从机是上位还是原地待命?

主机又回来了后,主机新增记录,从机还能否顺利复制?

其中一台从机down后情况如何?依照原有它能跟上大部队吗?

image-20220616235738289

薪火相传

上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master,去中心化降低风险。

用 slaveof

中途变更转向:会清除之前的数据,重新建立拷贝最新的

风险是一旦某个slave宕机,后面的slave都没法备份

主机挂了,从机还是从机,无法写数据了

image-20220616235746757

反客为主

当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。

用 slaveof no one 将从机变为主机

哨兵模式

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

  • 配置

    目录下新建sentinel.conf文件,名字绝不能错

    配置哨兵,填写内容

    sentinel monitor mymaster 127.0.0.1 6380 1

    其中mymaster为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。

    启动哨兵

    执行redis-sentinel ./sentinel.conf

  • 效果

    当主机挂掉,从机选举中产生新的主机

    (大概10秒左右可以看到哨兵窗口日志,切换了新的主机)

    哪个从机会被选举为主机呢?根据优先级别:slave-priority

    原主机重启后会变为从机。

  • 故障恢复

    image-20220617002222385

    优先级在redis.conf中默认:slave-priority 100,值越小优先级越高

    偏移量是指获得原主机数据最全的

    每个redis实例启动后都会随机生成一个40位的runid

Redis 集群

问题

容量不够,redis如何进行扩容?

并发写操作, redis如何分摊?

另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是 无中心化 集群配置

什么是集群

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求

删除持久化数据

将rdb,aof文件都删除掉。

配置

  • 创建文件夹 mkdir configs

  • 制作6 个实例, 6380,6381,6382,6390,6391,6392

  • 配置基本信息

    开启 daemonize yes

    Pid文件名字

    指定端口

    Log文件名字

    Dump.rdb 名字

    Appendonly 关掉或者换名字

  • redis cluster配置修改

    cluster-enabled yes 打开集群模式

    cluster-config-file nodes-6380.conf 设定节点配置文件名

    cluster-node-timeout 15000 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。

    redis_6380.conf

    include /usr/local/soft/redis-6.2.1/redis.conf

    pidfile "/var/run/redis_6380.pid"

    port 6380

    logfile "/usr/local/soft/redis-6.2.1/logs/redis_6380.log"

    dbfilename "dump6380.rdb"

    # Generated by CONFIG REWRITE

    daemonize yes

    protected-mode no

    databases 24

    maxmemory 1000000000

    dir "/usr/local/soft/redis-6.2.1/configs"

    cluster-enabled yes

    cluster-config-file nodes-6380.conf

    cluster-node-timeout 15000

  • 修改好redis6380.conf文件,拷贝多个redis.conf文件 :%s/6380/new

  • 修改另外5个文件

  • 确保当前端口不被占用,请关闭之前所有redis服务 ps -aux | grep redis

  • 创建 /usr/local/soft/redis-6.2.1/redis_cluster 目录并启动6个redis服务

  • 将六个节点合成一个集群

    • 组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常
    • cd /usr/local/soft/redis-6.2.1/src
    • redis-cli --cluster create --cluster-replicas 1 192.168.20.90:6380 192.168.20.90:6381 192.168.20.90:6382 192.168.20.90:6390 192.168.20.90:6391 192.168.20.90:6392
    • 此处不要用 127.0.0.1 , 请用真实IP地址 --replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组
  • 登录

    • 普通方式登录

      可能直接进入读主机,存储数据时,会出现MOVED重定向操作。所以,应该以集群方式登录。

    • -c 采用集群策略连接,设置数据会自动切换到相应的写主机

      image-20220617011559557

    • 通过 cluster nodes 命令查看集群信息

    • redis cluster 如何分配这六个节点?

      一个集群至少要有三个主节点。

      选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。

  • 什么是 slots

    [OK] All 16384 slots covered.

    一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,

    集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

    集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:

    节点 A 负责处理 0 号至 5460 号插槽。

    节点 B 负责处理 5461 号至 10922 号插槽。

    节点 C 负责处理 10923 号至 16383 号插槽。

  • 集群中录入值

    在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

    redis-cli客户端提供了 –c 参数实现自动重定向。

    如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。

    不在一个slot下的键值,是不能使用mget,mset等多键操作。

    image-20220617012235166

    可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去

    image-20220617012343532

  • 查询集群中的值

    CLUSTER GETKEYSINSLOT 返回 count 个 slot 槽中的键。

    image-20220617012659837

  • 故障恢复

    如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?

    如果某一段插槽的主从都挂掉,而 cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉

    如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。

image-20220617013051071

Jedis操作

即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.31.211",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

评价

  • 优点

    实现扩容

    分摊压力

    无中心配置相对简单

  • 缺点

    多键操作是不被支持的

    多键的Redis事务是不被支持的

    由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

缓存穿透

问题描述

key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

image-20220617013905336

解决方案

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

解决方案:

(1) 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

(2) 设置可访问的名单(白名单):

使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。

(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。

布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)

将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

(4) 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

缓存击穿

问题描述

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

image-20220617014020608

解决方案

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

解决问题:

1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长

(3)使用锁:

(1) 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。

(2) 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key

(3) 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;

(4) 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

image-20220617014119509

缓存雪崩

问题描述

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key正常访问

image-20220617014207048

缓存失效瞬间

image-20220617014220859

解决方案

缓存失效时的雪崩效应对底层系统的冲击非常可怕!

解决方案:

(1) 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)

(2) 使用锁或队列:

用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

(3) 设置过期标志更新缓存:

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

(4) 将缓存失效时间分散开:

比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

Redis缓存

Springboot中的配置

spring:
  cache:
    type: redis
    redis:
      time-to-live: 60000
      cache-null-values: false

pom文件中依赖 开启Springboot的缓存

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

常见错误

错误信息:

org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'data' cannot be found on object of type 'org.springframework.cache.interceptor.CacheExpressionRootObject' - maybe not public or not valid?

解决方式:

​ 对应@Cacheable中key中的固定值要用单引号

错误信息:

java.net.ConnectException: Connection refused: no further information

解决方式:

​ 确认Redis连接无误:配置信息内容 以及 Linux中的redis是否启动

代码

config层

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean //在Springboot启动后,通过扫描到该注解,然后调用该方法并传入factory对象
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // redis的字符串序列化类
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        // 指定序列化输入的类型,类必须是非final修饰的
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashKeySerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));
//                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

Dao层

@Repository
public class DeptDaoImpl implements DeptDao {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    Dept dept;

    @Override
    public String getDept() {
        String sql ="select * from bigdata19.dept where deptno='30'";
        List<Map<String, Object>> maps = jdbcTemplate.queryForList(sql);
        for (Map<String, Object> map : maps) {
            dept.setDeptno((Integer)map.get("DEPTNO"));
            dept.setDname((String)map.get("DNAME"));
            dept.setLoc((String)map.get("LOC"));
        }
        return dept.toString();
    }





    @Override
    public List<Dept> getAllCusBaseInfo() {
        //查询部门信息
        List<Map<String,Object>> list=jdbcTemplate.queryForList("select * from dept");
        List<Dept> cusBaseInfoList=new ArrayList<>();
        for (Map<String,Object> map:list ) {

            Dept cusBaseInfo=new Dept();
            //部门id
            cusBaseInfo.setDeptno((Integer)map.get("deptno"));
            cusBaseInfo.setDname(map.get("dname").toString());
            cusBaseInfo.setLoc(map.get("loc").toString());
            cusBaseInfoList.add(cusBaseInfo);
        }
        return cusBaseInfoList;
    }

    @Override
    public Dept saveDeptInfo(Dept dept) {
//        dept.setDeptno(UUID.randomUUID().toString());
        System.out.println(dept);
        String sql="insert into dept (deptno,dname,loc) values(?,?,?)";
        jdbcTemplate.update(sql,
                new Object[]{
                dept.getDeptno(),
                dept.getDname(),
                dept.getLoc(),},new int[]{
                        Types.INTEGER,Types.VARCHAR,Types.VARCHAR});
        //获取信息
        return getDeptByNo(dept.getDeptno());
    }
    private static final Logger log = LoggerFactory.getLogger(DeptDaoImpl.class);

    @Override
    public Dept getDeptByNo(Integer deptNo) {

//        log.error("getDeptByNo错误发生....");
        log.info("getDeptByNo正在执行....");
        Map<String,Object> map=jdbcTemplate.queryForMap("select * from dept where deptno = ?",deptNo);
        Dept cusBaseInfo=new Dept();

        cusBaseInfo.setDeptno((Integer)map.get("deptno"));

        cusBaseInfo.setDname(map.get("dname").toString());

        cusBaseInfo.setLoc(map.get("loc").toString());
        return cusBaseInfo;

    }

    @Override
    public String updateDept(Dept dept) {
        String sql="update  dept set loc = ? where DEPTNO = ?";
        jdbcTemplate.update(sql,
                new Object[]{
                        dept.getLoc(),
                        dept.getDeptno()
                        },new int[]{
                        Types.VARCHAR,
                        Types.INTEGER});
        return getDeptByNo(dept.getDeptno()).toString();
    }
}

Service层

@Service
public class CacheServiceImpl implements CacheService {

    /**
     * 由于有@Cacheable注释修饰后,当第一次调用该方法时,该方法会直接执行,
     *      并且会将执行结果存放至缓存中,在后续的调用中,可以直接去缓存中寻找对应的值,不直接执行该方法
     *
     *
     * @return
     */
    @Override
    @Cacheable(value = "cache",key = "'data'")
    public String getData() {
        System.out.println("service中的getData方法被调用了....");
        return "从MySQL中读取数据... 1";
    }

    @Autowired
    DeptDao deptDao;

    /**
     *  在 Cacheable 注解中的 key中对应单引号为固定值 #变量名 为调用方法中的参数
     *
     */
    @Override
    @Cacheable(value = "cache",key = "#deptNo")
    public Dept getDeptByNo(Integer deptNo) {
        return deptDao.getDeptByNo(deptNo);
    }


    /**
     * 虽然我们针对数据设置了 Key的过期时间,但是无法保证数据在过期时间内,不发生变化
     *  那么后续获取到的数据都是缓存中的错误数据
     *  所以在修改数据时,需要将原先缓存中的数据进行删除
     *  @CacheEvict 注解用于删除缓存数据
     */

    @Override
    @CacheEvict(value = "cache",key = "'deptNo'")
    public String updateInfo(Dept dept) {
        return deptDao.updateDept(dept);
    }

}

Control层

@RestController
@RequestMapping(value = "cache")
public class CacheController {

    @Autowired
    CacheService cacheService;


    @RequestMapping(value = "getData")
    public String getData(){
        System.out.println("*******************");
        return cacheService.getData();
    }


    @RequestMapping(value = "/get/{deptno}",method = RequestMethod.GET)
    public Dept getCusBaseInfoById(@PathVariable("deptno") Integer deptno){
        System.out.println("该方法被调用...");
        return cacheService.getDeptByNo(deptno);
    }

    @RequestMapping(value = "/update",method = RequestMethod.POST)
    public String updateInfo(@RequestBody Dept dept){
        return cacheService.updateInfo(dept);
    }

}

注意:

​ 如果要使用缓存注解 那么需要在Application主入口类上添加@EnableCaching注解 或者在RedisConfig类上添加@EnableCaching

标签:文件,缓存,AOF,Redis,redis,高级,key
From: https://www.cnblogs.com/justice-pro/p/18339664

相关文章

  • go-zero 使用 redis 作为 cache 的 2 种姿势
    在go-zero框架内,如在rpc的应用service中,其内部已经预置了redis的应用,所以我们只需要在配置中加入相关字段即可,另外,在svcContext声明redisclient后即可在具体的业务逻辑处理中应用。但这里有个问题,如我用的是go-zero1.5.0版本,从源码分析来看,redis的连接并没用到......
  • 嵌入式软件--C语言高级 DAY 8.5 相关函数
    递归函数在嵌入式中应用不常见,但对于学习C语言的我们,也要时刻记得它的作用和用法。此外还要记住sprintf尤其重要!还有时间戳!一、递归函数1.概念一个函数在函数体内又调用了本身。但必须满足两个条件:具有明显的结束条件;趋近于结束条件的趋势。2.递归原理#include<stdio.h>......
  • 嵌入式软件--C语言高级 DAY 7数组
    一、概念数组array:是多个相同类型数据按一定顺序排列的集合,并使用一个标识符命名。并通过编号(索引,亦称为下标或角标)的方式对这些数据进行统一管理。数组的长度=元素的个数标号角标是从0开始。二、define_array.c定义数组的三种形式:1.定义数组,可以先确定数组的元素个......
  • Redis学习[5] ——Redis过期删除和内存淘汰
    六、Redis过期键值删除6.1Redis的过期键值删除策略6.1.1什么是过期键值删除?Redis中是可以对key设置过期时间的,所以需要有相应的机制将已过期的键值对删除,也就是**过期键值删除策略。Redis会用一个过期字典(expiresdict)**来存储有过期时间的所有key。当查询一个key时,Red......
  • C++高级功能
    Lambda匿名函数[只读列表](参数列表){函数体}例如:sort(a+1,a+n,[](constData&x,constData&y){returnx.val<y.val;}constintk=5;autocalc=[k](constint&x){returnx*k;}template模板在struct/namespace/函数前加入template<typ......
  • C高级(学习)2024.8.2
    目录1.指针函数概念格式2.函数指针概念格式基本用法3.函数指针数组概念格式  4.共用体格式定义共用体变量特性5.枚举定义格式6.存储类型(1)auto(2)static(3)extern(4)register7.条件编译(1)根据宏是否定义(2)根据宏值(3)防止头文件重复包含(放在头文件中)1.指针函......
  • redis配置(不全)
    1、pom添加依赖<!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>2、Redis配置类packagecom.exam.config;importcom.......
  • 【Spark高级应用】使用Spark进行高级数据处理与分析
    Spark高级应用使用Spark进行高级数据处理与分析引言在大数据时代,快速处理和分析海量数据是每个企业面临的重大挑战。ApacheSpark作为一种高效的分布式计算框架,凭借其高速、易用、通用和灵活的特点,已经成为大数据处理和分析的首选工具。本文将深入探讨如何使用Spark进行......
  • play高级用法
    play高级用法控制并发更新主机的数量#指定具体数字----name:test1serialhosts:allserial:2#每次同时处理2个主机max_fail_percentage:50#当两台机器中一台执行失败,既终止task或者#也可以使用百分比进行控制----name:test2se......
  • MQ高级
    消息的可靠性:一个消息发送出去以后至少被消费一次丢失场景:消息发送时候丢失,mq崩了消息丢失,消费者把消息搞丢了(交易服务) 解决方法针对以上三个场景和兜底方案 1、发送者可靠性消息从生产者到消费者的每一步都可能导致消息丢失:-发送消息时丢失:-生产者发送消息时连接MQ......