Redis实战篇
涵盖内容如下:
1、导入黑马点评项目
- 当前模型
1.1 导入SQL
1.2 导入后端项目
解压到工作台,用idea打开即可
注: 不要忘了修改application.yaml文件中的mysql、redis地址信息
- 测试
- 启动项目后,在浏览器访问:http://localhost:8081/shop-type/list ,如果可以看到数据则证明运行没有问题
- 原项目使用的端口为8081,此处由于我的8081端口被java占用,将端口修改为8082,成功获取到如下信息
1.3 导入前端项目
自资料中提供了一个nginx压缩包,将其解压到任意目录,确保该目录不含中文、特殊字符和空格,此处我解压到D:\study\front
此处由于我后端的端口改为8082,前端nginx配置文件中也需要进行端口的修改,在D:\study\front\nginx-1.18.0\conf
作如下修改:
1.4 运行前端项目
- 在nginx所在目录下打开一个CMD窗口,输入命令:
start .\nginx.exe
- 打开浏览器并打开开发者工具,打开手机模式,访问: http://127.0.0.1:8080 ,即可看到页面
2、短信登陆
2.1 基于Session实现登录流程
2.2 session共享问题
多台参与轮巡的Tomcat并不共享session存储空间, 当请求切换到不同tomcat时导致数据丢失的问题.
- 早期做法: session拷贝
- 分析虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
- 存在的问题1、每台服务器中都有完整的一份session数据,服务器压力过大。2、session拷贝数据时,可能会出现延迟
- 现在: redis替换session
- 分析redis数据本身就是共享的,可以避免session
2.3 Redis代替session的业务流程
2.3.1 设计key的结构
**验证码的保存: **
- 存储数据结构
- 直接使用string就可以
- key
- 用户提交手机号, 后端发送验证码, 之后校验用户时客户端还需要带着这个key来取
- 用户提交正是手机号和验证码因此, 可以使用手机号作为key
**用户信息保存: **
-
存储数据的结构
- 如果不是特别在意内存,其实使用String就可以, 从优化的角度讲, 推荐使用hash
-
key的选择每个用户都有自己的session,但是redis的key是共享的,为了防止重复, 不能使用code
-
key的设计需要满足两点1、key要具有唯一性2、客户端要方便携带
-
如果采用手机号:
- 手机号是一个敏感信息, 保存在客户端(浏览器)后, 会有泄漏的风险
-
解决方案:
- 在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
-
2.3.2 整体访问流程
2.3.3 基于Redis实现短信登录
- UserServiceImpl代码
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 发送验证码
* @param phone 前端传来的手机号
* @param session 不再使用, 已被代替
* @return 通用结果类
*/
@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2. 如果不符合, 返回错误信息
return Result.fail("手机号格式错误!");
}
// 3. 符合, 生成验证码
String code = RandomUtil.randomNumbers(6);
// 4. 保存验证码 到redis LOGIN_CODE_KEY = "login:code:" LOGIN_CODE_TTL = 2L
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL,TimeUnit.MINUTES);
// 5. 发送验证码
log.debug("发送短信验证码成功, 验证码: {}", code);
// 返回ok
return Result.ok();
}
/**
* 登录
* @param loginForm 保存前端传来的手机号和验证码
* @param session 不再使用, 已被redis替代
* @return 通用结果类
*/
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 2. 如果不符合, 返回错误信息
return Result.fail("手机号格式错误!");
}
// 3. 从Redis获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)) {
// 不一致, 报错
return Result.fail("验证码错误");// 使用反向编码可以防止代码嵌套层次过深
}
// 4. 一致, 根据手机号查询用户
User user = query().eq("phone", phone).one();
// 5. 判断用户是否存在
if (user == null) {
// 6. 不存在, 创建新用户并保存到数据库
user = createUserWithPhone(phone);
}
// 7. 保存用户信息到redis
// 7.1 随机生成token, 作为登录令牌
String token = UUID.randomUUID().toString(true);
// 7.2 将User对象转为HashMap存储
// 使用UserDTO只保存不敏感的用户主要信息, 从而节省空间, 保证安全性
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue) -> fieldValue.toString()));// StringRedisTemplate要求字段值都是string
// 7.3 存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);
// 7.4 设置token有效期
stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
// 8. 返回token
return Result.ok(token);// 前端接收token, 并保存到浏览器上
}
/**
* 使用手机号创建用户
* @param phone 手机号
* @return 创建后的用户, 携带手机号和随机字符串生成的默认名
*/
private User createUserWithPhone(String phone) {
// 1. 创建用户 LOGIN_USER_KEY = "login:token:" LOGIN_USER_TTL = 30L
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
// 2. 保存用户
save(user);
return user;
}
}
2.3.4 登录拦截功能分析
- 第一个拦截器
- 对所有路径进行拦截, 实现保存用户信息 并 刷新token有效期的功能, 使若用户长时间不访问网站则自动清除token, 当用户在网站中活跃, 在该拦截器中刷新token的有效期
- 第二个拦截器
- 对指定路径实现拦截功能
2.4.5 拦截器代码具体实现
- RefreshTokenInterceptor
public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;// 由调用者MvcConfig经由自动注入传来
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取请求头中的token
String token = request.getHeader("authorization");// token被前端保存在浏览器每次请求的请求头中
if (StrUtil.isBlank(token)) {
return true;// 若token不存在, 放行
}
// 2. 基于token获取redis中的用户
String key = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
// 3. 判断用户是否存在
if (userMap.isEmpty()) {
return true;// 若用户不存在, 放行
}
// 5. 将查询到的Hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6. 存在, 用户信息保存到TheadLocal
UserHolder.saveUser(userDTO);
// 7. 刷新token有效期
stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
// 7. 放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户, 避免内存泄漏
UserHolder.removeUser();
}
}
// 在请求处理过程中,preHandle 方法会将用户信息加载到 ThreadLocal 中,并在整个请求周期内使用这些信息。请求处理完成后,afterCompletion 方法会清理这些信息,确保不会影响到其他请求。
- LoginInterceptor
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null) {
// 没有, 需要拦截, 设置状态码
response.setStatus(401);
// 拦截
}
// 有用户, 则放行
return true;
}
}
- MvcConfig
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
).order(1);
// token刷新的拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);// order的值越小, 越先执行
}
}
- 配置的拦截器默认是按照添加顺序执行, 可以通过order来指定执行的优先级, 默认值都为0, order的值越小, 执行的越早
3、商户查询缓存
3.1 如何使用缓存
实际开发中,会构筑多级缓存来使系统运行速度进一步提升,例如:本地缓存与redis中的缓存并发使用
浏览器缓存:主要是存在于浏览器端的缓存
**应用层缓存:**可以分为tomcat本地缓存,比如之前提到的map,或者是使用redis作为缓存
**数据库缓存:**在数据库中有一片空间是 buffer pool,增改查数据都会先加载到mysql的缓存中
**CPU缓存:**当代计算机最大的问题是 cpu性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了cpu的L1,L2,L3级的缓存
3.2 缓存更新策略
3.2.1 数据库缓存不一致解决方案
旁路缓存模式 读写穿透模式 异步缓存写入
Cache Aside Pattern √ 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理
Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
方案一:
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
- 如何保证缓存与数据库的操作的同时成功或失败?(原子性)
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
- 先操作缓存还是先操作数据库?(线程安全问题)
- 先删除缓存,再操作数据库
- 先操作数据库,再删除缓存
- 更新数据库操作慢, 需要进行IO, 而线程2查询快, 写入redis也快, 因此发生几率高
- 发生的条件: 两个线程并行执行 + 缓存恰好到期 + 查完数据库后, 写完缓存前 更新好数据库, 发生几率低, 可以再加入超时时间
3.3 缓存穿透
缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
- 缓存空对象√
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗(null)
- 可能造成短期的不一致(ttl内, 也可以插入时手动更新)
- 布隆过滤
- 优点:内存占用较少,没有多余key
- 缺点:
- 实现复杂
- 存在误判可能(判断存在的时候不一定真的存在)
编码解决商品查询的缓存穿透问题:
- 核心思路:
在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的
现在的逻辑中:如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。
- 核心代码:
public Shop queryWithPassTrough(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1. 从redis查询商户缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {// 有效值
// 3. 存在, 直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
// 判断命中的是否是空值
if (shopJson != null) {// 此时 shopJson == ""
// 返回一个错误信息
return null;
}
// 4. 不存在, 根据id查询数据库
Shop shop = getById(id);
// 5. 不存在, 返回错误
if (shop == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6. 存在, 写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 7. 返回
return shop;
}
小总结:
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存null值
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
3.4 缓存雪崩
缓存雪崩是指在同一时段**大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库**,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
3.5 缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
逻辑分析:
假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
- 解决方案一: 锁 + double check
- 解决方案二: 逻辑过期
3.5.1 利用互斥锁解决缓存击穿问题
思路分析:
利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false
我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。
操作锁的代码:
/**
* 尝试获取锁
* @param key 锁在redis重点的键值
* @return 是否获取成功
*/
private boolean tryLock(String key) {
// 有效期一般为正常业务执行业务的10/20倍
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放锁
* @param key 锁在redis重点的键值
*/
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
操作代码:
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1. 从redis查询商户缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {// 有效值
// 3. 存在, 直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
// 判断命中的是否是空值(穿透)
if (shopJson != null) {// 此时 shopJson == ""
// 返回一个错误信息
return null;
}
// 4. 实现缓存重建
// 4.1 获取互斥锁
Shop shop = null;
String lockKey = "lock:shop:" + id;
try {
boolean isLock = tryLock(lockKey);
// 4.2 判断是否获取成功
if(!isLock) {
// 4.3 失败, 则休眠并重试
Thread.sleep(50);// 试一试
return queryWithMutex(id);// 递归??
}
// 4.4 成功
// 4.4.1 检测redis缓存是否存在, 做DoubleCheck
shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {// 有效值
// 3. 存在, 直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
// 判断命中的是否是空值(穿透)
if (shopJson != null) {// 此时 shopJson == ""
// 返回一个错误信息
return null;
}
// 4.4.2 根据id查询数据库
shop = getById(id);
// 模拟重建的延迟
Thread.sleep(200);
// 5. 不存在, 返回错误
if (shop == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6. 存在, 写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 7. 释放互斥锁
unlock(lockKey);
}
// 8. 返回
return shop;
}
3.5.2 利用逻辑过期解决缓存击穿问题
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
思路分析:
当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据(此时数据库里一定没有, 逻辑过期会提前存入所有需要的数据, 并设置逻辑过期时间)
一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据; 如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
- 新建实体类
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
- 在ShopServiceImpl 新增方法,利用单元测试进行缓存预热
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {
// 1. 查询店铺数据
Shop shop = getById(id);
Thread.sleep(200);// 模拟延迟
// 2. 封装逻辑过期
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 3. 写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}
@Test
public void testSaveShop() throws Exception{
shopService.saveShop2Redis(1L,10L);
}
- 正式代码
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1. 从redis查询商户缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isBlank(shopJson)) {// 有效值
// 3. 存在, 直接返回
return null;
}
// 4. 命中, 需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
// 5. 判断是否过期
LocalDateTime expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1 未过期, 直接返回店铺信息
return shop;
}
// 5.2 已过期, 需要缓存重建
// 6. 缓存重建
// 6.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2 判断是否获取锁成功
if (isLock) {
// 6.3.1 成功, doubleCheck
// 1. 从redis查询商户缓存
shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isBlank(shopJson)) {// 有效值
// 3. 存在, 直接返回
return null;
}
// 4. 命中, 需要先把json反序列化为对象
redisData = JSONUtil.toBean(shopJson, RedisData.class);
data = (JSONObject) redisData.getData();
shop = JSONUtil.toBean(data, Shop.class);
// 5. 判断是否过期
expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1 未过期, 直接返回店铺信息
return shop;
}
// 6.3.2 开启独立线程实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 重建缓存
this.saveShop2Redis(id,20L);// 实际业务中应该设置30mins
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4 返回过期商铺信息
return shop;
}
3.6 封装Redis工具类
基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
- 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
将逻辑进行封装
@Component
@Slf4j
public class CacheClient {
@Resource
private StringRedisTemplate stringRedisTemplate;
public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
public <R,ID> R queryWithPassTrough(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id; // 使用者的查询方式, 查询逻辑不能确定, 需要从外界传递过来
// 1. 从redis查询商户缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isNotBlank(json)) {// 有效值
// 3. 存在, 直接返回
return JSONUtil.toBean(json, type);
}
// 判断命中的是否是空值
if (json != null) {// 此时 shopJson == ""
// 返回一个错误信息
return null;
}
// 4. 不存在, 根据id查询数据库
R r = dbFallback.apply(id);
// 5. 不存在, 返回错误
if (r == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6. 存在, 写入redis
this.set(key,r,time,unit);
// 7. 返回
return r;
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
// 1. 从redis查询商户缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isBlank(json)) {// 有效值
// 3. 存在, 直接返回
return null;
}
// 4. 命中, 需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R r = JSONUtil.toBean(data, type);
// 5. 判断是否过期
LocalDateTime expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1 未过期, 直接返回店铺信息
return r;
}
// 5.2 已过期, 需要缓存重建
// 6. 缓存重建
// 6.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2 判断是否获取锁成功
if (isLock) {
// doubleCheck
json = stringRedisTemplate.opsForValue().get(key);
// 2. 判断是否存在
if (StrUtil.isBlank(json)) {// 有效值
// 3. 存在, 直接返回
return null;
}
// 4. 命中, 需要先把json反序列化为对象
redisData = JSONUtil.toBean(json, RedisData.class);
data = (JSONObject) redisData.getData();
r = JSONUtil.toBean(data, type);
// 5. 判断是否过期
expireTime = redisData.getExpireTime();
if (expireTime.isAfter(LocalDateTime.now())) {
// 5.1 未过期, 直接返回店铺信息
return r;
}
// 6.3.2 开启独立线程实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 查数据库
R r1 = dbFallback.apply(id);
// 写入缓存
setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 6.4 返回过期商铺信息
return r;
}
/**
* 尝试获取锁
* @param key 锁在redis重点的键值
* @return 是否获取成功
*/
private boolean tryLock(String key) {
// 有效期一般为正常业务执行业务的10/20倍
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 释放锁
* @param key 锁在redis重点的键值
*/
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}
ShopServiceImpl:
@Resource
private CacheClient cacheClient;
@Override
public Result queryById(Long id) {
// 缓存穿透
// Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class, this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);
// 逻辑过期解决缓存击穿
Shop shop = cacheClient
.queryWithLogicalExpire(CACHE_SHOP_KEY, id,Shop.class,this::getById,20L,TimeUnit.SECONDS);
if (shop == null) return Result.fail("店铺不存在!");
// 7. 返回
return Result.ok(shop);
}
4、优惠券秒杀
4.1 全局唯一id
场景引入
当用户抢购时,会生成订单并保存到tb_voucher_order表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显
- 受单表数据量的限制
全局ID生成器
是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
4.2 Redis实现全局唯一Id
@Component
public class RedisIdWorker {
// 开始时间戳
private static final long BEGIN_TIMESTAMP = 1704067200;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
@Resource
StringRedisTemplate stringRedisTemplate;
// 不同的业务有不同的key, 因此需要设置一个业务前缀
public long nextId(String keyPrefix) {
// 1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long newSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = newSecond - BEGIN_TIMESTAMP;
// 2. 生成序列号
// 2.1 获取当前日期, 精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接并返回
return timestamp << COUNT_BITS | count;
}
/* 获取指定时间的毫秒值
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println("second = " + second);
}*/
}
4.3 锁
悲观锁:
可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
乐观锁:
会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas
乐观锁的典型代表:就是cas,利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值
常见的分布式锁有三种
Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见
Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述
4.4 Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
4.5 redission
4.5.1 入门
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
快速入门:
- 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
- 配置Redisson客户端
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.198.130:6379").setPassword("123456");
// 创建RedissonConfig对象
return Redisson.create(config);
}
/* @Bean
public RedissonClient redissonClient2(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.198.130:6380").setPassword("123456");
// 创建RedissonConfig对象
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.198.130:6381").setPassword("123456");
// 创建RedissonConfig对象
return Redisson.create(config);
}*/
}
- 如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
4.5.2 分布式锁-redission可重入锁原理
在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有,所以接下来我们一起分析一下当前的这个lua表达式
这个地方一共有3个参数
KEYS[1] : 锁名称
ARGV[1]: 锁失效时间
ARGV[2]: id + “:” + threadId; 锁的小key
去判断当前这个方法的返回值是否为null,如果是null,则对应则前两个if对应的条件,退出抢锁逻辑,如果返回的不是null,即走了第三个分支,在源码处会进行while(true)的自旋抢锁。
"if (redis.call('exists', KEYS[1]) == 0) then " + // 当前这个key不存在
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 创建
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置时间
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 自增
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重新设置时间值
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
4.6 Redis消息队列
4.6.1 认识消息队列
最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
使用队列的好处在于 解耦
4.6.2 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
- 发送消息
eg:
- 读取消息
eg:
使用XREAD读取第一个消息
XREAD阻塞方式,读取最新的消息
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
4.6.3 基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
4.7 异步秒杀下单代码实现
提前建立消息队列stream.orders
XGROUP create stream.orders g1 0 MKSTREAM
VoucherServiceImpl
- 前端添加秒杀券的时候将库存写入redis
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀的库存到redis当中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
}
seckill.lua
判断是否可以下单, 若可以下单, 发送消息到消息队列中
-- 1. 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]
-- 2. 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId -- lua拼接字符串
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3. 脚本业务
-- 3.1 判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
-- 3.2 库存不足, 返回1
return 1
end
-- 3.2 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
-- 3.3 存在, 说明是重复下单, 返回2
return 2
end
-- 3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
VoucherOrderServiceImpl
- setkillVoucher 前端发送请求处理新的订单
前置准备
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 用于在java中执行Lua脚本, Long为返回值类型
static {// 提前lua脚本中的内容读取进来
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 用于数据库的写入操作, 不要求速度, 给一个线程即可
@PostConstruct // 外部类初始化完毕就执行, 随时等待处理消息队列中的订单
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private IVoucherOrderService proxy;// 在外部获取代理类,用于异步处理订单
@Override
public Result setkillVoucher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
// 获取订单id
long orderId = redisIdWorker.nextId("order");
// 1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
// 2. 判断结果是否为0
int r = result.intValue();
if (r != 0) {
// 2.1 不为0, 代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3. 获取代理对象
// 获取代理对象(事务)
proxy = (IVoucherOrderService) AopContext.currentProxy();// 目标对象的原始方法是由代理对象执行的, 此处由spring的API获取代理对象
// 从而确保用户Id的值一样时, 锁就一样
// 4. 返回订单id
return Result.ok(orderId);
}
- SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler())
- 消息队列中接收到消息, 开始处理订单
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 用于在java中执行Lua脚本
static {// 提前lua脚本中的内容读取进来
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 用于数据库的写入操作, 不要求速度, 给一个线程即可
@PostConstruct // 外部类初始化完毕就执行, 随时等待处理消息队列中的订单
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 1. 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 2.1 如果获取失败, 说明没有消息, 继续下一次循环
continue;
}
// 3. 解析消息中的订单消息
MapRecord<String, Object, Object> record = list.get(0);//
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 4. 如果获取成功, 可以下单
handleVoucherOrder(voucherOrder);
// 5. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 2.1 如果获取失败, 说明pending-list没有消息, 结束循环
break;
}
// 3. 解析消息中的订单消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 4. 如果获取成功, 可以下单
handleVoucherOrder(voucherOrder);
// 5. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理pending-list订单异常", e);// 如果遇到异常, 继续循环即可
try {
Thread.sleep(20);// 可以没有这一步, 防止太频繁
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
- handleVoucherOrder
- 消息队列中的订单信息获取成功, 获取分布式锁, 准备开始下单
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 1. 获取用户
Long userId = voucherOrder.getUserId();
// 2. 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 3. 获取锁
boolean isLock = lock.tryLock();// 根据实际情况, 确保实际业务一定能完成即可
// 4. 判断是否获取锁成功
if (!isLock) {
// 获取锁失败, 返回错误信息或重试
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
lock.unlock();
}
}
- 下单
@Transactional // 对两张表进行操作, 最好加上事务
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 5. 一人一单
Long userId = voucherOrder.getUserId();
// 5.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2 判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过一次!");
return;
}
// 6. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return ;
}
// 7. 创建订单
save(voucherOrder);
}
5、达人探店
5.1 发布探店笔记
BlogController
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId()); // 保存的博文中需要含有发布者的信息
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
5.2 查看探店笔记
BlogController
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id) {
return blogService.queryBlogById(id);
}
BlogServiceImpl
@Override
public Result queryBlogById(Long id) {
// 1. 查询blog
Blog blog = getById(id);
if (blog == null) {
return Result.fail("笔记不存在!");
}
// 2. 查询blog有关的用户
queryBlogUser(blog);
// 3. 查询blog是否被点赞了
isBlogLiked(blog);
return Result.ok(blog);
}
相关功能:
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
// 1. 获取登录用户
UserDTO user = UserHolder.getUser();
if (user == null) { // 用户未登录, 无需查询是否点赞
return;
}
Long userId = user.getId();
// 2. 判断当前登录用户是否已经点赞
String key = "blog:liked:" + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score != null);
}
5.3 点赞功能
需求:
- 同一个用户只能点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤:
- 给Blog类中添加一个isLike字段,标示是否被当前用户点赞
- 修改点赞功能,利用Redis的SortedSet集合判断是否点赞过
- 修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
- 修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
- 在Blog添加一个字段
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;
- 点赞功能 (首页和博客详情页)
BlogController
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}
BlogServiceImpl
@Override
public Result likeBlog(Long id) {
// 1. 获取登录用户
Long userId = UserHolder.getUser().getId();
// 2. 判断当前登录用户是否已经点赞
String key = BLOCK_LIKED_KEY + id; // BLOCK_LIKED_KEY = "blog:liked:";
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score == null) {
// 3. 如果未点赞, 可以点赞
// 3.1 数据库点赞数 + 1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
// 3.2 保存用户到redis sortedSet集合 zadd key value score
if (isSuccess) {
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
} else {
// 4. 如果已点赞, 取消点赞
// 4.1 数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
if (isSuccess) {
// 4.2 把用户从redis的set集合移除
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
}
return Result.ok();
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
this.queryBlogUser(blog);
this.isBlogLiked(blog);
});
return Result.ok(records);
}
相关方法
private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
private void isBlogLiked(Blog blog) {
// 1. 获取登录用户
UserDTO user = UserHolder.getUser();
if (user == null) { // 用户未登录, 无需查询是否点赞
return;
}
Long userId = user.getId();
// 2. 判断当前登录用户是否已经点赞
String key = "blog:liked:" + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(score != null);
}
5.3 点赞排行榜
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
Redis数据结构选择: SortedSet
- 所有点赞的人唯一
- 需要进行排序
点赞列表查询
BlogController
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
return blogService.queryBlogLikes(id);
}
BlogServiceImpl
@Override
public Result queryBlogLikes(Long id) {
String key = BLOCK_LIKED_KEY + id; // BLOG_LIKED_KEY
// 1. 查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 2. 解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
// 3. 根据用户id查询用户 WHERE id IN (5 , 1) ORDER BY FIELD(id,5,1)
List<UserDTO> userDTOS = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+ idStr +")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
// 4. 返回
return Result.ok(userDTOS);
}
6、好友关注
6.1 关注和取关
需求:基于该表数据结构,实现两个接口:
- 关注和取关接口
- 判断是否关注的接口
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:
FollowServiceImpl
- 关注和取关接口
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 0. 获取登录用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
// 1. 判断到底是关注还是取关
if (isFollow){
// 2. 关注, 新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if (isSuccess) {
// 把关注用户的id, 放入redis的set集合 sadd userId followUserId
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
}else {
// 3. 取关, 删除 delete from tb_follow where user_id = ? and follow_user_id = ?
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", userId)
.eq("follow_user_id", followUserId));
// 把关注用户的id从Redis集合中移除
if (isSuccess){
stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
}
}
return Result.ok();
}
- 判断是否关注的接口
@Override
public Result isFollow(Long followUserId) {
// 1. 获取登录用户
Long userId = UserHolder.getUser().getId();
// 2. 查询是否关注
Integer count = query().eq("user_id", userId)
.eq("follow_user_id", followUserId).count();
// 3. 判断
return Result.ok(count > 0);
}
6.2 共同关注
FollowServiceImpl
@Override
public Result followCommons(Long id) {
// 1. 获取当前用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
// 2. 求交集
String key2 = "follows:" + id;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if (intersect == null || intersect.isEmpty()) {
// 无交集
return Result.ok(Collections.emptyList());
}
// 3. 解析出id
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
// 4. 查询用户
List<UserDTO> users = userService.listByIds(ids).stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(users);
}
6.3 Feed流实现方案
Feed流的实现有两种模式:
- Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
- 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
采用Timeline的模式:
- 拉模式
- 推模式
- 推拉结合
拉模式:也叫做读扩散
该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序
优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
推模式:也叫做写扩散。
推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了
优点:时效快,不用临时拉取
缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去
推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。
推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。
6.4 推送到粉丝收件箱
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
- 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
Feed流的滚动分页
我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据
BlogServiceImpl
@Override
public Result saveBlog(Blog blog) {
// 1. 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2. 保存探店笔记
boolean isSuccess = save(blog);
if (!isSuccess) {
return Result.fail("新增笔记失败!");
}
// 3. 查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
// 4. 推送笔记id给所有粉丝
for (Follow follow : follows) {
// 4.1 获取粉丝id
Long userId = follow.getUserId();
// 4.2 推送
String key = FEED_KEY + userId;
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}
// 5. 返回id
return Result.ok(blog.getId());
}
核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。
**滚动分页查询参数: **
ZREVRANGEBYSCORE key Max Min LIMIT offset count
name | value |
---|---|
max | 当前时间戳 |
min | 0 (最小时间戳) |
offset | 0 |
count | 根据与前端的约定, 此处定为2 |
- offset: 表示从Max向后数offset个开始作为查询count中的第一个
6.5 实现分页查询收件箱
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
具体操作如下:
1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件
2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。
- 定义出具体的返回实体类
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}
- BlogController
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset) {
return blogService.queryBlogOfFollow(max,offset);
}
- BlogServiceImpl
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
// 1. 获取当前用户
Long userId = UserHolder.getUser().getId();
// 2. 查询收件箱 ZREVRANGEBYSCORE key MAX MIN LIMIT offset count
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);
// 3. 非空判断
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok();
}
// 4. 解析数据: blogId minTime(时间戳) offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int os = 1; // 偏移量, 每一个时间戳一定有一个数值
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
// 4.1 获取id
ids.add(Long.valueOf(tuple.getValue()));
// 4.2 获取分数(时间戳)
long time = tuple.getScore().longValue();
if (time == minTime) {
os++;
}else {
minTime = time;
os = 1; // 重置时间戳
}
}
// 5. 根据id查询blog
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
for (Blog blog : blogs) { // 每一个博客都要做的事
// 5.1 查询blog有关的用户
queryBlogUser(blog);
// 5.2 查询blog是否被点赞了
isBlogLiked(blog);
}
// 6. 封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}
7、附近商户
7.1 GEO数据结构的基本用法
GEO就是Geolocation的简写形式,代表地理坐标。
- GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
- GEODIST:计算指定的两个点之间的距离并返回
- GEOHASH:将指定member的坐标转为hash字符串形式并返回
- GEOPOS:返回指定member的坐标
- GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
- GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
- GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
7.2 导入店铺数据到GEO
具体场景说明:
我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,GEO在redis中就一个member和一个经纬度,我们把x和y轴传入到redis做的经纬度位置去,把id作为member
代码:
HmDianPingApplicationTests
@Test
public void loadShopData() throws Exception{
// 1.查询店铺信息
List<Shop> list = shopService.list();
// 2. 把店铺分组,按照typeId分组, id一致的放到一个集合
Map<Long,List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
// 3. 分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
// 3.1 获取类型id
Long typeId = entry.getKey();
String key = "shop:geo:" + typeId;
// 3.2 获取同类型的店铺的集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
// 3.3 写入redis GEOADD key 经度 纬度 member
for (Shop shop : value) {
// stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()), shop.getId().toString());
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),new Point(shop.getX(),shop.getY()) // 第一个就是member
));
}
stringRedisTemplate.opsForGeo().add(key,locations);
}
}
效果:
7.3 实现附近商户功能
SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM
- 修改pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-data-redis</artifactId>
<groupId>org.springframework.data</groupId>
</exclusion>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
</dependency>
- ShopController
/**
* 根据商铺类型分页查询商铺信息
* @param typeId 商铺类型
* @param current 页码
* @return 商铺列表
*/
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value = "x",required = false) Double x,
@RequestParam(value = "y",required = false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
- ShopServiceImpl
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
// 1. 判断是否需要根据坐标查询
if (x == null || y == null) {
// 不需要坐标查询, 按数据库查询
Page<Shop> pages = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
}
// 2. 计算分页参数
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
// 3. 查询redis, 按照距离排序, 分页, 结果: shopId , distance
String key = SHOP_GEO_KEY + typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDIATANCE
.search(
key,
GeoReference.fromCoordinate(x, y),
new Distance(5000), // 5000m以内, 默认m, 可以指定其他单位
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) // 包含距离, 查到end
);
// 4. 解析出id
if (results == null) {
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
if (list.size() <= from) {
// 没有下一页, 结束
return Result.ok(Collections.emptyList());
}
// 4.1 截取from - end的部分
List<Long> ids = new ArrayList<>(list.size()); // 收集id, 用于查询shops
Map<String,Distance> distanceMap = new HashMap<>(list.size()); // 收集距离
list.stream().skip(from).forEach(result -> {
// 4.2 获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));// 收集id
// 4.3 获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);// 收集距离
});
// 5. 根据id查询shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
// 5.1 将距离封装进shop
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
// 6. 返回
return Result.ok(shops);
}
8、用户签到
8.1 BitMap功能演示
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0.
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位。
BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
- BITOP :将多个BitMap的结果做位运算(与 、或、异或)
- BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
8.2 实现签到功能
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。
代码
UserController
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
UserServiceImpl
@Override
public Result sign() {
// 1. 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2. 获取日期
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
// 3. 生成key
String key = USER_SIGN_KEY + userId + keySuffix;
// 4. 获取当天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5. 写入Redis
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1,true);
return Result.ok();
}
8.3 统计连续签到天数
**连续签到天数: **
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
得到本月到今天为止的所有签到数据:
BITFIELD key GET u[dayOfMonth] 0 ( dayOfMonth: 总共拿的签到天数, 0: 起始角标 )
假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可
从后向前遍历每个bit位:
让得到的10进制数字和1做与运算,每与一次,就把签到结果向右移动一位,依次类推,就能完成逐个遍历的效果了。
9、UV统计
9.1 HyperLogLog
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
9.2 测试百万数据的统计
测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何
@Test
public void testHyperLogLog() throws Exception{
String[] values = new String[1000];
int j = 0;
for (int i = 0; i < 1000000; i++) {
j = i % 1000; // 0 ~ 999
values[j] = "user_" + i;
if (j == 999) {
// 发送到Redis
stringRedisTemplate.opsForHyperLogLog().add("hl2",values);
// 分批多次写入,每次写1000条数据
}
}
// 统计数量
Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
System.out.println("count = " + count);
}
经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小
初始:
used_memory:2162472
used_memory_human:2.06M
...
测试后:
used_memory:2176688
used_memory_human:2.08M
...
标签:实战篇,缓存,return,redis,Redis,stringRedisTemplate,key,id
From: https://blog.csdn.net/kiddkid/article/details/142601655