Redis单线程&&多线程问题
redis是单线程还是多线程
在redis4.X之前的版本,redis是单线程;redis4.X版本之后,陆续开始支持多线程,比如持久化过程,但是核心工作线程仍然是单线程;redis6.X之后,redis针对部分设计大量数据操作,存在阻塞线程风险的命令提供了异步操作,如:zrange、ZRANK、flusdb等,但是核心工作线程依然是单线程。
redis为什么要使用单线程
- 单线程实现简单,避免了多线程场景下的线程同步、异步,锁竞争问题。
- 单线程中所有命令都是按顺序依次处理,保证了数据的一致性
- 因为redis数据结构简单,且数据都存储在内存中,所以cpu对redis的大部分命令都能很快的进行处理,因此cpu并不是redis的瓶颈,使用多线程并不能很大程度上提高redis的处理效率
- redis使用的是非阻塞I/O模型,因此I/O处理并不会导致CPU空闲,提供了对cpu的使用效率,因此减少了对多线程的依赖。
单线程的redis为什么还能保持高效率
- 基于内存操作:
Redis所有数据都存储在内存中,避免了磁盘读写。 - 数据结构简单:
简单的数据结构让大部分的命令处理复杂读都是O(1),因此性能较高。 - 多路复用和非阻塞I/O:
Redis使用I/O多路复用功能来监听多个socket连接客户端,这样就可以使用一个线程连接来处理多个请求,减少了多线程切换带来的损耗,避免了网络I/O阻塞操作。
已知redis的性能瓶颈一般是网络I/O,多路复用可以解决网络处理速度。传统命令处理过程如下:
由上图得到,当cpu执行命令效率较高的情况下,一次socket请求主要阻塞点在网路I/O的读写,而主线程的执行必须等待I/O的完成。I/O多路复用可以避免主线程阻塞,提高cpu的处理效率。
I/O多路复用中,主线程在socket连接匹配到I/O线程后,不会阻塞等待,这个过程使用了非阻塞I/O;完成了I/O的socket连接会释放信号,主动匹配主线程进行后续的命令处理;同理,结果回写到socket时,主线程也不会阻塞等待I/O线程的处理结果,而是等待回写成功的socket主动释放I/O流完成的信号,并进行后续处理。Rdis6.x之后,针对网络I/O的处理过程引入了多线,更好了提高了处理效率。 - 避免上下文切换
避免了多线程中cpu轮询导致的上下文切换带来的性能损耗,也避免了多线程中锁竞争和死锁问题。
后续版本的Redis为什么要引入多线程
bigKey的处理会带来较大的性能损耗,引入多线程可以通过异步操作处理bigkey,避免阻塞核心工作线程。如:unlink可以惰性删除bigKey
BigKey
moreKey处理
将批量的写命令写入到文件,使用管道扫描文件写入到redis,命令如下:
限制危险命令的使用
针对moreKey时用key * 会严重阻塞Redis;flushdb、flushall导致数据丢失;可以通过调整redis配置文件中的rename-command参数来禁止命令,如rename-command keys ""。
使用什么命令代替keys *
string类型:SCAN cursor [MATCH pattern] [COUNT count] [TYPE type]
set型:MATCH pattern] [COUNT count]
zset类型:rsor [MATCH pattern] [COUNT count]
hash类型:HSCAN key cursor [MATCH pattern] [COUNT count]
cat /temp/redisTest.tx | redis-cli -h 127.0.0.1 -p 6379 -a redis --pipe
BigKey产生的问题
- 分区数据存储不均
- 操作造成阻塞
怎么发现BigKey
- 通过--bigkeys参数,使用如下
redis-cli -h 127.0.0.1 -p 6379 -a redis --bigkeys
- 是用MEMORY命令
MEMORY USAGE key [SAMPLES count]
BigKey删除问题
- 渐进式删除(阻塞): string类型可直接使用del或unlink,其它类型可结合scan命令,对每次查出来的数据依次删除。
- 惰性删除(非阻塞):使用unlink
Redis与MySQL数据双写一致性
- 读取过程
- 从redis读取
- redis不存在时读取mysql
- mysql读取到数据时同步到redis
- 写过程
- 更新mysql
- 更新mysql成功后删除redis
双检加锁
当有大量请求查询一条Mysql存在,Redis不存在的命令时,避免大量命令重复读Mysql或写redis,使用双检加锁机制,实现过程见示例:
public BigDecimal queryFundShr(String code) {
/*
使用双检加锁策略保证数据库与缓存的一致性,避免大量请求进入因为查不到缓存直接查数据库,也避免大量重复的写操作进入redis
*/
Object fundShrObj = redisUtils.get(FUND_SHR_PRE + code);
// 1. 从redis缓存中读取数据,检查数据是否存在
if (null == fundShrObj) {
// 2. 缓存中不存在数据,则加锁读取数据库
synchronized (FUND_SHR_PRE) {
// 3. 同步块中先重新获取redis缓存,若此时其它线程已经将数据同步到redis,则不需要再查询数据库,避免穿透数据库
fundShrObj = redisUtils.get(FUND_SHR_PRE + code);
if (null != fundShrObj) {
return BigDecimal.valueOf(Double.parseDouble(String.valueOf(fundShrObj)));
}
FundInfoEntity entity = fundManageMapper.selectFundInfoByCode(code);
if (null == entity) {
return null;
}
// 4. 同步块中将数据同步到redis,避免重复的写操作进入redis;设置过期时间,当双写数据不一致时,通过过期时间能保证最终一致性
redisUtils.set(FUND_SHR_PRE + code, entity.getTotalShr(), 60*60*3);
return entity.getTotalShr();
}
}
return BigDecimal.valueOf(Double.parseDouble(String.valueOf(fundShrObj)));
}
一致性更新策略
- 给所有数据设置过期时间,定期更新缓存,避免更新缓存失败导致数据不一致,达到最终一致性
- 采用先更新数据库,再删除缓存的策略;为了保证最终一致性,删除缓存的过程可以引入消息中间件,删除失败时可以重试
数据统计
聚合统计
如寻找共同好友;可使用set数据结构,利用set的sdiff和sunion命令完成差集和并集。
排名统计
如按找时间排序,获取短视频的评论列表,可以使用zset数据结构,将所有评论数据的关键字段按发表时间插入到zset,使用zset可以快速实现排序分页查询。
二值统计
布隆过滤器实现,可使用bitmap实现布隆过滤器;签到日历实现。
签到日历实现代码示例:
@PutMapping(value = "/add/attendance")
@ApiOperation("【基数统计】签到日历-添加签到信息")
public R<String> addAttendance(@RequestParam("user") String user, @RequestParam("date") String date) {
R<String> r = R.ok("添加成功");
final String attendance_user_key = String.format("%s:%s:%s", "attendance_user", date.substring(0, date.length() - 2), user);
Calendar calendar = Calendar.getInstance();
calendar.setTime(DateUtils.parseDate(date, "yyyyMMdd"));
redisUtils.setBitMap(attendance_user_key, calendar.get(Calendar.DAY_OF_MONTH), true);
return r;
}
@GetMapping(value = "/attendance/calendar")
@ApiOperation("【基数统计】签到日历-查看用户某个月的签到日历")
public R<HashMap<String, String>> attendanceCalendar(@RequestParam("year") String year, @RequestParam("month") String month, @RequestParam("user") String user) {
R<HashMap<String, String>> r = new R<>(R.SUCCESS);
final String attendance_user_key = String.format("%s:%s:%s", "attendance_user", year + month, user);
HashMap<String, String> calendarMap = new HashMap<>(31);
for (int i = 1; i < 32; i++) {
Boolean attendanceStatus = redisUtils.getBitMapValue(attendance_user_key, i);
calendarMap.put(String.format("%s年%s月%s日",year, month, i), null != attendanceStatus && Boolean.TRUE.equals(attendanceStatus) ? "1" : "0");
}
r.setData(calendarMap);
return r;
}
布隆过滤器实现:
# 布隆过滤器实现代码
/**
* 添加bloom过滤器中的1标签
* @param bloomKey
* @param key
*/
public void addFlag(String bloomKey, String key) {
// 通过对key做hash运算得到bloom过滤器的偏移量
int hashValue = Math.abs(key.hashCode());
long index = (long) (hashValue % Math.pow(2, 23));
redisUtils.setBitMap(bloomKey, index, true);
}
/**
* 检查key是否存在
* @param bloomKey
* @param key
* @return
*/
public Boolean checkItem(String bloomKey, String key){
// 通过对key做hash运算得到bloom过滤器的偏移量
int hashValue = Math.abs(key.hashCode());
long index = (long) (hashValue % Math.pow(2, 23));
return redisUtils.getBitMapValue(bloomKey, index);
}
# 布隆过滤器使用-添加数据存在标记
@PostMapping(value = "/add")
@ApiOperation("【布隆过滤器】-添加基金产品")
public R<String> addFundInfo(@RequestBody FundInfoParam param) {
R<String> vm = R.ok();
FundInfoDto dto = new FundInfoDto();
BeanUtils.copyProperties(param, dto);
fundManageService.addFundInfo(dto);
// 插入数据库后添加布隆过滤器标记
bloomFilterUtils.addFlag("bloomFilter:fundShr", dto.getCode());
vm.setData("新增成功");
return vm;
}
# 布隆过滤器使用-校验数据是否存在
@Override
public BigDecimal queryFundShr(String code) {
// 添加布隆过滤器,避免恶意击穿数据库
if (!bloomFilterUtils.checkItem("bloomFilter:fundShr", code)) {
//布隆过滤器中不存在时,数据肯定不存在
return null;
}
/*
使用双检加锁策略保证数据库与缓存的一致性,避免大量请求进入因为查不到缓存直接查数据库,也避免大量重复的写操作进入redis
*/
Object fundShrObj = redisUtils.get(FUND_SHR_PRE + code);
// 1. 从redis缓存中读取数据,检查数据是否存在
if (null == fundShrObj) {
// 2. 缓存中不存在数据,则加锁读取数据库
synchronized (FUND_SHR_PRE) {
// 3. 同步块中先重新获取redis缓存,若此时其它线程已经将数据同步到redis,则不需要再查询数据库,避免穿透数据库
fundShrObj = redisUtils.get(FUND_SHR_PRE + code);
if (null != fundShrObj) {
return BigDecimal.valueOf(Double.parseDouble(String.valueOf(fundShrObj)));
}
FundInfoEntity entity = fundManageMapper.selectFundInfoByCode(code);
if (null == entity) {
return null;
}
// 4. 同步块中将数据同步到redis,避免重复的写操作进入redis;设置过期时间,当双写数据不一致时,通过过期时间能保证最终一致性
redisUtils.set(FUND_SHR_PRE + code, entity.getTotalShr(), 60*60*3);
return entity.getTotalShr();
}
}
return BigDecimal.valueOf(Double.parseDouble(String.valueOf(fundShrObj)));
}
基数统计
记录网站单日的访问人数,可使用hyperLogLog结构,对于重复访问的人进行过滤;hyperLogLog存在0.81%的统计误差,通过牺牲准确性,提高了数据的处理效率
分布式锁
基本实现
仅需实现锁的高可用、独占性、防死锁、不乱抢(约束当前线程只能操作自己的锁),可使用setnx实现,示例如下:
// 获取锁
private static boolean acquireLock(RedisUtils redisUtils, String lockKey) {
// 尝试获取锁,设置锁的超时时间和等待时间
long timeout = 10*1000L; // 10秒超时时间
long waitTime = 100; // 100毫秒等待时间
long endTime = System.currentTimeMillis() + timeout;
//通过while重试获取锁资源
while (System.currentTimeMillis() < endTime) {
// 尝试设置锁,如果返回1表示成功获取锁
if (redisUtils.setnx(lockKey, String.valueOf(Thread.currentThread().getId()), timeout)) {
// 添加看门狗,动态调控锁的有效时间
addWatchDog(redisUtils, lockKey, 3L);
return true;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return false;
}
// 释放锁,使用lua脚本,保证锁状态查看与删除锁的两个操作的原子性
private static void releaseLock(RedisUtils redisUtils, String lockKey) {
// 释放锁
Object lockThread = redisUtils.get(lockKey);
// 只删除当前线程自己的锁,使用rua脚本保证判断与删除过程的原子性
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
redisUtils.executeScript(new DefaultRedisScript<>(luaScript, Boolean.class), Collections.singletonList(lockKey), String.valueOf(lockThread));
}
// 看门狗实现,使用异步线程实现,也可根据时间业务场景采用定时任务或消息中间件
/**
* 添加看门口守护线程
* @param redisUtils
* @param lockKey
*/
private static void addWatchDog(final RedisUtils redisUtils, final String lockKey, final long delayTime) {
new Thread(() -> {
// 循环扫描全局锁是否存在,若存在则增加过期时间
while (redisUtils.hasKey(lockKey)) {
redisUtils.expire(lockKey, delayTime);
logger.info("【分布式可重入锁】-看门狗延期成功");
try {
Thread.sleep(800L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
可重入特性实现
使用redis中hash结构的hsetnx特性,可以通过分布式锁的基本功能;hash结构中可以存储锁的自定义属性,可以做锁的重入次数统计。实现示例:
// 加锁过程
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
String lockThreadId = LOCK_KEY_PRE + Thread.currentThread().getId();
// 通使用hash结构的实现可重入锁,添加long类型属性记录重入次数
Object lockCount = redisUtils.hget(this.lockKey, lockThreadId);
if (lockCount != null && (Integer)lockCount > 0) {
// 当前进程已经获取到锁,则对锁的重入次数加1
redisUtils.hincr(this.lockKey, lockThreadId, 1);
redisUtils.expire(this.lockKey, 3L);
return true;
}
long waitTime = 100; // 100毫秒等待时间
long endTime = System.currentTimeMillis() + time;
while (System.currentTimeMillis() < endTime) {
// 尝试设置锁,如果返回1表示成功获取锁
if (redisUtils.hsetnx(this.lockKey, lockThreadId, 1)) {
// 设置锁的过期时间为5s
redisUtils.expire(this.lockKey, 3L);
addWatchDog(redisUtils, lockKey, 3L);
return true;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return false;
}
// 释放锁
public void unlock() {
String lockThreadId = LOCK_KEY_PRE + Thread.currentThread().getId();
Object lockCount = redisUtils.hget(this.lockKey, lockThreadId);
// 判断当前线程的锁资源是否存在,存在则进行释放
if (lockCount != null) {
// 没次释放锁时,对锁的次数减一,减到0时删除锁
if (redisUtils.hincr(this.lockKey, lockThreadId, -1) < 1) {
redisUtils.del(this.lockKey);
}
}
}
实际使用中,如果需要加强分布式锁的数据一致性,可将上述实例中redis加锁过程和释放过程都使用lua脚本做原子性操作。
RedLock(红锁)使用
- 解决问题的背景
上面两种方式,利用集群redis实现分布式锁保证了AP(高可用、容错性)特性,但是无法完成CP-强一致性。
如线程1获取锁成功,master节点成功执行setnx命令后宕机,slave节点还没同步命令,哨兵完成故障转移后导致数据丢失,此时线程2可以成功获取锁,以至于分布式锁的独占性失效。 - 解决方案
RedLock通过提供多台节点共同完成分布式锁变量的存储来确保了一致性问题,具体需要提供的Redis节点数可根据公式:[N = 2C + 1]提供,N为总节点数,C为允许故障的节点数。 - 实践
参考Redisson官方介绍文件
// 1.提供redisson配置以及客户端实例
@Bean
public RedissonClient redissonClient() {
// 1. Create config object
Config config = new Config();
config.useSingleServer()
// use "rediss://" for SSL connection
.setAddress(String.format("redis://%s:%s",redisHost, redisPort))
.setPassword(redisPassword);
// 2. Create Redisson instance
// Sync and Async API
return Redisson.create(config);
// // Reactive API 该方式提供异步操作
// RedissonReactiveClient redissonReactive = redisson.reactive();
//
// // RxJava3 API
// RedissonRxClient redissonRx = redisson.rxJava();
}
// 2.业务代码中使用红锁
// 单重锁实现
public R<String> globalRedLock(){
RLock redLock = redissonClient.getLock("system:global:redLock");
logger.info("【主线程开始进入分布式可重入锁验证】,开始第1次获取锁。");
redLock.lock();
logger.info("【主线程开始进入分布式可重入锁验证】,第1次获取锁成功。");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【主线程开始进入分布式可重入锁验证】,开始第2次获取锁。");
redLock.lock();
logger.info("【主线程开始进入分布式可重入锁验证】,第2次获取锁成功。");
logger.info("【主线程开始进入分布式可重入锁验证】,开始第1次释放锁。");
if (redLock.isLocked() && redLock.isHeldByCurrentThread()) {
// 删除当前线程自己的锁
redLock.unlock();
}
logger.info("【主线程开始进入分布式可重入锁验证】,第1次释放锁成功。");
logger.info("【主线程开始进入分布式可重入锁验证】,开始第2次释放锁。");
if (redLock.isLocked() && redLock.isHeldByCurrentThread()) {
// 删除当前线程自己的锁
redLock.unlock();
}
logger.info("【主线程开始进入分布式可重入锁验证】,第2次释放锁成功。");
new Thread(() -> {
logger.info("【子线程开始进入分布式可重入锁验证】,开始获取锁。");
redLock.lock();
logger.info("【子线程开始进入分布式可重入锁验证】,获取锁成功。");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【子线程开始进入分布式可重入锁验证】,开始释放锁。");
if (redLock.isLocked() && redLock.isHeldByCurrentThread()) {
// 删除当前线程自己的锁
redLock.unlock();
}
logger.info("【子线程开始进入分布式可重入锁验证】,释放锁成功。");
}).start();
return R.ok("redis可重入锁验证成功");
}
//多重锁实现
public R<String> globalRedMultiLock(){
// 使用多个redis节点实例来实现多重锁
RLock redLock1 = redissonClient1.getLock("system:global:redMultiLock");
RLock redLock2 = redissonClient2.getLock("system:global:redMultiLock");
RLock redLock3 = redissonClient3.getLock("system:global:redMultiLock");
RedissonMultiLock redMultiLock = new RedissonMultiLock(redLock1,redLock2,redLock3);
logger.info("【主线程开始进入分布式可重入锁验证】,开始第1次获取锁。");
redMultiLock.lock();
logger.info("【主线程开始进入分布式可重入锁验证】,第1次获取锁成功。");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【主线程开始进入分布式可重入锁验证】,开始第2次获取锁。");
redMultiLock.lock();
logger.info("【主线程开始进入分布式可重入锁验证】,第2次获取锁成功。");
logger.info("【主线程开始进入分布式可重入锁验证】,开始第1次释放锁。");
// 删除当前线程自己的锁
redMultiLock.unlock();
logger.info("【主线程开始进入分布式可重入锁验证】,第1次释放锁成功。");
logger.info("【主线程开始进入分布式可重入锁验证】,开始第2次释放锁。");
// 删除当前线程自己的锁
redMultiLock.unlock();
logger.info("【主线程开始进入分布式可重入锁验证】,第2次释放锁成功。");
new Thread(() -> {
logger.info("【子线程开始进入分布式可重入锁验证】,开始获取锁。");
redMultiLock.lock();
logger.info("【子线程开始进入分布式可重入锁验证】,获取锁成功。");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【子线程开始进入分布式可重入锁验证】,开始释放锁。");
// 删除当前线程自己的锁
redMultiLock.unlock();
logger.info("【子线程开始进入分布式可重入锁验证】,释放锁成功。");
}).start();
return R.ok("redis可重入锁验证成功");
}
缓存过期淘汰策略
redis内存大小
通过maxmemory配置,默认为0(64位抢占物理机所有内存)
- 查看redis内存
info memory查看使用内存
get config maxmemory查看最大内存 - 内存用完的现象
新增命令内存不够时返回OOM错误
key过期删除策略
- 立即删除
CPU一直扫描过期key,并进行删除;占用了CPU大量的时间。 - 惰性删除
key过期后不做处理,在下次访问时进行删除;对CPU友好,但是过期数据永远不在访问,会出现内存溢出。 - 定期删除策略
Redis 针对过期 Key 的定期删除策略算法是一种渐进式的删除策略,它会在一定的时间间隔内,随机地检查一定数量的 Key 是否过期,并删除过期的 Key。这种策略可以有效地平衡内存使用率和 CPU 使用率。
算法流程
- Redis 会根据配置文件中指定的 hz 参数,计算出每次检查的 Key 的数量。hz 参数表示每秒钟检查的 Key 的数量,默认值为 10。
- Redis 会随机地选择一个数据库,并从该数据库中取出 hz 个 Key。
- Redis 会检查这 hz 个 Key 是否过期,如果过期则删除。
- Redis 会重复步骤 2 和 3,直到检查完所有数据库。
Redis内存管理中同时使用了定期删除与惰性删除。
内存淘汰策略
key过期后通过惰性删除和定时删除,仍有可能出现内存泄漏的问题,因此提供了8种内存淘汰策略作为兜底方案。
- noeviction(默认):当添加元素内存不足时不做任务处理,仅返回OOM的error错误
- allkeys-random: 对任意key随机删除
- allkeys-lru: 对所有key使用LRU算法进行删除,优先删除最近不经常使用的key
- allkeys-lfu:对所有key使用lfu算法进行删除,删除使用频次少的key
- volatile-ttl:删除马上要过期的key
- volatile-random: 对所有设置的过期时间的key随机删除
- volatile-lru: 对所有设置了过期时间的key使用LRU算法
- volatile-lfu:对所有设置了过期时间的key,使用lfu算法进行删除,删除使用频次少的key
skipList(跳表)
- 问题背景
针对单链表,可以快速进行增、删、改,但是查询的复杂度时O(n),针对数据量较大的查询操作不友好,通过跳表可以优化这个问题。 - 数据结构原理
跳表是对单链表的改造,在链表的基础上提供了多级索引实现快速查找,形成了多级索引链表的结构,其中索引链表部分不存储数据,只存储key以及数据指针。
跳表数据结构包含属性如下:
跳数整体数据结构示例图:
- 复杂度
时间复杂度:O(log n)
空间复杂度:O(log n)