Redis
1 几乎覆盖了Memcached的绝大部分功能
2 数据都在内存中,支持持久化,主要用作备份恢复
3 除了支持简单的key-value模式,还支持多种数据结构的存储,比如 list、set、hash、zset等。
4 一般是作为缓存数据库辅助持久化的数据库
MongoDB
1 高性能、开源、模式自由(schema free)的文档型数据库
2 数据都在内存中, 如果内存不足,把不常用的数据保存到硬盘
3 虽然是key-value模式,但是对value(尤其是json)提供了丰富的查询功能
4 支持二进制数据及大型对象
5 可以根据数据的特点替代RDBMS ,成为独立的数据库。或者配合RDBMS,存储特定的数据。
DB-Engines数据库排名
查看连接http://db-engines.com/en/ranking
Redis简介和安装
Redis简介和适用场景
-
Redis是当前比较热门的NOSQL系统之一,它是一个开源的使用ANSI c语言编写的key-value存储系统(区别于MySQL的二维表格的形式存储。)
-
Redis数据都是缓存在计算机内存中,但是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,实现数据的持久化。
-
Redis读写速度快,Redis读取的速度是110000次/s,写的速度是81000次/s;
-
Redis的所有操作都是原子性的。
-
Redis支持多种数据结构:string(字符串),list(列表),hash(哈希),set(集合),zset(有序集合)
-
Redis支持集群部署
-
支持过期时间,支持事务,消息订阅
Redis的安装和基本操作
Redis的官网和下载
官网
Redis官方网站 | Redis中文官方网站 |
---|---|
http://redis.io | [https://www.redis.net.cn/]( |
Redis安装
第一步 下载redis及版本选择
第二步 下载安装最新版本的gcc编译器
-
安装C语言环境
yum -y install gcc
-
测试安装是否成功
gcc --version
第三步 上传redis-7.0.10.tar.gz放/opt目录
第四步 解压命令:tar -zxvf redis-7.0.10.tar.gz
第五步 解压完成后进入目录:cd redis-7.0.10
第六步 在redis-7.0.10目录下再次执行make命令(只是编译好)
注意:如果没有准备好C语言编译环境,make 会报错
-
Jemalloc/jemalloc.h:没有那个文件
-
此时解决方案:运行make distclean
-
make disclean
-
安装好 gcc后再次make
第七步 跳过make test,继续执行make install
Redis的启动和停止
查看安装目录
cd /usr/local/bin
- redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何
- redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲
- redis-check-dump:修复有问题的dump.rdb文件
- redis-sentinel:Redis哨兵/集群使用
- redis-server:Redis服务器启动命令
- redis-cli:客户端,操作入口
前台启动方式
redis-server
- 不推荐原因: 窗口不能关闭,关闭则服务停止
后台启动方式
修改配置文件redis.conf
- 修改配置文件中的 bind ,注释该配置,取消绑定仅主机登录
- 修改protected-mode 为no,取消保护模式
- 改redis.conf 文件将里面的daemonize no 改成 yes,让服务在后台启动
- 启动redis时,使用我们自己修改之后的配置文件
redis-server redis.conf
通过客户端连接redis
通过redis-cli命令默认连接的是本地的redis服务,并且使用默认6379端口。也可以通过指定如下参数连接:
-h ip地址 -p 端口号 -a 密码(如果需要)
- 通过客户端指令连接redis
redis-cli
- 如果想退出客户端可以 按 Ctrl+c ,退出客户端不会关闭redis服务
- 通过客户端连接制定端口下的redis (默认6379)
redis-cli -p 6379
- 连接后,测试与redis的连通性
ping
停止redis服务
- 单实例非客户端连接模式下关闭服务
redis-cli shutdown
- 在客户端连接模式下,直接使用shutdown关闭当前连接的redis服务
shutdown
- 多实例关闭指定端口的redis服务
redis-cli -p 6379 shutdown
数据库操作
默认16个数据库,类似数组下标从0开始,初始默认使用0号库
使用命令 select <dbid>来切换数据库。如: select 8
统一密码管理,所有库同样密码。
dbsize查看当前数据库的key的数量
flushdb清空当前库
flushall清空全部库
Redis单线程+多路复用
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)
- 多路:指的是多个网络连接客户端
- 复用:指的是复用同一个线程
- I/O 多路复用:其实是使用一个线程来检查多个 Socket 的就绪状态,在单个线程中通过记录跟踪每一个 socket(I/O流)的状态来管理处理多个 I/O 流。
1、一个 socket 客户端与服务端连接时,会生成对应一个套接字描述符(套接字描述符是文件描述符的一种),每一个 socket 网络连接其实都对应一个文件描述符。
- 文件描述符(file descriptor): Linux 系统中,把一切都看做是文件,当进程打开现有文件或创建新文件时,内核向进程返回一个文件描述符。可以理解文件描述符是一个索引,这样,要操作文件的时候,我们直接找到索引就可以对其进行操作了。我们将这个索引叫做文件描述符(file descriptor),简称fd。
2、多个客户端与服务端连接时,Redis 使用 「I/O 多路复用程序」 将客户端 socket 对应的 FD 注册到监听列表(一个队列)中。当客服端执行 read、write 等操作命令时,I/O 多路复用程序会将命令封装成一个事件,并绑定到对应的 FD 上。
3、「文件事件处理器」使用 I/O 多路复用模块同时监控多个文件描述符(fd)的读写情况,当 accept、read、write 和 close 文件事件产生时,文件事件处理器就会回调 FD 绑定的事件处理器进行处理相关命令操作。
4、文件事件分派器接收到I/O多路复用程序传来的套接字fd后,并根据套接字产生的事件类型,将套接字派发给相应的事件处理器来进行处理相关命令操作。
5、整个文件事件处理器是在单线程上运行的,但是通过 I/O 多路复用模块的引入,实现了同时对多个 FD 读写的监控,当其中一个 client 端达到写或读的状态,文件事件处理器就马上执行,从而就不会出现 I/O 堵塞的问题,提高了网络通信的性能。
Redis常用数据类型和命令
什么是Redis的五大数据类型
redis的存储时 key-value形式的,这里的五大类型指的是 value的五种数据类型
相关命令
1 如何对键进行一些操作
2 String类型的value值如何进行操作
3 List 类型的value如何进行操作
4 Set类型的value如何进行操作
5 Hash类型的value如何进行操作
6 Zset类型的value如何进行操作
redis常见数据类型操作命令的帮助文档
http://www.redis.cn/commands.html
第一节 key操作的相关命令
语法 | 功能 |
---|---|
keys * | 查看当前库所有key (匹配:keys *1) |
exists key | 判断某个key是否存在 |
type key | 查看你的key是什么类型 |
del key | 删除指定的key数据 |
unlink key | 非阻塞删除,仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作 |
expire key 10 | 10秒钟:为给定的key设置过期时间 |
ttl key | 查看还有多少秒过期,-1表示永不过期,-2表示已过期 |
select | 命令切换数据库 |
dbsize | 查看当前数据库的key的数量 |
flushdb | 清空当前库 |
flushall | 清空全部库 |
第二节 字符串类型(String)
简介
String是Redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
String类型是二进制安全的。意味着Redis的string可以包含任何数据。比如jpg图片或者序列化的对象。
String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M
常用命令
语法 | 解释 |
---|---|
set <key><value> | 添加键值对 |
set 参数 | NX:当数据库中key不存在时,可以将key-value添加数据库 |
set 参数 | XX:当数据库中key存在时,可以将key-value添加数据库,与NX参数互斥 |
set 参数 | EX:key的超时秒数 |
set 参数 | PX:key的超时毫秒数,与EX互斥 |
get <key> | 查询对应键值 |
append <key><value> | 将给定的<value> 追加到原值的末尾 |
strlen <key> | 获得值的长度 |
setnx <key><value> | 只有在 key 不存在时 设置 key 的值 |
incr <key> | 将 key 中储存的数字值增1,只能对数字值操作,如果为空,新增值为1 |
decr <key> | 将 key 中储存的数字值减1,只能对数字值操作,如果为空,新增值为-1 |
incrby / decrby <key> <步长> | 将 key 中储存的数字值增减。自定义步长 |
mset <key1><value1><key2><value2> ..... | 同时设置一个或多个 key-value对 |
mget <key1><key2><key3> ..... | 同时获取一个或多个 value |
msetnx <key1><value1><key2><value2> ..... | 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。有一个失败则都失败(原子性) |
getrange <key><起始位置><结束位置> | 获得值的范围,类似java中的substring,前包,后包(索引从0开始)。 |
setrange <key><起始位置><value> | 用 <value> 覆写<key>所储存的字符串值,从<起始位置>开始(索引从0开始)。 |
setex <key> <过期时间> <value> | 设置键值的同时,设置过期时间,单位秒。 |
getset <key><value> | 以新换旧,设置了新值同时获得旧值。 |
redis指令运行的原子性
- 所谓原子操作是指
不会被线程调度机制打断的操作
;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。- (1)在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。
- (2)在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。
- (3)Redis单命令的原子性主要得益于的单线程。
问题 JAVA中的 a++ 是否具有原子性
原子性:即不可分割性。比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作是原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以他不是一个原子操作。非原子操作都会存在线程安全问题,需要使用同步技术(sychronized)或者锁(Lock)来让它变成一个原子操作。一个操作是原子操作,那么我们称它具有原子性。
数据结构
String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.
如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。
第三节 Redis 列表(List)
简介
单键多值, 一个键下的value是一个List.Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。
常用命令
语法 | 功能 |
---|---|
lpush/rpush <key><value1><value2><value3> .... | 从左边/右边插入一个或多个值。 |
lpop/rpop <key> | 从左边/右边吐出一个值。值在键在,值光键亡。 |
rpop/lpush <key1><key2> | 从<key1>列表右边吐出一个值,插到<key2>列表左边 |
lrange <key><start><stop> | 按照索引下标获得元素(从左到右) |
0左边第一个,-1右边第一个,(0-1表示获取所有) | |
lindex <key><index> | 按照索引下标获得元素(从左到右) |
llen <key> | 获得列表长度 |
linsert <key> before <value><newvalue> | 在<value>的前面插入<newvalue>插入值 |
linsert <key> after <value><newvalue> | 在<value>的后面插入<newvalue>插入值 |
lrem <key><n><value> | 从左边删除n个value(从左到右) |
lset<key><index><value> | 将列表key下标为index的值替换成value |
数据结构
List的数据结构为快速链表quickList
。首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表。它将所有的元素紧挨着一起存储,分配的是一块连续的内存。当数据量比较多的时候才会改成quicklist。因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。
Redis将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。
第四节Redis 集合(Set)
简介
Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。
Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)。一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变
常用命令
语法 | 功能 |
---|---|
sadd <key><value1><value2> ..... | 将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略 |
smembers <key> | 取出该集合的所有值。 |
sismember <key><value> | 判断集合<key>是否为含有该<value>值,有1,没有0 |
scard<key> | 返回该集合的元素个数。 |
srem <key><value1><value2> .... | 删除集合中的某个元素。 |
spop <key> | 随机从该集合中吐出一个值 |
spop <key><N> | 随机从该集合中吐出N个值。 |
srandmember <key><n> | 随机从该集合中取出n个值。不会从集合中删除 。 |
smove <source><destination><value> | 把集合中一个值从一个集合移动到另一个集合 SMOVE set set2 a |
sinter <key1><key2> | 返回两个集合的交集元素。 |
sunion <key1><key2> | 返回两个集合的并集元素。 |
sdiff <key1><key2> | 返回两个集合的差集元素(key1中的,不包含key2中的) |
数据结构
Set数据结构是dict字典,字典是用哈希表实现的。Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它内部也使用hash结构,所有value都指向同一个内部值。
第五节 Redis 哈希(Hash)
简介
Redis hash 是一个键值对集合。Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。类似Java里面的Map<String,Object>用户ID为查找的key,存储的value用户对象包含姓名,年龄,生日等信息
-
方式1 单key+序列化 .问题:每次修改用户的某个属性需要,先反序列化改好后再序列化回去。开销较大。
-
方式2 多key-value .问题:用户ID数据冗余
-
方式3 单key + 多(field+value)
- 通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题
常用命令
语法 | 功能 |
---|---|
hset <key><field><value> | 给<key>集合中的 <field>键赋值<value> |
hget <key1><field> | 从<key1>集合<field>取出 value |
hmset <key1><field1><value1><field2><value2>... | 批量设置hash的值 |
hexists<key1><field> | 查看哈希表 key 中,给定域 field 是否存在。 |
hkeys <key> | 列出该hash集合的所有field |
hvals <key> | 列出该hash集合的所有value |
hincrby <key><field><increment> | 为哈希表 key 中的域 field 的值加上增量 1 -1 |
hsetnx <key><field><value> | 将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在 . |
数据结构
Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。
第六节 Redis 有序集合Zset
简介
Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串集合。不同之处是有序集合的每个成员都关联了一个评分(score),这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了 。因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表。
常用命令
语法 | 功能 |
---|---|
zadd <key><score1><value1><score2><value2>… | 将一个或多个 member 元素及其 score 值加入到有序集 key 当中。 |
zrange<key><start><stop> [WITHSCORES] | 升序返回有序集 key 中,下标在<start><stop>之间的元素,0代表第一个元素索引,-1代表最后一个元素索引.带WITHSCORES,可以让分数一起和值返回到结果集。 |
zrevrange <key><start><stop> [WITHSCORES] | 降序返回有序集 key 中,下标在<start><stop>之间的元素,0代表第一个元素索引,-1代表最后一个元素索引.带WITHSCORES,可以让分数一起和值返回到结果集 |
zrangebyscore <key> <min> <max> [withscores] [limit offset count] | 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。 |
zrevrangebyscore <key> <max> <min> [withscores] [limit offset count] | 同上,改为从大到小排列。 |
zincrby <key><increment><value> | 对有序集合中指定成员的分数加上增量 increment |
zrem <key><value> | 删除该集合下,指定值的元素 |
zcount <key><min><max> | 统计该集合,分数区间内的元素个数 |
zrank <key><value> | 返回该值在集合中的排名,从0开始。 |
数据结构
SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
Jedis客户端程序
Jedis简介
Redis不仅是使用命令来操作,现在基本上主流的语言都有客户端支持,比如java、C、C#、C++、php、Node.js、Go等。在官方网站里列一些Java的客户端,有Jedis、Redisson、Jredis、JDBC-Redis、等其中官方推荐使用Jedis和Redisson。 在企业中用的最多的就是Jedis。Jedis提供了完整Redis命令,而Redisson有更多分布式的容器实现。
环境准备
1 创建maven普通项目,导入如下依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
2 虚拟机和Redis设置
- 禁用Linux的防火墙:Linux(CentOS7)里执行命令
- systemctl stop/disable firewalld.service
- redis.conf中注释掉bind 127.0.0.1 ,然后 protected-mode 的值设置为no
3 测试JAVA程序和Redis之间的通信
package com.chs.jedis;
import redis.clients.jedis.Jedis;
public class Demo01 {
@Test
public void TestPing() {
Jedis jedis = new Jedis("linuxIP",6379);
String pong = jedis.ping();
System.out.println("连接成功:"+pong);
jedis.close();
}
}
key相关API
@Test
public void testKeyAPI(){
jedis.set("k1", "v1");
// 添加 键值对并设置过期时间
jedis.setex("k2",100, "v2");
jedis.set("k3", "v3");
// 获取所有的键
Set<String> keys = jedis.keys("*");
System.out.println(keys.size());
for (String key : keys) {
System.out.println(key);
}
// 判断某个键是否存在
System.out.println(jedis.exists("k1"));
// 查看键剩余过期时间
System.out.println(jedis.ttl("k2"));
// 根据键获取值
System.out.println(jedis.get("k1"));
}
String相关API
// 添加String
System.out.println(jedis.set("k1", "v1"));
// 一次添加多个
System.out.println(jedis.mset("ka","aaa","kb","bbb"));
// 获取
System.out.println(jedis.get("k1"));
// 一次获取多个
System.out.println(jedis.mget("k1","ka","kb"));
// 追加
System.out.println(jedis.append("k1", "vvvvv"));
// 获取长度
System.out.println(jedis.strlen("k1"));
// 不存在时进行设置
System.out.println(jedis.setnx("k1","xxxxx"));
System.out.println(jedis.setnx("k2","10"));
// 增长/减少
System.out.println(jedis.incr("k2"));
System.out.println(jedis.decr("k2"));
System.out.println(jedis.incrBy("k2", 10));
System.out.println(jedis.decrBy("k2", 10));
List相关API
@Test
public void testList(){
// 放入List
Long lpush = jedis.lpush("klist", "a", "b", "c", "d", "d");
System.out.println(lpush);
// 获取List
List<String> kList = jedis.lrange("klist", 0, -1);
kList.forEach(System.out::println);
// 取值
String klist = jedis.lpop("klist");
}
Set相关API
@Test
public void testSet(){
// 添加一个set集合
jedis.sadd("skey","a","b","c","d","e");
// 获取制定的set集合
Set<String> skey = jedis.smembers("skey");
skey.forEach(System.out::println);
//判断是否包含
System.out.println(jedis.sismember("skey","a"));
//删除元素
jedis.srem("skey","a","b");
//弹出一个元素
System.out.println(jedis.spop("skey"));
//弹出N个元素
System.out.println(jedis.spop("skey",2));
//从一个set向另一个set移动元素
jedis.smove("skey","bkey","X");
// ……
}
Hash相关API
// 添加值
jedis.hset("player1","pname","宇智波赵四儿");
jedis.hset("player1","page","14");
jedis.hset("player1","gender","boy");
// 获取值
System.out.println(jedis.hget("player1","pname"));
// 批量添加值
Map<String,String> player2=new HashMap<String,String>();
player2.put("pname","旋涡刘能");
player2.put("page","13");
player2.put("gender","boy");
jedis.hmset("player2",player2);
// 查看filed是否存在
System.out.println(jedis.hexists("player1", "pname"));
// 查看集合中所有的field
Set<String> player1fields = jedis.hkeys("player1");
player1fields.forEach(System.out::println);
// 查看集合中所有的value
List<String> player1vals = jedis.hvals("player1");
player1vals.forEach(System.out::println);
// 给制定属性+1
jedis.hincrBy("player1","page",5);
// 如不存在,添加某个属性
jedis.hsetnx("player1","height","156");
System.out.println(jedis.hget("player1","page"));
System.out.println(jedis.hget("player1","height"));
ZSet相关API
// 准备数据
Map<String ,Double> map=new HashMap<>();
map.put("李四",11d);
map.put("王五",8d);
map.put("赵六",20d);
map.put("刘七",3d);
// 添加元素
jedis.zadd("zkey",10,"张三");
jedis.zadd("zkey",map);
// 升序返回有序
Set<String> zkeys = jedis.zrange("zkey", 0, -1);
zkeys.forEach(System.out::println);
// 降序返回元素
Set<String> zkeys2 = jedis.zrevrange("zkey", 0, -1);
zkeys2.forEach(System.out::println);
System.out.println("===========");
Set<String> zkeys3 = jedis.zrangeByScore("zkey", 10, 20);
zkeys3.forEach(System.out::println);
System.out.println("===========");
Set<String> zkeys4 = jedis.zrevrangeByScore("zkey", 20, 10);
zkeys4.forEach(System.out::println);
// 增加分数
jedis.zincrby("zkey",5,"张三");
jedis.zincrby("zkey",-5,"赵六");
// 删除 元素
jedis.zrem("zkey","张三");
System.out.println(jedis.zcount("zkey",10,20));
System.out.println(jedis.zrank("zkey","李四"));
SpringBoot整合Redis
创建sping-boot工程
添加依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
创建配置文件
application.properties
spring.data.redis.host=linuxIP
spring.data.redis.port=6379
#spring.data.redis.client-type=lettuce
#设置lettuce的底层参数
#spring.data.redis.lettuce.pool.enabled=true
#spring.data.redis.lettuce.pool.max-active=8
#切换jedis
spring.data.redis.client-type=jedis
spring.data.redis.jedis.pool.enabled=true
spring.data.redis.jedis.pool.max-active=8
application.yaml
#SpringBoot整合Redis
spring:
data:
redis:
host: linuxIP
port: 6379
RedisTemplate、StringRedisTemplate: 操作redis的工具类
-
要从redis的连接工厂获取链接才能操作redis
-
Redis客户端
-
- Lettuce: 默认
- Jedis:可以使用以下切换
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 切换 jedis 作为操作redis的底层客户端-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
创建启动类
package com.chs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
序列化定制
@Configuration
public class AppRedisConfiguration {
/**
* 允许Object类型的key-value,都可以被转为json进行存储。
* @param redisConnectionFactory 自动配置好了连接工厂
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//把key转为字符串的序列化工具
template.setKeySerializer(new StringRedisSerializer());
//把对象转为json字符串的序列化工具
template.setDefaultSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}
编写测试方法
创建测试类,编写测试方法
@Autowired
RedisTemplate redisTemplate;
@Test
void redisTest(){
//测试String
redisTemplate.opsForValue().set("a","1234");
System.out.println(redisTemplate.opsForValue().get("a"));
//测试list集合
List list = new ArrayList<>();
list.add("lucy");
list.add("mary");
redisTemplate.opsForValue().set("abc",list);
System.out.println(redisTemplate.opsForValue().get("abc"));
}
StringRedisTemplate和RedisTemplate
StringRedisTemplate.opsForValue(). //操作String字符串类型*
StringRedisTemplate.delete(key/collection) //根据key/keys删除
StringRedisTemplate.opsForList().* //操作List类型
StringRedisTemplate.opsForHash().* //操作Hash类型
StringRedisTemplate.opsForSet().* //操作set类型
StringRedisTemplate.opsForZSet(). //操作有序set*
String
缓存:将数据以字符串方式存储
计数器功能:比如视频播放次数,点赞次数。
共享session:数据共享的功能,redis作为单独的应用软件用来存储一些共享数据供多个实例访问。
字符串的使用空间非常大,可以结合字符串提供的命令充分发挥自己的想象力
hash
字典。键值对集合,即编程语言中的Map类型。适合存储对象,并且可以像数据库中update一个属性一样只修改某一项属性值。适用于:存
储、读取、修改用户属性。也可以用Hash做表数据缓存
list
链表(双向链表),增删快,提供了操作某一段元素的API。适用于:最新消息排行等功能;消息队列。
set
集合。哈希表实现,元素不重复,为集合提供了求交集、并集、差集等操作。适用于:共同好友;利用唯一性,统计访问网站的所有独立ip;> 好友推荐时,根据tag求交集,大于某个阈值就可以推荐。
sorted set
有序集合。将Set中的元素增加一个权重参数score,元素按score有序排列。数据插入集合时,已经进行天然排序。适用于:排行榜;带权重的消息队列。
//向redis里存入数据和设置缓存时间
stringRedisTemplate.opsForValue().set("test", "100",60*10,TimeUnit.SECONDS);
//val做-1操作
stringRedisTemplate.boundValueOps("test").increment(-1);
//val +1
stringRedisTemplate.boundValueOps("test").increment(1);
//根据key获取缓存中的val
stringRedisTemplate.opsForValue().get("test");
//根据key获取过期时间
stringRedisTemplate.getExpire("test")
//根据key获取过期时间并换算成指定单位
stringRedisTemplate.getExpire("test",TimeUnit.SECONDS)
//根据key删除缓存
stringRedisTemplate.delete("test");
//检查key是否存在,返回boolean值
stringRedisTemplate.hasKey("546545");
//向指定key中存放set集合
stringRedisTemplate.opsForSet().add("red_123", "1","2","3");
//设置过期时间
stringRedisTemplate.expire("red_123",1000 , TimeUnit.MILLISECONDS);
//根据key查看集合中是否存在指定数据
stringRedisTemplate.opsForSet().isMember("red_123", "1")
//根据key获取set集合
stringRedisTemplate.opsForSet().members("red_123");
@Component
public class RedisString {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 新增一个字符串类型的值,key是键,value是值。
*
* set(K key, V value)
*/
public void set() {
// 存入永久数据
stringRedisTemplate.opsForValue().set("test2", "1");
// 也可以向redis里存入数据和设置缓存时间
stringRedisTemplate.opsForValue().set("test1", "hello redis", 1000, TimeUnit.SECONDS);
}
/**
* 批量插入,key值存在会覆盖原值
*
* multiSet(Map<? extends K,? extends V> map)
*/
public void multiSet() {
Map<String,String> map = new HashMap<>(16);
map.put("testMultiSet1", "value0");
map.put("testMultiSet2", "value2");
stringRedisTemplate.opsForValue().multiSet(map);
}
/**
* 批量插入,如果里面的所有key都不存在,则全部插入,返回true,如果其中一个在redis中已存在,全不插入,返回false
*
* multiSetIfAbsent(Map<? extends K,? extends V> map)
*/
public void multiSetIfAbsent() {
Map<String,String> map = new HashMap<>(16);
map.put("testMultiSet4", "value1");
map.put("testMultiSet3", "value3");
Boolean absent = stringRedisTemplate.opsForValue().multiSetIfAbsent(map);
System.out.println(absent);
}
/**
* 如果不存在则插入,返回true为插入成功,false失败
*
* setIfAbsent(K key, V value)
*/
public void setIfAbsent() {
Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent("test", "hello redis");
System.out.println(absent);
}
/**
* 获取值,key不存在返回null
*
* get(Object key)
*/
public void get() {
System.out.println(stringRedisTemplate.opsForValue().get("testMultiSet1"));
}
/**
* 批量获取,key不存在返回null
*
* multiGet(Collection<K> keys)
*/
public void multiGet() {
List<String> list = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("test", "test2"));
assert list != null;
System.out.println(list.toString());
}
/**
* 获取指定字符串的长度。
*
* size(K key)
*/
public void getLength() {
Long size = stringRedisTemplate.opsForValue().size("test");
System.out.println(size);
}
/**
* 在原有的值基础上新增字符串到末尾。
*
* append(K key, String value)
*/
public void append() {
Integer append = stringRedisTemplate.opsForValue().append("test3", "database");
System.out.println(append);
}
/**
* 获取原来key键对应的值并重新赋新值
*
* getAndSet(K key, V value)
*/
public void getAndSet() {
String set = stringRedisTemplate.opsForValue().getAndSet("test", "set test");
System.out.println(set);
}
/**
* 获取指定key的值进行减1,如果value不是integer类型,会抛异常,如果key不存在会创建一个,默认value为0
*
* decrement(k key)
*/
public void decrement() {
stringRedisTemplate.opsForValue().decrement("test2");
stringRedisTemplate.opsForValue().decrement("test1");
}
/**
* 获取指定key的值进行加1,如果value不是integer类型,会抛异常,如果key不存在会创建一个,默认value为0
*
* increment(k key)
*/
public void increment() {
stringRedisTemplate.opsForValue().increment("test2");
stringRedisTemplate.opsForValue().increment("test1");
}
/**
* 删除指定key,成功返回true,否则false
*
* delete(k key)
*/
public void delete() {
Boolean delete = stringRedisTemplate.opsForValue().getOperations().delete("test1");
System.out.println(delete);
}
/**
* 删除多个key,返回删除key的个数
*
* delete(k ...keys)
*/
public void deleteMulti() {
Long delete = stringRedisTemplate.opsForValue().getOperations().delete(Arrays.asList("test1", "test2"));
System.out.println(delete);
}
}
opsForList(操作集合)
@Component
public class RedisList {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 在变量左边添加元素值。如果key不存在会新建,添加成功返回添加后的总个数
*
* leftPush(K key, V value)
*/
public void leftPush() {
Long aLong = stringRedisTemplate.opsForList().leftPush("list", "a");
System.out.println(aLong);
}
/**
* 向左边批量添加参数元素,如果key不存在会新建,添加成功返回添加后的总个数
*
* leftPushAll(K key, V... values)
*/
public void leftPushAll() {
Long pushAll = stringRedisTemplate.opsForList().leftPushAll("list", "e", "f", "g");
System.out.println(pushAll);
}
/**
* 向集合最右边添加元素。如果key不存在会新建,添加成功返回添加后的总个数
*
* rightPush(K key, V value)
*/
public void rightPush() {
Long aLong = stringRedisTemplate.opsForList().rightPush("list2", "a");
System.out.println(aLong);
}
/**
* 如果存在集合则添加元素。
*
* leftPushIfPresent(K key, V value)
*/
public void leftPushIfPresent() {
Long aLong = stringRedisTemplate.opsForList().leftPushIfPresent("list", "h");
System.out.println(aLong);
}
/**
* 向右边批量添加元素。返回当前集合元素总个数
*
* rightPushAll(K key, V... values)
*/
public void rightPushAll() {
Long aLong = stringRedisTemplate.opsForList().rightPushAll("list2", "b", "c", "d");
System.out.println(aLong);
}
/**
* 向已存在的集合中添加元素。返回集合总元素个数
*
* rightPushIfPresent(K key, V value)
*/
public void rightPushIfPresent() {
Long aLong = stringRedisTemplate.opsForList().rightPushIfPresent("list", "e");
System.out.println(aLong);
}
/**
* 获取集合长度
*
* size(K key)
*/
public void size() {
Long size = stringRedisTemplate.opsForList().size("list2");
System.out.println(size);
}
/**
* 移除集合中的左边第一个元素。返回删除的元素,如果元素为空,该集合会自动删除
*
* leftPop(K key)
*/
public void leftPop() {
String pop = stringRedisTemplate.opsForList().leftPop("list2");
System.out.println(pop);
}
/**
* 移除集合中左边的元素在等待的时间里,如果超过等待的时间仍没有元素则退出。
*
* leftPop(K key, long timeout, TimeUnit unit)
*/
public void leftPopWait() {
String pop = stringRedisTemplate.opsForList().leftPop("list2", 10, TimeUnit.SECONDS);
System.out.println(pop);
}
/**
* 移除集合中右边的元素。返回删除的元素,如果元素为空,该集合会自动删除
*
* rightPop(K key)
*/
public void rightPop() {
String pop = stringRedisTemplate.opsForList().rightPop("list2");
System.out.println(pop);
}
/**
* 移除集合中右边的元素在等待的时间里,如果超过等待的时间仍没有元素则退出。
*
* rightPop(K key, long timeout, TimeUnit unit)
*/
public void rightPopWait() {
String pop = stringRedisTemplate.opsForList().rightPop("list2", 10, TimeUnit.SECONDS);
System.out.println(pop);
}
/**
* 移除第一个集合右边的一个元素,插入第二个集合左边插入这个元素
*
* rightPopAndLeftPush(K sourceKey, K destinationKey)
*/
public void rightPopAndLeftPush() {
String s = stringRedisTemplate.opsForList().rightPopAndLeftPush("list2", "list3");
System.out.println(s);
}
/**
* 在集合的指定位置插入元素,如果指定位置已有元素,则覆盖,没有则新增,超过集合下标+n则会报错。
*
* set(K key, long index, V value)
*/
public void set() {
stringRedisTemplate.opsForList().set("list2", 2, "w");
}
/**
* 从存储在键中的列表中删除等于值的元素的第一个计数事件。count> 0:删除等于从左到右移动的值的第一个元素;
* count< 0:删除等于从右到左移动的值的第一个元素;count = 0:删除等于value的所有元素
*
* remove(K key, long count, Object value)
*/
public void remove() {
Long remove = stringRedisTemplate.opsForList().remove("list2", 2, "w");
System.out.println(remove);
}
/**
* 截取集合元素长度,保留长度内的数据。
*
* trim(K key, long start, long end)
*/
public void trim() {
stringRedisTemplate.opsForList().trim("list2", 0, 3);
}
/**
* 获取集合指定位置的值。
*
* index(K key, long index)
*/
public void index() {
Object listValue = stringRedisTemplate.opsForList().index("list2", 3);
System.out.println(listValue);
}
/**
* 获取指定区间的值。
*
* range(K key, long start, long end)
*/
public void range() {
List<String> list = stringRedisTemplate.opsForList().range("list", 0, -1);
System.out.println(list);
}
/**
* 删除指定集合,返回true删除成功
*
* delete(K key)
*/
public void delete() {
Boolean delete = stringRedisTemplate.opsForList().getOperations().delete("list2");
System.out.println(delete);
}
}
opsForHash(操作hashMap)
@Component
public class RedisHash {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 新增hashMap值
*
* put(H key, HK hashKey, HV value)
*/
public void put() {
stringRedisTemplate.opsForHash().put("hash","hash-key","hash-value");
stringRedisTemplate.opsForHash().put("hash","hash-key2","hash-value2");
}
/**
* 以map集合的形式添加键值对
*
* putAll(H key, Map<? extends HK,? extends HV> m)
*/
public void putAll() {
Map<String, String> map = new HashMap<>(16);
map.put("hash-key3", "value3");
map.put("hash-key4", "value4");
stringRedisTemplate.opsForHash().putAll("hash", map);
}
/**
* 如果变量值存在,在变量中可以添加不存在的的键值对,如果变量不存在,则新增一个变量,同时将键值对添加到该变量。添加成功返回true否则返回false
*
* putIfAbsent(H key, HK hashKey, HV value)
*/
public void putIfAbsent() {
Boolean absent = stringRedisTemplate.opsForHash().putIfAbsent("hash", "hash-key", "value1");
Boolean absent2 = stringRedisTemplate.opsForHash().putIfAbsent("hash", "hash-key5", "value5");
System.out.println(absent);
System.out.println(absent2);
}
/**
* 获取指定变量中的hashMap值。
*
* values(H Key)
*/
public void values() {
List<Object> values = stringRedisTemplate.opsForHash().values("hash2");
System.out.println(values.toString());
}
/**
* 获取变量中的键值对。
*
* entries(H key)
*/
public void entries() {
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries("hash");
System.out.println(entries.toString());
}
/**
* 获取变量中的指定map键是否有值,如果存在该map键则获取值,没有则返回null。
*
* get(H key, Object hashKey)
*/
public void get() {
Object value = stringRedisTemplate.opsForHash().get("hash", "hash-key");
System.out.println(value);
}
/**
* 获取变量中的键。
*
* keys(H key)
*/
public void keys() {
Set<Object> keys = stringRedisTemplate.opsForHash().keys("hash");
System.out.println(keys.toString());
}
/**
* 获取变量的长度
*
* size(H key)
*/
public void size() {
Long size = stringRedisTemplate.opsForHash().size("hash");
System.out.println(size);
}
/**
* 使变量中的键以long值的大小进行自增长。值必须为Integer类型,否则异常
*
* increment(H key, HK hashKey, long data)
*/
public void increment() {
Long increment = stringRedisTemplate.opsForHash().increment("hash", "hash-key2", 1);
System.out.println(increment);
}
/**
* 以集合的方式获取变量中的值。
*
* multiGet(H key, Collection<HK> hashKeys)
*/
public void multiGet() {
List<Object> values = stringRedisTemplate.opsForHash().multiGet("hash", Arrays.asList("hash-key", "hash-key2"));
System.out.println(values.toString());
}
/**
* 匹配获取键值对,ScanOptions.NONE为获取全部键对,ScanOptions.scanOptions().match("hash-key2").build()匹配获取键位map1的键值对,不能模糊匹配。
*
* scan(H key, ScanOptions options)
*/
public void scan() {
Cursor<Map.Entry<Object, Object>> scan = stringRedisTemplate.opsForHash().scan("hash", ScanOptions.NONE);
while (scan.hasNext()) {
Map.Entry<Object, Object> next = scan.next();
System.out.println(next.getKey() + "---->" + next.getValue());
}
}
/**
* 删除变量中的键值对,可以传入多个参数,删除多个键值对。返回删除成功数量
*
* delete(H key, Object... hashKeys)
*/
public void delete() {
Long delete = stringRedisTemplate.opsForHash().delete("hash", "hash-key", "hash-key1");
System.out.println(delete);
}
}
opsForSet(操作set集合)
@Component
public class RedisSet {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 向变量中批量添加值。返回添加的数量
*
* add(K key, V... values)
*/
public void add() {
Long add = stringRedisTemplate.opsForSet().add("set", "a", "b", "c");
System.out.println(add);
}
/**
* 获取变量的值
*
* members(K key)
*/
public void members() {
Set<String> set = stringRedisTemplate.opsForSet().members("set");
System.out.println(set);
}
/**
* 获取变量中值得长度
*
* size(k key)
*/
public void size() {
Long size = stringRedisTemplate.opsForSet().size("set");
System.out.println(size);
}
/**
* 随机获取变量中的某个元素
*
* randomMember(k key)
*/
public void randomMember() {
String member = stringRedisTemplate.opsForSet().randomMember("set");
System.out.println(member);
}
/**
* 随机获取变量中指定个数的元素
*
* randomMembers(k key, long count)
*/
public void randomMembers() {
List<String> members = stringRedisTemplate.opsForSet().randomMembers("set", 2);
System.out.println(members);
}
/**
* 检查给定的元素是否在变量中,true为存在
*
* isMember(k key, object value)
*/
public void isMember() {
Boolean member = stringRedisTemplate.opsForSet().isMember("set", "b");
System.out.println(member);
}
/**
* 转义变量的元素值到另一个变量中
*
* move(k key, v value, k targetKey)
*/
public void move() {
Boolean move = stringRedisTemplate.opsForSet().move("set", "b", "set2");
System.out.println(move);
}
/**
* 弹出变量中的元素。当元素全部弹完,变量也会删除
*
* pop(k key)
*/
public void pop() {
String pop = stringRedisTemplate.opsForSet().pop("set");
System.out.println(pop);
}
/**
* 批量删除变量中的元素,返回删除的数量
*
* remove(k key, v ...values)
*/
public void remove() {
Long remove = stringRedisTemplate.opsForSet().remove("set2", "b");
System.out.println(remove);
}
/**
* 匹配获取键值对,ScanOptions.NONE为获取全部键值对;ScanOptions.scanOptions().match("C").build()匹配获取键位map1的键值对,不能模糊匹配。
*
* scan(K key, ScanOptions options)
*/
public void scan() {
Cursor<String> set = stringRedisTemplate.opsForSet().scan("set", ScanOptions.NONE);
while (set.hasNext()) {
String next = set.next();
System.out.println(next);
}
}
/**
* 通过集合求差值。
*
* difference(k key, k otherKey)
*/
public void difference() {
Set<String> difference = stringRedisTemplate.opsForSet().difference("set", "set2");
System.out.println(difference);
}
/**
* 将求出来的差值元素保存
*
* differenceAndStore(K key, K otherKey, K targetKey)
*/
public void differenceAndStore() {
Long aLong = stringRedisTemplate.opsForSet().differenceAndStore("set", "set2", "set3");
System.out.println(aLong);
}
/**
* 获取去重的随机元素
*
* distinctRandomMembers(K key, long count)
*/
public void distinctRandomMembers() {
Set<String> set = stringRedisTemplate.opsForSet().distinctRandomMembers("set", 2);
System.out.println(set);
}
/**
* 获取两个变量中的交集
*
* intersect(K key, K otherKey)
*/
public void intersect() {
Set<String> intersect = stringRedisTemplate.opsForSet().intersect("set", "set2");
System.out.println(intersect);
}
/**
* 获取2个变量交集后保存到最后一个变量上。
*
* intersectAndStore(K key, K otherKey, K targetKey)
*/
public void intersectAndStore() {
Long aLong = stringRedisTemplate.opsForSet().intersectAndStore("set", "set2", "set3");
System.out.println(aLong);
}
/**
* 获取两个变量的合集
*
* union(K key, K otherKey)
*/
public void union() {
Set<String> union = stringRedisTemplate.opsForSet().union("set", "set2");
System.out.println(union);
}
/**
* 获取两个变量合集后保存到另一个变量中
*
* unionAndStore(K key, K otherKey, K targetKey)
*/
public void unionAndStore() {
Long aLong = stringRedisTemplate.opsForSet().unionAndStore("set", "set2", "set3");
System.out.println(aLong);
}
}
opsForZset
@Component
public class RedisZSet {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 添加元素到变量中同时指定元素的分值。
*
* add(K key, V value, double score)
*/
public void add() {
Boolean add = stringRedisTemplate.opsForZSet().add("zset", "a", 1);
System.out.println(add);
}
/**
* 通过TypedTuple方式新增数据。
*
* add(K key, Set<ZSetOperations.TypedTuple<V>> tuples)
*/
public void addByTypedTuple() {
ZSetOperations.TypedTuple<String> typedTuple1 = new DefaultTypedTuple<>("E", 2.0);
ZSetOperations.TypedTuple<String> typedTuple2 = new DefaultTypedTuple<>("F", 3.0);
ZSetOperations.TypedTuple<String> typedTuple3 = new DefaultTypedTuple<>("G", 5.0);
Set<ZSetOperations.TypedTuple<String>> typedTupleSet = new HashSet<>();
typedTupleSet.add(typedTuple1);
typedTupleSet.add(typedTuple2);
typedTupleSet.add(typedTuple3);
Long zset = stringRedisTemplate.opsForZSet().add("zset", typedTupleSet);
System.out.println(zset);
}
/**
* 获取指定区间的元素
*
* range(k key, long start, long end)
*/
public void range() {
Set<String> zset = stringRedisTemplate.opsForZSet().range("zset", 0, -1);
System.out.println(zset);
}
/**
* 用于获取满足非score的排序取值。这个排序只有在有相同分数的情况下才能使用,如果有不同的分数则返回值不确定。
*
* rangeByLex(K key, RedisZSetCommands.Range range)
*/
public void rangeByLex() {
Set<String> rangeByLex = stringRedisTemplate.opsForZSet().rangeByLex("zset", RedisZSetCommands.Range.range().lt("E"));
System.out.println(rangeByLex);
}
/**
* 用于获取满足非score的设置下标开始的长度排序取值。
*
* rangeByLex(k key, range range, limit limit)
*/
public void rangeByLexAndLimit() {
Set<String> zset = stringRedisTemplate.opsForZSet().rangeByLex("zset", RedisZSetCommands.Range.range().lt("E"),
RedisZSetCommands.Limit.limit().offset(1).count(2));
System.out.println(zset);
}
/**
* 根据设置的score获取区间值。
*
* rangeByScore(K key, double min, double max)
*/
public void rangeByScore() {
Set<String> zset = stringRedisTemplate.opsForZSet().rangeByScore("zset", 1, 3);
System.out.println(zset);
}
/**
* 获取RedisZSetCommands.Tuples的区间值。
*
* rangeWithScores(K key, long start, long end)
*/
public void rangeWithScores() {
Set<ZSetOperations.TypedTuple<String>> zset = stringRedisTemplate.opsForZSet().rangeWithScores("zset", 1, 3);
assert zset != null;
for (ZSetOperations.TypedTuple<String> next : zset) {
String value = next.getValue();
Double score = next.getScore();
System.out.println(value + "-->" + score);
}
}
/**
* 获取区间值的个数。
*
* count(k key, double min, double max)
*/
public void count() {
Long zset = stringRedisTemplate.opsForZSet().count("zset", 1, 3);
System.out.println(zset);
}
/**
* 获取变量中指定元素的索引,下标开始为0
*
* rank(k key, object o)
*/
public void rank() {
Long rank = stringRedisTemplate.opsForZSet().rank("zset", "a");
System.out.println(rank);
}
/**
* 匹配获取键值对,ScanOptions.NONE为获取全部键值对;ScanOptions.scanOptions().match("C").build()匹配获取键位map1的键值对,不能模糊匹配。
*
* scan(K key, ScanOptions options)
*/
public void scan() {
Cursor<ZSetOperations.TypedTuple<String>> zset = stringRedisTemplate.opsForZSet().scan("zset", ScanOptions.NONE);
while (zset.hasNext()) {
ZSetOperations.TypedTuple<String> next = zset.next();
System.out.println(next.getValue() + "-->" + next.getScore());
}
}
/**
* 获取指定元素的分值
*
* score(k key, object o)
*/
public void score() {
Double score = stringRedisTemplate.opsForZSet().score("zset", "a");
System.out.println(score);
}
/**
* 获取变量中元素的个数
*
* zCard(k key)
*/
public void zCard() {
Long zset = stringRedisTemplate.opsForZSet().zCard("zset");
System.out.println(zset);
}
/**
* 修改变量中元素的分值
*
* incrementScore(K key, V value, double delta)
*/
public void incrementScore() {
Double score = stringRedisTemplate.opsForZSet().incrementScore("zset", "a", 2);
System.out.println(score);
}
/**
* 索引倒序排列指定区间的元素
*
* reverseRange(K key, long start, long end)
*/
public void reverseRange() {
Set<String> zset = stringRedisTemplate.opsForZSet().reverseRange("zset", 1, 3);
System.out.println(zset);
}
/**
* 倒序排列指定分值区间的元素
*
* reverseRangeByScore(K key, double min, double max)
*/
public void reverseRangeByScore() {
Set<String> zset = stringRedisTemplate.opsForZSet().reverseRangeByScore("zset", 1, 3);
System.out.println(zset);
}
/**
* 倒序排序获取RedisZSetCommands.Tuples的分值区间值
*
* reverseRangeByScore(K key, double min, double max, long offset, long count)
*/
public void reverseRangeByScoreLength() {
Set<String> zset = stringRedisTemplate.opsForZSet().reverseRangeByScore("zset", 1, 3, 1, 2);
System.out.println(zset);
}
/**
* 倒序排序获取RedisZSetCommands.Tuples的分值区间值。
*
* reverseRangeByScoreWithScores(K key, double min, double max)
*/
public void reverseRangeByScoreWithScores() {
Set<ZSetOperations.TypedTuple<String>> zset = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores("zset", 1, 5);
assert zset != null;
zset.iterator().forEachRemaining(e-> System.out.println(e.getValue() + "--->" + e.getScore()));
}
/**
* 获取倒序排列的索引值
*
* reverseRank(k key, object o)
*/
public void reverseRank() {
Long aLong = stringRedisTemplate.opsForZSet().reverseRank("zset", "a");
System.out.println(aLong);
}
/**
* 获取2个变量的交集存放到第3个变量里面。
*
* intersectAndStore(K key, K otherKey, K destKey)
*/
public void intersectAndStore() {
Long aLong = stringRedisTemplate.opsForZSet().intersectAndStore("zset", "zset2", "zset3");
System.out.println(aLong);
}
/**
* 获取2个变量的合集存放到第3个变量里面。 返回操作的数量
*
* unionAndStore(K key, K otherKey, K destKey)
*/
public void unionAndStore() {
Long aLong = stringRedisTemplate.opsForZSet().unionAndStore("zset", "zset2", "zset3");
System.out.println(aLong);
}
/**
* 批量移除元素根据元素值。返回删除的元素数量
*
* remove(K key, Object... values)
*/
public void remove() {
Long remove = stringRedisTemplate.opsForZSet().remove("zset", "a", "b");
System.out.println(remove);
}
/**
* 根据分值移除区间元素。返回删除的数量
*
* removeRangeByScore(k key, double min, double max)
*/
public void removeRangeByScore() {
Long zset = stringRedisTemplate.opsForZSet().removeRangeByScore("zset", 1, 3);
System.out.println(zset);
}
/**
* 根据索引值移除区间元素。返回移除的元素集合
*
* removeRange(K key, long start, long end)
*/
public void removeRange() {
Set<String> zset = stringRedisTemplate.opsForZSet().reverseRange("zset", 0, 4);
System.out.println(zset);
}
}
Redis事务和锁机制
Redis事务的定义
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。Redis事务的主要作用就是串联多个命令防止别的命令插队。
Redis事务控制命令
命令 | 功能 |
---|---|
multi | 开始组队 |
exec | 执行队列中的命令 |
discard | 取消组队 |
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。组队的过程中可以通过discard取消组队
情况1 ,组队成功,提交成功
Redis事务错误处理
情况2,组队报错,提交失败:提交失败组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消
情况3, 组队成功,提交时有成功有失败。如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,其他的命令都会执行,不会回滚。
Redis事务场景案例
场景说明
想想一个场景:有很多人想用有你的账户,同时去参加双十一抢购
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000
悲观锁(synchronized)
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁
在更新的时候会判断一下在此期间别人有没有去更新这个数据,没有才会去执行
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。
监视和取消监视key
在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务**执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。**
取消 WATCH 命令对所有 key 的监视。如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。
Redis事务的三个特性
- 单独的隔离操作
- 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 没有隔离级别的概念
- 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
- 不保证原子性
- 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
Redis Lua 脚本
什么是LUA
什么是LUA脚本
Lua 是一个小巧的[脚本语言](http://baike.baidu.com/item/脚本语言),Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。
LUA脚本的优势
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。利用lua脚本淘汰用户,解决超卖问题。redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis利用其单线程的特性,用任务队列的方式解决多任务并发问题。
创建LUA脚本
创建文件夹lua,创建脚本文件test.lua
LUA脚本
local current = redis.call('GET', KEYS[1])
if current == ARGV[1]
then redis.call('SET', KEYS[1], ARGV[2])
return true
end
return false
脚本解释
-- 目的: 修改redis中 value相同的数据
-- redis执行命令
-- 从redis中 根据key获取value值 赋值给current
local current = redis.call('GET', KEYS[1])
-- 根据key获取的数据的值 是否与value1相等
if current == ARGV[1]
-- 如果相同 则执行 redis.set(key,value2) 返回true
then redis.call('SET', KEYS[1], ARGV[2])
return true
-- 如果不相同 则结束 返回false
end
return false
创建配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class AppRedisConfiguration {
//简单序列化
@Bean
public RedisTemplate<String,String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
// 设置键序列化方式
redisTemplate.setKeySerializer(new StringRedisSerializer());
// 设置简单类型值的序列化方式
redisTemplate.setValueSerializer(new StringRedisSerializer());
// 设置默认序列化方式
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
//加载lua脚本,设置返回值类型
@Bean
public RedisScript<Boolean> script() {
Resource scriptSource = new ClassPathResource("lua/test.lua");
return RedisScript.of(scriptSource, Boolean.class);
}
}
创建测试类
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.Collections;
import java.util.List;
@SpringBootTest
public class TestLua {
@Autowired
private RedisScript<Boolean> script;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Test
public void test() {
boolean flag = checkAndSet("hello","helloworld");
System.out.println(flag ? "修改成功" : "修改失败");
// 手工添加一个值,再试试
redisTemplate.opsForValue().set("key", "hello");
boolean flag1 = checkAndSet("world","hello");
System.out.println(flag1 ? "修改成功" : "修改失败");
}
private boolean checkAndSet(String value1,String value2) {
List<String> keyList = Collections.singletonList("key");
return redisTemplate.execute(script, keyList, value1,value2);
}
}
- RedisTemplate.execute说明
RedisTemplate.execute需要传入三个值
- 第一个参数 RedisScript script: Lua脚本
- 第二个参数 List keys:集合
- 如果是单个参数,使用这个可以转换为单元素集合
- Collections.singletonList(参数);
- 多参数
List<String> keys = Arrays.asList(key1, key2, key3);
- 如果是单个参数,使用这个可以转换为单元素集合
- 第三个参数 args:ARGV,也就是其他类型参数
Redis的持久化
我们知道Redis是一个内存型数据库,内存的特性是断电或者程序退出则不保存数据,但是经过实测我们发现,Redis重启服务后,之前存储的数据仍然在,那么这就是通过持久化的方式实现的.
Redis 提供了2个不同形式的持久化方式。
- RDB(Redis DataBase)定时数据快照 默认方式
- AOF(Append Of File) 指令日志文件 手动开启
RDB持久化
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
RDB持久化流程
执行流程
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
Fork子进程
- Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
- 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“
写时复制技术
” - 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。
RDB持计划流程图
RDB相关配置与操作
RDB文件名配置
- 在redis.conf中配置文件名称,默认为dump.rdb
RDB文件位置配置
- rdb文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下.
- 可以通过修改该配置,将RDB文件存到系统的制定目录下dir "/root/myredis/"
RDB自动执行快照策略
- save命令临时这只快照执行策略
- 格式:save 秒钟 写操作次数 RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合的快照触发条件, 默认是1分钟至少1万个key发生变化,或5分钟至少100个key发生变化,或1个小时至少1个key发生变化。
- 禁用 不设置save指令,或者给save传入空字符串
RDB手动执行快照命令
-
save VS bgsave
- save :使用主进程进行持久化指令,save时只管保存,其它不管,全部阻塞。手动保存。不建议。
- bgsave:Redis会在后台异步进行快照操作,快照同时还可以响应客户端请求。
可以通过lastsave 命令获取最后一次成功执行快照的时间
-
flushall命令
- 执行flushall命令,也会产生dump.rdb文件,但里面是空的,无意义
-
shutdown命令
- shutdown命令在关系服务的时候也会进行自动的持久化
RDB备份异常策略
- stop-writes-on-bgsave-error 配置
- 当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes.
RDB 文件压缩配置
- rdbcompression配置
- 对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes.
- 对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes.
RDB文件检查完整性配置
- rdbchecksum配置
- 在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能.推荐yes.
RDB手动备份操作
-
查询rdb文件的目录 将 *.rdb的文件拷贝到别的地方
-
rdb的恢复
-
关闭Redis
-
先把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb
-
启动Redis, 备份数据会直接加载
-
RDB禁用操作
- 修改配置文件永久禁用【save " "】
- 通过指令临时禁用
动态停止RDB:redis-cli config set save "" save后给空值,表示禁用保存策略(不建议)
RDB优势
- 适合大规模的数据恢复
- 对数据完整性和一致性要求不高更适合使用
- 节省磁盘空间
- 恢复速度快
RDB劣势
- Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
- 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
- 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
AOF持久化
Append Only File 以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
AOF持计划流程
(1)客户端的请求写命令会被append追加到AOF缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
AOF相关配置与操作
AOF文件名配置
- 可以在redis.conf中配置文件名称,默认为 appendonly.aof
AOF文件位置路径
- Redis6中,AOF文件的保存路径,同RDB的路径一致。
- Redis7有变化:
base:基本文件
incr:增量文件
manifest:清单文件
AOF开启-修复-恢复操作
AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。
-
正常恢复数据
-
修改默认的appendonly no,改为yes,开启AOP方式
-
将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)
-
恢复:重启redis然后重新加载
-
-
异常修复数据
- 修改默认的appendonly no,改为yes
- 如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof --fix appendonly.aof.1.incr.aof进行恢复
- 备份被写坏的AOF文件
- 恢复:重启redis,然后重新加载
AOF同步频率设置
-
appendfsync always
- 始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
-
appendfsync everysec
- 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
-
appendfsync no
- redis不主动进行同步,把同步时机交给操作系统。
AOF压缩配置
-
什么是文件压缩 rewrite重写?
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof
-
如何实现重写?
- AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,是指上就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。
- no-appendfsync-on-rewrite 设置重写策略
- 如果 no-appendfsync-on-rewrite=yes ,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)
- 如果 no-appendfsync-on-rewrite=no, 还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)
-
何时触发重写?
Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发,重写虽然可以节约大量磁盘空间,减少恢复时间。但是每次重写还是有一定的负担的,因此设定Redis要满足一定条件才会进行重写。
- auto-aof-rewrite-percentage 设置重写基准值
- 文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)
- auto-aof-rewrite-min-size 设置重写基准值
- 最小文件64MB。达到这个值开始重写。
例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB 系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。
- auto-aof-rewrite-percentage 设置重写基准值
-
重写的流程是?
- (1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。
- (2)主进程fork出子进程执行重写操作,保证主进程不会阻塞。
- (3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区,保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。
- (4)
- 1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。
- 2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
- (5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。
AOF的优势
- 备份机制更稳健,丢失数据概率更低。
- 可读的日志文本,通过操作AOF文件,可以处理误操作。
AOF的劣势
- 比起RDB占用更多的磁盘空间。
- 恢复备份速度要慢。
- 每次读写都同步的话,有一定的性能压力。
- 存在个别Bug,造成无法恢复。
持久化方案选择
RDB和AOP用哪个好?
- 官方推荐两个都启用。
- 如果对数据不敏感,可以选单独用RDB。
- 不建议单独用 AOF,因为可能会出现Bug。
- 如果只是做纯内存缓存,可以都不用。
- AOF和RDB如果同时开启,系统默认取AOF中的持久化数据
Redis主从复制
主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主。
主从复制的作用
- 读写分离,性能扩展
- 容灾快速恢复
主从复制具体操作
实现思路
- 1 一个redis服务作为主机,主要负责写操作
- 2 两个redis服务作为从机,主要负责读操作
- 3 从机自动从主机同步数据下来
- 4 从机主动找主机,而主机不会找从机
- 5 正常来说主机和从机应该在不同的IP上开启redis服务,我们为了快速模拟,可以在一台机器上模拟出三个redis服务即可
一台机器上启动多个redis服务
- 使用redis-server启动服务时,要以来redis.conf配置文件.那么我们可以准备三个redis.conf文件,用来配置三个不同的服务,启动三次分别以来三个不同的服务即可
新建三个redis配置文件
用于定义每个服务的专属配置文件
-
新建redis6379.conf
关闭aof功能 >redis.conf中配置》appendonly no
include /root/myredis/redis.conf # 引入共同的配置
pidfile /var/run/redis_6379.pid # 使用独立的进程文件
port 6379 # 设置当前服务的端口号
dbfilename dump6379.rdb # 使用独立的RDB持久化文件 暂时不适用AOP持久化
- 新建redis6380.conf
include /root/myredis/redis.conf # 路径可变,是你配置文件的当前路径
pidfile /var/run/redis_6380.pid # 路径可变,指定你进程文件的路径
port 6380
dbfilename dump6380.rdb
- 新建redis6381.conf
include /root/myredis/redis.conf # 路径可变,是你配置文件的当前路径
pidfile /var/run/redis_6381.pid # 路径可变,指定你进程文件的路径
port 6381
dbfilename dump6381.rdb
在redis配置文件中,设置从机的优先级,值越小,优先级越高,用于选举主机时使用。默认100
replica-priority 10
启动三个服务
使用info replication查看主从相关信息
- 连接redis,使用:redis-cli -p 端口号
- 执行 info replication查看信息
配置主从机器
-
配从不配主,是让从机主动去找主机
-
在6380 和6381的机器上执行如下命令
slaveof 127.0.0.1 6379 # slaveof 主机的IP 主机端口号
注意
-
主机宕机,重启即可恢复主从状态,无需其他操作
-
从机宕机,重启后需要重新执行 slaveof 127.0.0.1 6379 才能恢复
-
从机可以在配置文件中写入slaveof 127.0.0.1 6379 ,这样重启无需手动输入slaveof 127.0.0.1 6379就可以恢复从机状态
主从复制原理
- Slave启动成功连接到master后会发送一个sync命令
- Master接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master将传送整个数据文件到slave,以完成一次完全同步
- 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
- 增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步
- 但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
主动复制三种模式
第一种 一主二仆
- 问题1: 切入点问题,slave1、slave2是从头开始复制还是从切入点开始复制?比如从k4进来,那之前的k1,k2,k3是否也可以复制?
- 问题2 :从机是否可以写?set可否?
- 问题3:主机shutdown后情况如何?从机是上位还是原地待命?
- 问题4:主机又回来了后,主机新增记录,从机还能否顺利复制?
- 问题5:其中一台从机down后情况如何?依照原有它能跟上大部队吗(还会自动变为从机吗?)?
第二种 薪火相传
上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。用 slaveof <ip><port>
中途变更转向:会清除之前的数据,重新建立拷贝最新的,风险是一旦某个slave宕机,后面的slave都没法备份,主机挂了,从机还是从机,无法写数据了
第三种 反客为主
- 当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。用 slaveof no one 将从机变为主机。
哨兵模式
反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库
哨兵模式的使用步骤
第一步: 设置简单的一主二仆
第二步: 为哨兵模式准备配置文件-前端运行
-
在指定目录下新建sentinel.conf 配置文件中放入如下内容
sentinel monitor mymaster 主机的IP 主机的端口 1
-
其中mymaster为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。
哨兵配置文件设置
- 在redis根目录下 复制sentinel.conf文件 拷贝到指定目录下
- 修改其中的配置信息
- 修改选举配置
- 修改选举时间
启动哨兵
在刚配置好的sentinel.conf的目录下执行命令
redis-sentinel /root/myredis/sentinel.conf
或者
redis-sentinel sentinel.conf
- redis做压测可以用自带的redis-benchmark工具
哨兵模式的操作演示
主机宕机演示
当主机宕机,会从从机中选择一个作为新的主机,根据优先级slave-properity, 原主机重启后会成为从机
复制延时
由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。
故障恢复
优先级在redis.conf中默认:replica-priority 100,值越小优先级越高
偏移量是指获得原主机数据最全的
每个redis实例启动后都会随机生成一个40位的runid
Redis集群操作
目前面临问题分析
- 容量不够,redis如何进行扩容?
- 并发写操作, redis如何分摊?
- 另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。
- 之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是
去中心化
集群配置。
Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
Redis 集群通过分区(partition)来提供一定程度的可用性(availability ):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求。
集群的搭建
第一步,搭建前的准备
- 之前操作产生的rdb和aof文件删除
- appendonly 修改回 no
- 清空主从复制和哨兵模式留下的一些文件
- 开启daemonize yes
- protected-mode no
- 注释掉bind
第二步,制作六个实例的配置文件
-
配置文件的内容
include /root/myredis/redis.conf port 6379 pidfile "/var/run/redis_6379.pid" dbfilename "dump6379.rdb" cluster-enabled yes cluster-config-file nodes-6379.conf cluster-node-timeout 15000
-
内容解释
include /root/myredis/redis.conf #引用公共的配置文件 port 6379 # 设置端口号 pidfile "/var/run/redis_6379.pid" # 设置pid进程文件 dbfilename "dump6379.rdb" # 设置rdb持久化问价名 cluster-enabled yes # 开启集群 cluster-config-file nodes-6379.conf # 设置集群使用的结点文件名 cluster-node-timeout 15000 # 设置结点失联时间
-
创建6379 6380 6381 6389 6390 6391 六个结点的配置文件
创建一个配置文件后,进行复制即可,然后再vim下,通过 :%s/6379/目标端口 来批量替换每个配置文件中的端口号
-
脚本命令: touch start.sh
#!/bin/sh
redis-server 6379.conf &
redis-server 6380.conf &
redis-server 6381.conf &
redis-server 6389.conf &
redis-server 6390.conf &
redis-server 6391.conf &
- 关闭脚本 shutdown.sh
#!/bin/sh
redis-cli -p 6379 shutdown &
redis-cli -p 6380 shutdown &
redis-cli -p 6381 shutdown &
redis-cli -p 6389 shutdown &
redis-cli -p 6390 shutdown &
redis-cli -p 6391 shutdown &
- 执行脚本
#启动redis
sh start.sh
#关闭redis
sh shutdown.sh
第三步 ,将六个服务合并为一个集群
-
切换目录到redis的src下
cd /opt/redis-7.0.10/src
-
运行如下指令
redis-cli --cluster create --cluster-replicas 1 192.168.126.128:6379 192.168.126.128:6380 192.168.126.128:6381 192.168.126.128:6389 192.168.126.128:6390 192.168.126.128:6391
此处不要用127.0.0.1, 请用真实IP地址 --replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组。
输入 yes 继续
集群的登录
集群登录方式
- 登录指令添加 -c 代表以集群方式登录
登录后查看集群信息
-
一个集群至少要有三个主节点。选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
-
分配原则尽量保证每个主数据库运行在不同的IP,每个从库和主库不在一个IP地址上。
cluster nodes
集群的slots
-
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,
-
集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
-
集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:
- 节点 A 负责处理 0 号至 5460 号插槽。
- 节点 B 负责处理 5461 号至 10922 号插槽。
- 节点 C 负责处理 10923 号至 16383 号插槽。
集群中录入值
- 在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。
- redis-cli客户端提供了 –c 参数实现自动重定向。
- 如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。
-
不在一个slot下的键值,是不能使用mget,mset等多键操作。
-
可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。
集群中查找值
- cluster keyslot key 计算key应该保存在那个插槽
- cluster countkeysinslot slot的值 计算某个插槽中保存的key的数量
- CLUSTER GETKEYSINSLOT <slot><count> 返回 count 个 slot 槽中的键。
集群故障恢复
-
Redis集群的选举规则
- redis中的所有的主机互相监督.当有一台主机宕机时,则剩余的节点(主-从-超半数)投票选举新的主机.
==当主机数量缺失时 则集群系统崩溃.==
-
如果主节点下线?从节点能否自动升为主节点?注意:15秒超时
- 主节点恢复后,主从关系会如何?主节点回来变成从机。
- 如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
- redis.conf中cluster-require-full-coverage 为yes 那么 ,整个集群都挂掉
- redis.conf中cluster-require-full-coverage 为no 那么,只有该插槽数据全都不能使用。
SpringBoot整合Redis集群
- 编辑yml文件
#Springboot整合Redis集群
spring:
data:
redis:
cluster:
nodes: 192.168.126.128:6379,192.168.126.128:6380,192.168.126.128:6381,192.168.126.128:6389,192.168.126.128:6390,192.168.126.128:6391
标签:redis,System,key,println,stringRedisTemplate,out
From: https://www.cnblogs.com/21CHS/p/18430664